diff --git a/scheduler.go b/scheduler.go index 82d7dba..9b6dc45 100644 --- a/scheduler.go +++ b/scheduler.go @@ -17,7 +17,7 @@ import ( var master = flag.String("master", "xavier:5050", "Location of leading Mesos master") var tasksFile = flag.String("workload", "", "JSON file containing task definitions") -var ignoreWatts = flag.Bool("ignoreWatts", false, "Ignore watts in offers") +var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource") var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog") var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for when we should start capping") var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for when we should start uncapping") @@ -27,7 +27,7 @@ var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts t func init() { flag.StringVar(master, "m", "xavier:5050", "Location of leading Mesos master (shorthand)") flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") - flag.BoolVar(ignoreWatts, "i", false, "Ignore watts in offers (shorthand)") + flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource") flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)") flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)") flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)") @@ -60,7 +60,7 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewBinPackedPistonCapper(tasks, *ignoreWatts, logPrefix, *classMapWatts) + scheduler := schedulers.NewBinPackedPistonCapper(tasks, *wattsAsAResource, logPrefix, *classMapWatts) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 0eae312..9c27aad 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -28,7 +28,7 @@ func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def. // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || (watts >= wattsConsideration)) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } @@ -36,14 +36,14 @@ func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def. } type BinPackSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -63,7 +63,7 @@ type BinPackSortedWattsSortedOffers struct { } // New electron scheduler -func NewBinPackSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewBinPackSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BinPackSortedWattsSortedOffers { sort.Sort(def.WattsSorter(tasks)) @@ -73,15 +73,15 @@ func NewBinPackSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, sched } s := &BinPackSortedWattsSortedOffers{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -109,7 +109,7 @@ func (s *BinPackSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Ta mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -190,7 +190,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr for *task.Instances > 0 { // Does the task fit - if (s.ignoreWatts || (offer_watts >= (totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) && (offer_cpu >= (totalCPU + task.CPU)) && (offer_ram >= (totalRAM + task.RAM)) { diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index b77a89e..ca5ab5a 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -27,18 +27,18 @@ import ( corresponding to the load on that node. */ type BinPackedPistonCapper struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - totalPower map[string]float64 - ignoreWatts bool - classMapWatts bool - ticker *time.Ticker - isCapping bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + totalPower map[string]float64 + wattsAsAResource bool + classMapWatts bool + ticker *time.Ticker + isCapping bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule the new task. @@ -59,7 +59,7 @@ type BinPackedPistonCapper struct { } // New electron scheduler. -func NewBinPackedPistonCapper(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewBinPackedPistonCapper(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BinPackedPistonCapper { logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -68,19 +68,19 @@ func NewBinPackedPistonCapper(tasks []def.Task, ignoreWatts bool, schedTracePref } s := &BinPackedPistonCapper{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - RecordPCP: false, - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + RecordPCP: false, + ticker: time.NewTicker(5 * time.Second), + isCapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -93,7 +93,7 @@ func (s *BinPackedPistonCapper) takeOffer(offer *mesos.Offer, offerWatts float64 // Error in determining wattsToConsider log.Fatal(err) } - if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { return true @@ -137,7 +137,7 @@ func (s *BinPackedPistonCapper) newTask(offer *mesos.Offer, task def.Task) *meso mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index 215341a..936f7f6 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -28,7 +28,7 @@ func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && watts >= wattsConsideration { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } @@ -36,14 +36,14 @@ func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BinPackSortedWatts struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -63,7 +63,7 @@ type BinPackSortedWatts struct { } // New electron scheduler -func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *BinPackSortedWatts { +func NewBinPackSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BinPackSortedWatts { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -72,15 +72,15 @@ func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix } s := &BinPackSortedWatts{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -108,7 +108,7 @@ func (s *BinPackSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.T mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -178,7 +178,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers for *task.Instances > 0 { // Does the task fit - if (s.ignoreWatts || (offer_watts >= (totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) && (offer_cpu >= (totalCPU + task.CPU)) && (offer_ram >= (totalRAM + task.RAM)) { diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index a0bf3b4..7ee8fca 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -34,7 +34,7 @@ type BottomHeavy struct { tasks []def.Task metrics map[string]def.Metric running map[string]map[string]bool - ignoreWatts bool + wattsAsAResource bool classMapWatts bool smallTasks, largeTasks []def.Task @@ -56,7 +56,7 @@ type BottomHeavy struct { } // New electron scheduler -func NewBottomHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *BottomHeavy { +func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BottomHeavy { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -68,16 +68,16 @@ func NewBottomHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, // Classification done based on MMPU watts requirements. mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) s := &BottomHeavy{ - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -105,7 +105,7 @@ func (s *BottomHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -186,7 +186,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { offerTaken = true @@ -248,7 +248,7 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver } // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= wattsConsideration)) && + if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { taken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) diff --git a/schedulers/bpswMaxMin.go b/schedulers/bpswMaxMin.go index f6d4f3b..ae47645 100644 --- a/schedulers/bpswMaxMin.go +++ b/schedulers/bpswMaxMin.go @@ -28,7 +28,7 @@ func (s *BPSWMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || (watts >= wattsConsideration)) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } @@ -42,7 +42,7 @@ type BPSWMaxMinWatts struct { tasks []def.Task metrics map[string]def.Metric running map[string]map[string]bool - ignoreWatts bool + wattsAsAResource bool classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're @@ -63,7 +63,7 @@ type BPSWMaxMinWatts struct { } // New electron scheduler -func NewBPMaxMinWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts { +func NewBPMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -73,7 +73,7 @@ func NewBPMaxMinWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix strin s := &BPSWMaxMinWatts{ tasks: tasks, - ignoreWatts: ignoreWatts, + wattsAsAResource: wattsAsAResource, classMapWatts: classMapWatts, Shutdown: make(chan struct{}), Done: make(chan struct{}), @@ -109,7 +109,7 @@ func (s *BPSWMaxMinWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.Task mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -152,7 +152,7 @@ func (s *BPSWMaxMinWatts) CheckFit(i int, offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && (offerCPU >= (*totalCPU + task.CPU)) && (offerRAM >= (*totalRAM + task.RAM)) { diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index 6214e0e..d2f63e1 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -33,7 +33,7 @@ func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) b // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || (watts >= wattsConsideration)) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } @@ -41,18 +41,18 @@ func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) b } type BPSWMaxMinPistonCapping struct { - base //Type embedding to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - totalPower map[string]float64 - ignoreWatts bool - classMapWatts bool - ticker *time.Ticker - isCapping bool + base //Type embedding to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + totalPower map[string]float64 + wattsAsAResource bool + classMapWatts bool + ticker *time.Ticker + isCapping bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -72,7 +72,7 @@ type BPSWMaxMinPistonCapping struct { } // New electron scheduler -func NewBPSWMaxMinPistonCapping(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewBPSWMaxMinPistonCapping(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinPistonCapping { sort.Sort(def.WattsSorter(tasks)) @@ -82,19 +82,19 @@ func NewBPSWMaxMinPistonCapping(tasks []def.Task, ignoreWatts bool, schedTracePr } s := &BPSWMaxMinPistonCapping{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - RecordPCP: false, - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + RecordPCP: false, + ticker: time.NewTicker(5 * time.Second), + isCapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s @@ -134,7 +134,7 @@ func (s *BPSWMaxMinPistonCapping) newTask(offer *mesos.Offer, task def.Task) *me mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -242,7 +242,7 @@ func (s *BPSWMaxMinPistonCapping) CheckFit(i int, offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && (offerCPU >= (*totalCPU + task.CPU)) && (offerRAM >= (*totalRAM + task.RAM)) { diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index 3ec4d6a..129b030 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -32,7 +32,7 @@ func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || (watts >= wattsConsideration)) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) { return true } @@ -40,22 +40,22 @@ func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BPSWMaxMinProacCC struct { - base // Type embedding to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - availablePower map[string]float64 - totalPower map[string]float64 - ignoreWatts bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster-wide capping. - isRecapping bool // indicate whether we are currently performing cluster-wide recapping. + base // Type embedding to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + availablePower map[string]float64 + totalPower map[string]float64 + wattsAsAResource bool + classMapWatts bool + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster-wide capping. + isRecapping bool // indicate whether we are currently performing cluster-wide recapping. // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -75,7 +75,7 @@ type BPSWMaxMinProacCC struct { } // New electron scheduler -func NewBPSWMaxMinProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinProacCC { +func NewBPSWMaxMinProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinProacCC { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -84,23 +84,23 @@ func NewBPSWMaxMinProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix s } s := &BPSWMaxMinProacCC{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -139,7 +139,7 @@ func (s *BPSWMaxMinProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.Ta mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -265,7 +265,7 @@ func (s *BPSWMaxMinProacCC) CheckFit(i int, offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) && (offerCPU >= (*totalCPU + task.CPU)) && (offerRAM >= (*totalRAM + task.RAM)) { diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 09d1c2f..9992721 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -27,7 +27,7 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= wattsConsideration) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { return true } @@ -36,14 +36,14 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface type FirstFit struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -63,7 +63,7 @@ type FirstFit struct { } // New electron scheduler -func NewFirstFit(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *FirstFit { +func NewFirstFit(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFit { logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") if err != nil { @@ -71,15 +71,15 @@ func NewFirstFit(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, cl } s := &FirstFit{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -107,7 +107,7 @@ func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index 1766447..ba06be6 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -29,7 +29,7 @@ func (s *FirstFitProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { // Error in determining wattsConsideration log.Fatal(err) } - if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || (offer_watts >= wattsConsideration)) { + if offer_cpu >= task.CPU && offer_mem >= task.RAM && (!s.wattsAsAResource || (offer_watts >= wattsConsideration)) { return true } return false @@ -37,22 +37,22 @@ func (s *FirstFitProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface. type FirstFitProacCC struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - ignoreWatts bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + wattsAsAResource bool + classMapWatts bool + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule the new task. @@ -73,7 +73,7 @@ type FirstFitProacCC struct { } // New electron scheduler. -func NewFirstFitProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewFirstFitProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitProacCC { logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -82,23 +82,23 @@ func NewFirstFitProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix str } s := &FirstFitProacCC{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -137,7 +137,7 @@ func (s *FirstFitProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.Task mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index 0d519d5..3e4fabe 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -28,7 +28,7 @@ func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= wattsConsideration) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { return true } @@ -37,14 +37,14 @@ func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -64,7 +64,7 @@ type FirstFitSortedOffers struct { } // New electron scheduler -func NewFirstFitSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedOffers { +func NewFirstFitSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedOffers { logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") if err != nil { @@ -72,15 +72,15 @@ func NewFirstFitSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefi } s := &FirstFitSortedOffers{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -108,7 +108,7 @@ func (s *FirstFitSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index e47cb14..bf4964e 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -40,7 +40,7 @@ func (s *FirstFitSortedWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task // Error in determining wattsToConsider log.Fatal(err) } - if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || offer_watts >= wattsConsideration) { + if offer_cpu >= task.CPU && offer_mem >= task.RAM && (!s.wattsAsAResource || offer_watts >= wattsConsideration) { return true } return false @@ -48,22 +48,22 @@ func (s *FirstFitSortedWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task // electronScheduler implements the Scheduler interface type FirstFitSortedWattsProacCC struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - ignoreWatts bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + wattsAsAResource bool + classMapWatts bool + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule the new task. @@ -84,7 +84,7 @@ type FirstFitSortedWattsProacCC struct { } // New electron scheduler. -func NewFirstFitSortedWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewFirstFitSortedWattsProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWattsProacCC { // Sorting tasks in ascending order of watts @@ -96,23 +96,23 @@ func NewFirstFitSortedWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTrac } s := &FirstFitSortedWattsProacCC{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -151,7 +151,7 @@ func (s *FirstFitSortedWattsProacCC) newTask(offer *mesos.Offer, task def.Task) mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) } else { @@ -202,11 +202,11 @@ func (s *FirstFitSortedWattsProacCC) startCapping() { if rankedCurrentCapValue > 0.0 { for _, host := range constants.Hosts { // Rounding currentCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue + 0.5))); err != nil { + if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil { log.Println(err) } } - log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue + 0.5))) + log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue+0.5))) } rankedMutex.Unlock() } @@ -226,11 +226,11 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() { if s.isRecapping && rankedRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding currentCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue + 0.5))); err != nil { + if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil { log.Println(err) } } - log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue + 0.5))) + log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue+0.5))) } // setting recapping to false s.isRecapping = false diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index 1047bff..3742db2 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -28,7 +28,7 @@ func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= wattsConsideration) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { return true } @@ -37,14 +37,14 @@ func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def // electronScheduler implements the Scheduler interface type FirstFitSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -64,7 +64,7 @@ type FirstFitSortedWattsSortedOffers struct { } // New electron scheduler -func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, +func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWattsSortedOffers { // Sorting the tasks in increasing order of watts requirement. @@ -76,15 +76,15 @@ func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, sche } s := &FirstFitSortedWattsSortedOffers{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -112,7 +112,7 @@ func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.T mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 6dc6ee9..5d624cf 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -28,7 +28,7 @@ func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool // Error in determining wattsConsideration log.Fatal(err) } - if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= wattsConsideration) { + if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { return true } @@ -37,14 +37,14 @@ func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedWatts struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -64,7 +64,7 @@ type FirstFitSortedWatts struct { } // New electron scheduler -func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWatts { +func NewFirstFitSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWatts { sort.Sort(def.WattsSorter(tasks)) @@ -74,15 +74,15 @@ func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix } s := &FirstFitSortedWatts{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -110,7 +110,7 @@ func (s *FirstFitSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos. mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index a349b57..2d531c9 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -35,14 +35,14 @@ func (s *FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { } type FirstFitWattsOnly struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - ignoreWatts bool - classMapWatts bool + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -62,7 +62,7 @@ type FirstFitWattsOnly struct { } // New electron scheduler -func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *FirstFitWattsOnly { +func NewFirstFitWattsOnly(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitWattsOnly { logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") if err != nil { @@ -70,15 +70,15 @@ func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool, schedTracePrefix s } s := &FirstFitWattsOnly{ - tasks: tasks, - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 61be09c..39ffe03 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -34,7 +34,7 @@ type TopHeavy struct { tasks []def.Task metrics map[string]def.Metric running map[string]map[string]bool - ignoreWatts bool + wattsAsAResource bool classMapWatts bool smallTasks, largeTasks []def.Task @@ -56,7 +56,7 @@ type TopHeavy struct { } // New electron scheduler -func NewTopHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, classMapWatts bool) *TopHeavy { +func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *TopHeavy { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -68,16 +68,16 @@ func NewTopHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string, cl // Classification done based on MMPU watts requirements. mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) s := &TopHeavy{ - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], - ignoreWatts: ignoreWatts, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -105,7 +105,7 @@ func (s *TopHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { mesosutil.NewScalarResource("mem", task.RAM), } - if !s.ignoreWatts { + if s.wattsAsAResource { if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) @@ -186,7 +186,7 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsConsideration))) && + if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { taken = true @@ -245,7 +245,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { } // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= wattsConsideration)) && + if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { offerTaken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))