From 3af1d561c26555ced934f9a9afc4184ce630c7e1 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 31 Jan 2017 15:33:31 -0500 Subject: [PATCH] retrofitted to use offerUtils.PowerClass(...) instead of inlining the code in every scheduler. Reduced redundant code. Changed name of newTaskClass in newTask(...) to powerClass. --- schedulers/bpswClassMapWatts.go | 19 +++++++------------ schedulers/bpswClassMapWattsPistonCapping.go | 19 +++++++------------ schedulers/bpswClassMapWattsProacCC.go | 19 +++++++------------ .../firstfitSortedWattsClassMapWatts.go | 18 +++++++----------- ...firstfitSortedWattsClassMapWattsProacCC.go | 19 +++++++------------ 5 files changed, 35 insertions(+), 59 deletions(-) diff --git a/schedulers/bpswClassMapWatts.go b/schedulers/bpswClassMapWatts.go index a648e15..79f3882 100644 --- a/schedulers/bpswClassMapWatts.go +++ b/schedulers/bpswClassMapWatts.go @@ -78,7 +78,7 @@ func NewBPSWClassMapWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix s return s } -func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -102,7 +102,7 @@ func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskCl } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -159,27 +159,22 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers } for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) taken = true - totalWatts += task.ClassToWatts[nodeClass] + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index 4ee7825..baec23d 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -91,7 +91,7 @@ func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool, sched return s } -func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -125,7 +125,7 @@ func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Ta } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -260,16 +260,11 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr } for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR lazy evaluation. If ignoreWatts is set to true, second statement won't // be evaluated - if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { @@ -279,14 +274,14 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) taken = true - totalWatts += task.ClassToWatts[nodeClass] + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 9a50a69..12ddb46 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -101,7 +101,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP // mutex var bpswClassMapWattsProacCCMutex sync.Mutex -func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -133,7 +133,7 @@ func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, ne } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -295,16 +295,11 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, } for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR Lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { @@ -316,7 +311,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { bpswClassMapWattsProacCCMutex.Lock() @@ -327,12 +322,12 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, log.Println(err) } taken = true - totalWatts += task.ClassToWatts[nodeClass] + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index 3a0d1df..b8b51e8 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -65,7 +65,7 @@ func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, sch return s } -func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -89,7 +89,7 @@ func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def. } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -140,21 +140,17 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler } } - // retrieving the node class from the offer - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + // retrieving the powerClass from the offer + powerClass := offerUtils.PowerClass(offer) // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) && (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter) diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index f3a09d2..0db8a05 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -89,7 +89,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo // mutex var ffswClassMapWattsProacCCMutex sync.Mutex -func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -121,7 +121,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, ta } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -278,16 +278,11 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc } } - // Retrieving the node class from the offer - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + // retrieving the powerClass for the offer + powerClass := offerUtils.PowerClass(offer) // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) && (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { // Capping the cluster if haven't yet started @@ -298,7 +293,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { ffswClassMapWattsProacCCMutex.Lock() @@ -312,7 +307,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)