diff --git a/README.md b/README.md index 96226e8..64d6fcb 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ To Do: * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no longer any tasks to schedule. * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file. - * Make ClassMapWatts to commandLine arguments so Electron can be run with ClassMapWatts enabled/disabled. * Make def.Task an interface for further modularization and flexibility. + * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is launched for logging to work and PCP collector agents installed diff --git a/constants/constants.go b/constants/constants.go index 834d65f..bbacf42 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,8 +1,9 @@ /* Constants that are used across scripts 1. The available hosts = stratos-00x (x varies from 1 to 8) -2. CapMargin = percentage of the requested power to allocate +2. Tolerance = tolerance for a task that when exceeded would starve the task. 3. ConsiderationWindowSize = number of tasks to consider for computation of the dynamic cap. +TODO: Clean this up and use Mesos Attributes instead. */ package constants @@ -11,7 +12,11 @@ var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} -// Classification of the nodes in the cluster based on their power consumption. +/* + Classification of the nodes in the cluster based on their Thermal Design Power (TDP). + The power classes are labelled in the decreasing order of the corresponding TDP, with class A nodes + having the highest TDP and class C nodes having the lowest TDP. +*/ var PowerClasses = map[string]map[string]bool{ "A": map[string]bool{ "stratos-005.cs.binghamton.edu": true, @@ -31,10 +36,10 @@ var PowerClasses = map[string]map[string]bool{ /* Margin with respect to the required power for a job. - So, if power required = 10W, the node would be capped to CapMargin * 10W. + So, if power required = 10W, the node would be capped to Tolerance * 10W. This value can be changed upon convenience. */ -var CapMargin = 0.70 +var Tolerance = 0.70 // Window size for running average var ConsiderationWindowSize = 20 diff --git a/powerCapping/proactiveclusterwidecappers.go b/powerCapping/proactiveclusterwidecappers.go index 8dcbd83..3f10d6a 100644 --- a/powerCapping/proactiveclusterwidecappers.go +++ b/powerCapping/proactiveclusterwidecappers.go @@ -23,7 +23,7 @@ type taskWrapper struct { } func (tw taskWrapper) Val() float64 { - return tw.task.Watts * constants.CapMargin + return tw.task.Watts * constants.Tolerance } func (tw taskWrapper) ID() string { @@ -121,7 +121,7 @@ func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64, // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks continue } - wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin) + wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.Tolerance) } } @@ -202,7 +202,7 @@ func (capper ClusterwideCapper) NaiveRecap(totalPower map[string]float64, // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks continue } - totalAllocatedPower += (float64(task.Watts) * constants.CapMargin) + totalAllocatedPower += (float64(task.Watts) * constants.Tolerance) totalRunningTasks++ } } diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 700147e..e4359de 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -12,26 +12,23 @@ import ( "log" "os" "sort" - "strings" "time" ) // Decides if to take an offer or not func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := offerUtils.OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) if err != nil { // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { + if offerCPU >= task.CPU && offerRAM >= task.RAM && (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) { return true } - return false } @@ -145,8 +142,6 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr tasks := []*mesos.TaskInfo{} - offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer) - offerTaken := false totalWatts := 0.0 totalCPU := 0.0 @@ -159,19 +154,14 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { // Does the task fit - if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) && - (offer_cpu >= (totalCPU + task.CPU)) && - (offer_ram >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, task) { offerTaken = true totalWatts += wattsConsideration diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index a9a3fab..95225e9 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -15,7 +15,6 @@ import ( "log" "math" "os" - "strings" "sync" "time" ) @@ -257,12 +256,9 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doens't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { @@ -289,7 +285,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- // updating the cap value for offer.Hostname - partialLoad += ((wattsConsideration * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 + partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100 if *task.Instances <= 0 { // All instances of task have been scheduled. Remove it @@ -379,7 +375,7 @@ func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, statu } // Need to update the cap values for host of the finishedTask bpPistonMutex.Lock() - bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 + bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100 // Checking to see if the cap value has become 0, in which case we uncap the host. if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 { bpPistonCapValues[hostOfFinishedTask] = 100 diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index 1f07e7d..b047373 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -12,7 +12,6 @@ import ( "log" "os" "sort" - "strings" "time" ) @@ -31,7 +30,6 @@ func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } - return false } @@ -133,8 +131,6 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers tasks := []*mesos.TaskInfo{} - offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer) - offerTaken := false totalWatts := 0.0 totalCPU := 0.0 @@ -147,19 +143,14 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { // Does the task fit - if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) && - (offer_cpu >= (totalCPU + task.CPU)) && - (offer_ram >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, task) { offerTaken = true totalWatts += wattsConsideration diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index 2af57d6..4f9f15b 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -26,6 +26,31 @@ BinPacking has the most effect when co-scheduling of tasks is increased. Large t co-scheduling them has a great impact on the total power utilization. */ +func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, + wattsToConsider float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + return true + } + return false + +} + +func (s *BottomHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + return true + } + return false +} + // electronScheduler implements the Scheduler interface type BottomHeavy struct { base // Type embedded to inherit common functions @@ -148,7 +173,6 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -165,9 +189,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { + if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) { offerTaken = true totalWatts += wattsConsideration totalCPU += task.CPU @@ -213,7 +235,6 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false for i := 0; i < len(s.smallTasks); i++ { task := s.smallTasks[i] @@ -221,14 +242,10 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver if err != nil { // Error in determining wattsConsideration log.Fatal(err) - } else { - // Logging the watts consideration - log.Printf("Watts Considered for host[%s], task[%s] = %f\n", *offer.Hostname, task.Name, wattsConsideration) } // Decision to take the offer or not - if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + if s.takeOfferFirstFit(offer, wattsConsideration, task) { taken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) diff --git a/schedulers/bpswMaxMin.go b/schedulers/bpswMaxMin.go index 54c4e0d..73af1db 100644 --- a/schedulers/bpswMaxMin.go +++ b/schedulers/bpswMaxMin.go @@ -12,7 +12,6 @@ import ( "log" "os" "sort" - "strings" "time" ) @@ -31,7 +30,6 @@ func (s *BPSWMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } - return false } @@ -40,7 +38,7 @@ type BPSWMaxMinWatts struct { } // New electron scheduler -func NewBPMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts { +func NewBPSWMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -120,7 +118,8 @@ func (s *BPSWMaxMinWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.Task // Determine if the remaining space inside of the offer is enough for this // the task we need to create. If it is, create a TaskInfo and return it. -func (s *BPSWMaxMinWatts) CheckFit(i int, +func (s *BPSWMaxMinWatts) CheckFit( + i int, task def.Task, wattsConsideration float64, offer *mesos.Offer, @@ -128,12 +127,8 @@ func (s *BPSWMaxMinWatts) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - // Does the task fit - if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM + task.RAM)) { + if s.takeOffer(offer, task) { *totalWatts += wattsConsideration *totalCPU += task.CPU @@ -197,12 +192,9 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers [] log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -225,12 +217,9 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers [] log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index 05c709a..fceec50 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -16,7 +16,6 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) @@ -36,7 +35,6 @@ func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) b if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } - return false } @@ -209,7 +207,8 @@ func (s *BPSWMaxMinPistonCapping) stopCapping() { // Determine if the remaining sapce inside of the offer is enough for // the task we need to create. If it is, create a TaskInfo and return it. -func (s *BPSWMaxMinPistonCapping) CheckFit(i int, +func (s *BPSWMaxMinPistonCapping) CheckFit( + i int, task def.Task, wattsConsideration float64, offer *mesos.Offer, @@ -218,12 +217,8 @@ func (s *BPSWMaxMinPistonCapping) CheckFit(i int, totalWatts *float64, partialLoad *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - // Does the task fit - if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM + task.RAM)) { + if s.takeOffer(offer, task) { // Start piston capping if haven't started yet if !s.isCapping { @@ -242,7 +237,7 @@ func (s *BPSWMaxMinPistonCapping) CheckFit(i int, fmt.Println("Inst: ", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- - *partialLoad += ((wattsConsideration * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 + *partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100 if *task.Instances <= 0 { // All instances of task have been scheduled, remove it @@ -297,12 +292,9 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -325,12 +317,9 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { @@ -422,7 +411,7 @@ func (s *BPSWMaxMinPistonCapping) StatusUpdate(driver sched.SchedulerDriver, sta } // Need to update the cap values for host of the finishedTask bpMaxMinPistonCappingMutex.Lock() - bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 + bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100 // Checking to see if the cap value has become 0, in which case we uncap the host. if int(math.Floor(bpMaxMinPistonCappingCapValues[hostOfFinishedTask]+0.5)) == 0 { bpMaxMinPistonCappingCapValues[hostOfFinishedTask] = 100 diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index e1a48f0..564134a 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -16,7 +16,6 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) @@ -35,7 +34,6 @@ func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } - return false } @@ -233,7 +231,8 @@ func (s *BPSWMaxMinProacCC) stopRecapping() { // Determine if the remaining space inside of the offer is enough for // the task we need to create. If it is, create TaskInfo and return it. -func (s *BPSWMaxMinProacCC) CheckFit(i int, +func (s *BPSWMaxMinProacCC) CheckFit( + i int, task def.Task, wattsConsideration float64, offer *mesos.Offer, @@ -241,12 +240,8 @@ func (s *BPSWMaxMinProacCC) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - // Does the task fit - if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM + task.RAM)) { + if s.takeOffer(offer, task) { // Capping the cluster if haven't yet started if !s.isCapping { @@ -345,12 +340,9 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers // Error in determining wattsConsideration log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -373,12 +365,9 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers log.Fatal(err) } - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index d0235ce..864a208 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -11,7 +11,6 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "strings" "time" ) @@ -138,12 +137,9 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index 4e361a5..caddfd3 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -15,7 +15,6 @@ import ( "log" "math" "os" - "strings" "sync" "time" ) @@ -271,8 +270,8 @@ func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Don't take offer if it doesn't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { continue } diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index d088b3d..48a9ec1 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -12,7 +12,6 @@ import ( "log" "os" "sort" - "strings" "time" ) @@ -150,12 +149,9 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index cee8af5..e00590c 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -26,7 +26,6 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) @@ -287,9 +286,8 @@ func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - - // Don't take offer if it doesn't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { continue } diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index 2f309fd..d473a09 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -12,7 +12,6 @@ import ( "log" "os" "sort" - "strings" "time" ) @@ -154,12 +153,9 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 0b05f12..dfb7bed 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -12,7 +12,6 @@ import ( "log" "os" "sort" - "strings" "time" ) @@ -141,12 +140,9 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index 5b29914..e2225ff 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -11,7 +11,6 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "strings" "time" ) @@ -131,12 +130,9 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 23d1441..e6ba7fb 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -1,9 +1,9 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "fmt" "log" - "bitbucket.org/sunybingcloud/electron/constants" ) func coLocated(tasks map[string]bool) { @@ -24,4 +24,3 @@ func hostToPowerClass(hostName string) string { } return "" } - diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 1281634..da03f37 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -26,6 +26,30 @@ This was done to give a little more room for the large tasks (power intensive) f starvation of power intensive tasks. */ +func (s *TopHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, + wattsToConsider float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + return true + } + return false +} + +func (s *TopHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + return true + } + return false +} + // electronScheduler implements the Scheduler interface type TopHeavy struct { base // Type embedded to inherit common functions @@ -148,7 +172,6 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -165,9 +188,7 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { + if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) { taken = true totalWatts += wattsConsideration totalCPU += task.CPU @@ -213,7 +234,6 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) offerTaken := false for i := 0; i < len(s.largeTasks); i++ { task := s.largeTasks[i] @@ -224,8 +244,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { } // Decision to take the offer or not - if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + if s.takeOfferFirstFit(offer, wattsConsideration, task) { offerTaken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) diff --git a/utilities/offerUtils/offerUtils.go b/utilities/offerUtils/offerUtils.go index 16144dd..6f5dc81 100644 --- a/utilities/offerUtils/offerUtils.go +++ b/utilities/offerUtils/offerUtils.go @@ -2,6 +2,7 @@ package offerUtils import ( mesos "github.com/mesos/mesos-go/mesosproto" + "strings" ) func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { @@ -32,6 +33,8 @@ func PowerClass(offer *mesos.Offer) string { return powerClass } +// Implements the sort.Sort interface to sort Offers based on CPU. +// TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts) type OffersSorter []*mesos.Offer func (offersSorter OffersSorter) Len() int { @@ -49,3 +52,11 @@ func (offersSorter OffersSorter) Less(i, j int) bool { cpu2, _, _ := OfferAgg(offersSorter[j]) return cpu1 <= cpu2 } + +// Is there a mismatch between the task's host requirement and the host corresponding to the offer. +func HostMismatch(offerHost string, taskHost string) bool { + if taskHost != "" && !strings.HasPrefix(offerHost, taskHost) { + return true + } + return false +} diff --git a/utilities/utils.go b/utilities/utils.go index 6662c59..18b2400 100644 --- a/utilities/utils.go +++ b/utilities/utils.go @@ -45,4 +45,3 @@ func OrderedKeys(plist PairList) ([]string, error) { } return orderedKeys, nil } -