retrofitted schedulers by renaming 'taken' to 'offerTaken' for the boolean to indicate whether an offer has been consumed.

This commit is contained in:
Pradyumna Kaushik 2017-02-04 16:59:25 -05:00
parent 7fc5b5d19e
commit eea0db0b3d
18 changed files with 54 additions and 54 deletions

View file

@ -58,7 +58,7 @@ func main() {
startTime := time.Now().Format("20060102150405")
logPrefix := *pcplogPrefix + "_" + startTime
scheduler := schedulers.NewBinPackSortedWattsSortedOffers(tasks, *ignoreWatts, logPrefix)
scheduler := schedulers.NewBinPackSortedWatts(tasks, *ignoreWatts, logPrefix)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{
@ -72,8 +72,8 @@ func main() {
return
}
go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
// Attempt to handle signint to not leave pmdumptext running

View file

@ -154,7 +154,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr
offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -175,7 +175,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr
(offer_cpu >= (totalCPU + task.CPU)) &&
(offer_ram >= (totalRAM + task.RAM)) {
taken = true
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
@ -203,7 +203,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {

View file

@ -249,7 +249,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
fitTasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -276,7 +276,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
s.startCapping()
}
taken = true
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
@ -305,7 +305,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
}
}
if taken {
if offerTaken {
// Updating the cap value for offer.Hostname
bpPistonMutex.Lock()
bpPistonCapValues[*offer.Hostname] += partialLoad

View file

@ -143,7 +143,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -164,7 +164,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
(offer_cpu >= (totalCPU + task.CPU)) &&
(offer_ram >= (totalRAM + task.RAM)) {
taken = true
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
@ -192,7 +192,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {

View file

@ -166,7 +166,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver)
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
taken := false
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
@ -182,7 +182,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver)
if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsToConsider))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
taken = true
offerTaken = true
totalWatts += wattsToConsider
totalCPU += task.CPU
totalRAM += task.RAM
@ -199,7 +199,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver)
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {

View file

@ -143,7 +143,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -168,7 +168,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
(offerRAM >= (totalRAM + task.RAM)) {
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
taken = true
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
@ -196,7 +196,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {

View file

@ -242,7 +242,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -275,7 +275,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
}
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
taken = true
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
@ -303,7 +303,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
}
}
if taken {
if offerTaken {
// Updating the cap value for offer.Hostname
bpswClassMapWattsPistonMutex.Lock()
bpswClassMapWattsPistonCapValues[*offer.Hostname] += partialLoad

View file

@ -280,7 +280,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -321,7 +321,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
log.Println("Failed to determine new cluster-wide cap:")
log.Println(err)
}
taken = true
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
@ -352,7 +352,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {

View file

@ -142,7 +142,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -166,7 +166,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -187,7 +187,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -154,7 +154,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -178,7 +178,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -199,7 +199,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -129,7 +129,7 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
@ -155,7 +155,7 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
@ -172,7 +172,7 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -267,7 +267,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
@ -312,7 +312,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
@ -332,7 +332,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -157,7 +157,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -181,7 +181,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -201,7 +201,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -145,7 +145,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -169,7 +169,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -189,7 +189,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -136,7 +136,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -160,7 +160,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -181,7 +181,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -275,7 +275,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -293,7 +293,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
fcfsMutex.Unlock()
s.startCapping()
}
taken = true
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
@ -331,7 +331,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
}
// If no task fit the offer, then declining the offer.
if !taken {
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -299,7 +299,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
@ -317,7 +317,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
rankedMutex.Unlock()
s.startCapping()
}
taken = true
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
@ -354,7 +354,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
}
// If no tasks fit the offer, then declining the offer.
if !taken {
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := offerUtils.OfferAgg(offer)

View file

@ -228,7 +228,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
powerClass := offerUtils.PowerClass(offer)
@ -240,7 +240,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
}
if (s.ignoreWatts || (offerWatts >= wattsToConsider)) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
taken = true
offerTaken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
@ -254,7 +254,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
}
}
if !taken {
if !offerTaken {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)