From 85817494353ceaf9dea3114a923b29695fc8c01d Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 28 Jan 2017 19:40:39 -0500 Subject: [PATCH] retrofitted all schedulers to call OfferAgg(...) and OffersSorter from utilities/offerUtils and also to use defaultFilter and longFilter from utilities/mesosUtils --- schedulers/binPackSortedWattsSortedOffers.go | 33 ++++++++++--------- schedulers/binpackedpistoncapping.go | 14 ++++---- schedulers/binpacksortedwatts.go | 14 ++++---- schedulers/bottomHeavy.go | 26 ++++++++------- schedulers/bpMaxMin.go | 14 ++++---- schedulers/bpMaxMinPistonCapping.go | 14 ++++---- schedulers/bpMaxMinProacCC.go | 16 +++++---- schedulers/bpswClassMapWatts.go | 14 ++++---- schedulers/bpswClassMapWattsPistonCapping.go | 16 +++++---- schedulers/bpswClassMapWattsProacCC.go | 18 +++++----- schedulers/firstfit.go | 12 ++++--- schedulers/firstfitSortedOffers.go | 16 +++++---- .../firstfitSortedWattsClassMapWatts.go | 12 ++++--- ...firstfitSortedWattsClassMapWattsProacCC.go | 14 ++++---- schedulers/firstfitSortedWattsSortedOffers.go | 16 +++++---- schedulers/firstfitsortedwatts.go | 12 ++++--- schedulers/firstfitwattsonly.go | 12 ++++--- schedulers/proactiveclusterwidecappingfcfs.go | 14 ++++---- .../proactiveclusterwidecappingranked.go | 14 ++++---- schedulers/topHeavy.go | 24 +++++++------- 20 files changed, 182 insertions(+), 143 deletions(-) diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 5d926e9..2f70cb3 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (*BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -37,18 +39,18 @@ type BinPackSortedWattsSortedOffers struct { running map[string]map[string]bool ignoreWatts bool - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task RecordPCP bool - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. 
Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up Done chan struct{} - // Controls when to shutdown pcp logging + // Controls when to shutdown pcp logging PCPLog chan struct{} schedTrace *log.Logger @@ -127,13 +129,13 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr log.Printf("Received %d resource offers", len(offers)) // Sorting the offers - sort.Sort(OffersSorter(offers)) + sort.Sort(offerUtils.OffersSorter(offers)) // Printing the sorted offers and the corresponding CPU resource availability log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] - offerCPU, _, _ := OfferAgg(offer) + offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } @@ -141,7 +143,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -150,7 +152,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr tasks := []*mesos.TaskInfo{} - offer_cpu, offer_ram, offer_watts := OfferAgg(offer) + offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 @@ -203,15 +205,15 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } @@ -234,4 +236,3 @@ func (s *BinPackSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriv } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index 2ed96f4..7cf4b9d 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -4,6 +4,8 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -217,7 +219,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off // retrieving the total power for each host in the offers for _, offer := range offers { if _, ok := s.totalPower[*offer.Hostname]; !ok { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offer_watts } } @@ -238,7 +240,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off select { case 
<-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -246,7 +248,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off } fitTasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 totalCPU := 0.0 @@ -309,14 +311,14 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off bpPistonCapValues[*offer.Hostname] += partialLoad bpPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, mesosUtils.DefaultFilter) } else { // If there was no match for task log.Println("There is not enough resources to launch task: ") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index fdcc82a..00247c7 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -130,7 +132,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -139,7 +141,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers tasks := []*mesos.TaskInfo{} - offer_cpu, offer_ram, offer_watts := OfferAgg(offer) + offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 @@ -192,15 +194,15 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index 999d9d6..4b4391b 100644 --- a/schedulers/bottomHeavy.go +++ 
b/schedulers/bottomHeavy.go @@ -3,6 +3,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -53,7 +55,7 @@ type BottomHeavy struct { } // New electron scheduler -func NewPackBigSpreadSmall(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BottomHeavy { +func NewBottomHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BottomHeavy { sort.Sort(def.WattsSorter(tasks)) logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") @@ -163,7 +165,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -171,7 +173,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -210,14 +212,14 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } @@ -228,7 +230,7 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -236,7 +238,7 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false for i := 0; i < len(s.smallTasks); i++ { task := s.smallTasks[i] @@ -252,7 +254,7 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver taken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) if *task.Instances <= 0 { // All instances of task have been scheduled, remove it @@ -266,10 +268,10 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver if !taken { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := 
OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } @@ -288,7 +290,7 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue diff --git a/schedulers/bpMaxMin.go b/schedulers/bpMaxMin.go index 9221476..d5e791a 100644 --- a/schedulers/bpMaxMin.go +++ b/schedulers/bpMaxMin.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (*BPMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -133,7 +135,7 @@ func (s *BPMaxMinWatts) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && @@ -175,7 +177,7 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -240,15 +242,15 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpMaxMinPistonCapping.go b/schedulers/bpMaxMinPistonCapping.go index edc27d8..b4d4e3c 100644 --- a/schedulers/bpMaxMinPistonCapping.go +++ b/schedulers/bpMaxMinPistonCapping.go @@ -4,6 +4,8 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -22,7 +24,7 @@ import ( // Decides if to take an offer or not func (s *BPMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of 
taking them as a parameter @@ -222,7 +224,7 @@ func (s *BPMaxMinPistonCapping) CheckFit(i int, totalWatts *float64, partialLoad *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && @@ -271,7 +273,7 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -343,15 +345,15 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off bpMaxMinPistonCappingCapValues[*offer.Hostname] += partialLoad bpMaxMinPistonCappingMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpMaxMinProacCC.go b/schedulers/bpMaxMinProacCC.go index 39e96fc..2af372f 100644 --- a/schedulers/bpMaxMinProacCC.go +++ b/schedulers/bpMaxMinProacCC.go @@ -17,11 +17,13 @@ import ( "strings" "sync" "time" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" ) // Decides if to take an offer or not func (s *BPMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -246,7 +248,7 @@ func (s *BPMaxMinProacCC) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // Does the task fit if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && @@ -308,7 +310,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] // retrieving the available power for all the hosts in the offers. 
for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -324,7 +326,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -389,15 +391,15 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWatts.go b/schedulers/bpswClassMapWatts.go index 1196459..d35629b 100644 --- a/schedulers/bpswClassMapWatts.go +++ b/schedulers/bpswClassMapWatts.go @@ -12,12 +12,14 @@ import ( "sort" "strings" "time" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" ) // Decides if to take an offer or not func (*BPSWClassMapWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -130,7 +132,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -139,7 +141,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 @@ -201,15 +203,15 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index cae8cc3..4ee7825 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -4,6 +4,8 @@ import ( 
"bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -21,7 +23,7 @@ import ( // Decides if to take offer or not func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -215,7 +217,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr // retrieving the total power for each host in the offers. for _, offer := range offers { if _, ok := s.totalPower[*offer.Hostname]; !ok { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offerWatts } } @@ -229,7 +231,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -238,7 +240,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 @@ -312,14 +314,14 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr bpswClassMapWattsPistonCapValues[*offer.Hostname] += partialLoad bpswClassMapWattsPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for task log.Println("There is not enough resources to launch task: ") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 390aeb9..d94df90 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -5,6 +5,8 @@ import ( "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/pcp" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -21,7 +23,7 @@ import ( // Decides if to take an offer or not func (*BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) // TODO: Insert watts calculation here instead of taking them as parameter @@ -165,7 +167,7 @@ func (s *BPSWClassMapWattsProacCC) Disconnected(sched.SchedulerDriver) { } // go routine to cap the entire cluster in regular intervals of time. 
-var bpswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var bpswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. var bpswClassMapWattsProacCCNewCapValue = 0.0 // newly computed cap value func (s *BPSWClassMapWattsProacCC) startCapping() { go func() { @@ -251,7 +253,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -267,7 +269,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -276,7 +278,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false totalWatts := 0.0 @@ -357,14 +359,14 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 4eaecdd..5469bb4 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -16,7 +18,7 @@ import ( // Decides if to take an offer or not func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -129,7 +131,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -162,7 +164,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. 
tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) taken = true @@ -187,10 +189,10 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index 09f1d17..06ee713 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -127,13 +129,13 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe log.Printf("Received %d resource offers", len(offers)) // Sorting the offers - sort.Sort(OffersSorter(offers)) + sort.Sort(offerUtils.OffersSorter(offers)) // Printing the sorted offers and the corresponding CPU resource availability log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] - offerCPU, _, _ := OfferAgg(offer) + offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } @@ -141,7 +143,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -174,7 +176,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) taken = true @@ -199,10 +201,10 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index 4a03d89..3a0d1df 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -2,6 
+2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -117,14 +119,14 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue default: } - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // First fit strategy taken := false @@ -155,7 +157,7 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler taskToSchedule := s.newTask(offer, task, nodeClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter) taken = true fmt.Println("Inst: ", *task.Instances) @@ -176,10 +178,10 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index 3cc9fb9..f083ffb 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -17,6 +17,8 @@ import ( "strings" "sync" "time" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" ) // electron scheduler implements the Scheduler interface @@ -239,7 +241,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc // retrieving the available power for all the hosts in the offers. 
for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -255,14 +257,14 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue default: } - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) // First fit strategy taken := false @@ -313,7 +315,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc taskToSchedule := s.newTask(offer, task, nodeClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter) taken = true fmt.Println("Inst: ", *task.Instances) @@ -337,10 +339,10 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index 0590585..3b4bb4e 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -128,13 +130,13 @@ func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.T func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { // Sorting the offers - sort.Sort(OffersSorter(offers)) + sort.Sort(offerUtils.OffersSorter(offers)) // Printing the sorted offers and the corresponding CPU resource availability log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] - offerCPU, _, _ := OfferAgg(offer) + offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } @@ -144,7 +146,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - 
driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -177,7 +179,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) taken = true @@ -201,10 +203,10 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 940ef90..ab8d9c3 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -17,7 +19,7 @@ import ( // Decides if to take an offer or not func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -132,7 +134,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -165,7 +167,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) taken = true @@ -189,10 +191,10 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index c23727f..c24e75e 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -16,7 +18,7 @@ import ( // Decides if to take an offer or not func 
(*FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { - _, _, watts := OfferAgg(offer) + _, _, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -123,7 +125,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -156,7 +158,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) taken = true @@ -181,10 +183,10 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers // If there was no match for the task if !taken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index d89390b..b7491f7 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -16,11 +16,13 @@ import ( "strings" "sync" "time" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" ) // Decides if to take an offer or not func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := OfferAgg(offer) + offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { return true @@ -240,7 +242,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. 
if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -256,7 +258,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -305,7 +307,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive log.Printf("Starting on [%s]\n", offer.GetHostname()) taskToSchedule := s.newTask(offer, task) toSchedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter) log.Printf("Inst: %d", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- @@ -331,10 +333,10 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive // If no task fit the offer, then declining the offer. if !taken { log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index f4c3484..9a8e4c4 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -15,6 +15,8 @@ import ( "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/pcp" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -31,7 +33,7 @@ import ( // Decides if to taken an offer or not func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := OfferAgg(offer) + offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { return true @@ -251,7 +253,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. 
if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -277,7 +279,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -328,7 +330,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri log.Printf("Starting on [%s]\n", offer.GetHostname()) taskToSchedule := s.newTask(offer, task) to_schedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, mesosUtils.DefaultFilter) log.Printf("Inst: %d", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- @@ -354,10 +356,10 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri // If no tasks fit the offer, then declining the offer. if !taken { log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index f0d0920..9454e40 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -3,6 +3,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -163,7 +165,7 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -171,7 +173,7 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -210,14 +212,14 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { if taken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } @@ -228,7 +230,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - 
driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -236,7 +238,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { } tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) taken := false for i := 0; i < len(s.largeTasks); i++ { task := s.largeTasks[i] @@ -252,7 +254,7 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { taken = true tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) if *task.Instances <= 0 { // All instances of task have been scheduled, remove it @@ -266,10 +268,10 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { if !taken { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } @@ -288,7 +290,7 @@ func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue
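
--

Note: the helpers this patch retrofits the schedulers onto are not part of the diff itself. For reference, a minimal sketch of what utilities/offerUtils plausibly provides, inferred purely from the call sites above (OfferAgg aggregating the cpus, mem and watts scalar resources of an offer; OffersSorter ordering offers by aggregate CPU) — the real package may differ in detail:

package offerUtils

import (
	mesos "github.com/mesos/mesos-go/mesosproto"
)

// OfferAgg sums the "cpus", "mem" and "watts" scalar resources in an offer.
func OfferAgg(offer *mesos.Offer) (cpus, mem, watts float64) {
	for _, resource := range offer.Resources {
		switch resource.GetName() {
		case "cpus":
			cpus += resource.GetScalar().GetValue()
		case "mem":
			mem += resource.GetScalar().GetValue()
		case "watts":
			watts += resource.GetScalar().GetValue()
		}
	}
	return
}

// OffersSorter implements sort.Interface over []*mesos.Offer, ordering
// offers by aggregate CPU (ascending) — which is why the sorted-offers
// schedulers above log each offer's CPU right after sort.Sort.
type OffersSorter []*mesos.Offer

func (o OffersSorter) Len() int      { return len(o) }
func (o OffersSorter) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o OffersSorter) Less(i, j int) bool {
	cpus1, _, _ := OfferAgg(o[i])
	cpus2, _, _ := OfferAgg(o[j])
	return cpus1 <= cpus2
}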
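
Similarly, a sketch of the two shared filters from utilities/mesosUtils that replace the schedulers' package-local defaultFilter and longFilter. The exact RefuseSeconds values are assumptions; the intent is visible in the hunks above — DefaultFilter on the ordinary launch/decline path, LongFilter once Shutdown is closed:

package mesosUtils

import (
	"github.com/golang/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
)

var (
	// DefaultFilter: a declined offer may be re-offered after a short interval.
	DefaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}

	// LongFilter: used while shutting down, so Mesos withholds declined
	// offers for much longer and the scheduler is not flooded with offers
	// it would only decline again.
	LongFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
)

Every retrofitted ResourceOffers then follows the same pattern: aggregate resources with offerUtils.OfferAgg, decline with mesosUtils.LongFilter when shutting down, and launch or decline with mesosUtils.DefaultFilter otherwise.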