diff --git a/README.md b/README.md
index 2e494da..16a1114 100644
--- a/README.md
+++ b/README.md
@@ -8,9 +8,8 @@ To Do:
  * Add ability to use constraints
  * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
  * Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
- * TODO : Adding type of scheduler to be used, to be picked from a config file, along with it's configurable parameters.
+ * Adding type of scheduler to be used, to be picked from a config file, along with its configurable parameters.
  * Write test code for each scheduler (This should be after the design change)
- * Some of the constants in constants/constants.go can vary based on the environment. Possible to setup the constants at runtime based on the environment?
  * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no longer
 any tasks to schedule.
 
@@ -18,6 +17,8 @@ To Do:
  * Make def.Task an interface for further modularization and flexibility.
  * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
  * Have a generic sorter for task resources instead of having one for each kind of resource.
+ * Handle the case where the powerclass is not configured on a node. As of now, the powerclass is assumed to be configured
+ for all the nodes.
 
 **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is
 launched for logging to work and PCP collector agents installed
@@ -28,9 +29,9 @@ How to run (Use the --help option to get information about other command-line op
 
 `./electron -workload `
 
-To run electron with ignoreWatts, run the following command,
+To run electron with Watts as Resource, run the following command,
 
-`./electron -workload -ignoreWatts`
+`./electron -workload -wattsAsAResource`
 
 Workload schema:
 
@@ -41,30 +42,31 @@ Workload schema:
       "name": "minife",
       "cpu": 3.0,
       "ram": 4096,
-      "watts": 50,
-      "image": "gouravr/minife:v5",
-      "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100",
-      "inst": 9,
-      "class_to_watts" : {
-         "A": 30.2475289996,
-         "B": 35.6491229228,
-         "C": 24.0476734352
-      }
-
+      "watts": 63.141,
+      "class_to_watts": {
+         "A": 93.062,
+         "B": 65.552,
+         "C": 57.897,
+         "D": 60.729
+      },
+      "image": "rdelvalle/minife:electron1",
+      "cmd": "cd src && mpirun -np 3 miniFE.x -nx 100 -ny 100 -nz 100",
+      "inst": 10
    },
    {
       "name": "dgemm",
       "cpu": 3.0,
-      "ram": 4096,
-      "watts": 50,
-      "image": "gouravr/dgemm:v2",
+      "ram": 32,
+      "watts": 85.903,
+      "class_to_watts": {
+         "A": 114.789,
+         "B": 89.133,
+         "C": 82.672,
+         "D": 81.944
+      },
+      "image": "rdelvalle/dgemm:electron1",
       "cmd": "/./mt-dgemm 1024",
-      "inst": 9,
-      "class_to_watts" : {
-         "A": 35.2475289996,
-         "B": 25.6491229228,
-         "C": 29.0476734352
-      }
+      "inst": 10
    }
 ]
 ```
diff --git a/constants/constants.go b/constants/constants.go
index 1cc97cf..313b620 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -7,34 +7,14 @@ TODO: Clean this up and use Mesos Attributes instead.
*/ package constants -var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton.edu", - "stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu", - "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", - "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} +var Hosts = make(map[string]struct{}) /* Classification of the nodes in the cluster based on their Thermal Design Power (TDP). The power classes are labelled in the decreasing order of the corresponding TDP, with class A nodes having the highest TDP and class C nodes having the lowest TDP. */ -var PowerClasses = map[string]map[string]bool{ - "A": map[string]bool{ - "stratos-005.cs.binghamton.edu": true, - "stratos-006.cs.binghamton.edu": true, - }, - "B": map[string]bool{ - "stratos-007.cs.binghamton.edu": true, - }, - "C": map[string]bool{ - "stratos-008.cs.binghamton.edu": true, - }, - "D": map[string]bool { - "stratos-001.cs.binghamton.edu": true, - "stratos-002.cs.binghamton.edu": true, - "stratos-003.cs.binghamton.edu": true, - "stratos-004.cs.binghamton.edu": true, - }, -} +var PowerClasses = make(map[string]map[string]struct{}) /* Margin with respect to the required power for a job. diff --git a/def/task.go b/def/task.go index 1b4af97..973021f 100644 --- a/def/task.go +++ b/def/task.go @@ -43,7 +43,7 @@ func TasksFromJSON(uri string) ([]Task, error) { func (tsk *Task) UpdateHost(newHost string) bool { // Validation isCorrectHost := false - for _, existingHost := range constants.Hosts { + for existingHost, _ := range constants.Hosts { if newHost == existingHost { isCorrectHost = true } diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index f708bef..4a2cf56 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -3,6 +3,7 @@ package pcp import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities" "bufio" "container/ring" "log" @@ -14,7 +15,6 @@ import ( "strings" "syscall" "time" - "bitbucket.org/sunybingcloud/electron/utilities" ) func round(num float64) int { @@ -202,7 +202,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point. 
} } - if (!canCapAlreadyCappedVictim) { + if !canCapAlreadyCappedVictim { log.Println("No Victim left to cap.") } } diff --git a/pcp/pcp.go b/pcp/pcp.go index edc60a9..1498317 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -37,15 +37,6 @@ func Start(quit chan struct{}, logging *bool, prefix string) { // Write to logfile logFile.WriteString(scanner.Text() + "\n") - /* - headers := strings.Split(scanner.Text(), ",") - - for _, hostMetric := range headers { - split := strings.Split(hostMetric, ":") - fmt.Printf("Host %s: Metric: %s\n", split[0], split[1]) - } - */ - // Throw away first set of results scanner.Scan() @@ -57,15 +48,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) { logFile.WriteString(scanner.Text() + "\n") } - /* - fmt.Printf("Second: %d\n", seconds) - for i, val := range strings.Split(scanner.Text(), ",") { - fmt.Printf("host metric: %s val: %s\n", headers[i], val) - }*/ - seconds++ - - // fmt.Println("--------------------------------") } }(logging) diff --git a/powerCapping/proactiveclusterwidecappers.go b/powerCapping/proactiveclusterwidecappers.go index 3f10d6a..9088b29 100644 --- a/powerCapping/proactiveclusterwidecappers.go +++ b/powerCapping/proactiveclusterwidecappers.go @@ -110,7 +110,7 @@ func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64, wattsUsages := make(map[string][]float64) hostOfFinishedTask := "" indexOfFinishedTask := -1 - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { wattsUsages[host] = []float64{0.0} } for host, tasks := range taskMonitor { diff --git a/scheduler.go b/scheduler.go index b21b567..2128309 100644 --- a/scheduler.go +++ b/scheduler.go @@ -43,7 +43,7 @@ func main() { } if *hiThreshold < *loThreshold { - fmt.Println("High threshold is of a lower value than low threhold.") + fmt.Println("High threshold is of a lower value than low threshold.") os.Exit(1) } @@ -74,12 +74,12 @@ func main() { return } - //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) + go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) //go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) - go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) + //go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing - // Attempt to handle signint to not leave pmdumptext running + // Attempt to handle SIGINT to not leave pmdumptext running // Catch interrupt go func() { c := make(chan os.Signal, 1) @@ -120,4 +120,4 @@ func main() { log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error()) } log.Println("Exiting...") -} +} \ No newline at end of file diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 6cc67bb..8d757fb 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -127,6 +127,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + offerUtils.UpdateEnvironment(offer) offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/binpackedpistoncapping.go 
b/schedulers/binpackedpistoncapping.go index 69764a9..a7ead66 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -210,6 +210,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off // retrieving the total power for each host in the offers for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) if _, ok := s.totalPower[*offer.Hostname]; !ok { _, _, offerWatts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offerWatts diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index 9ff1c80..f0c69fa 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -120,6 +120,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index e1b5567..3d8251e 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -18,12 +18,12 @@ import ( ) /* -Tasks are categorized into small and large tasks based on the watts requirement. -All the small tasks are packed into offers from agents belonging to power class C and power class D, using BinPacking. -All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit. +Tasks are categorized into small and large tasks based on watts requirements. +All the large tasks are packed into offers from agents belonging to power classes A and B, using Bin-Packing. +All the small tasks are spread among offers from agents belonging to power class C and D, using First-Fit. -BinPacking has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence, - co-scheduling them has a great impact on the total power utilization. +Bin-Packing has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence, +co-scheduling them has a great impact on the total power utilization. */ func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, @@ -159,7 +159,7 @@ func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task de return taskToSchedule } -// Using BinPacking to pack small tasks into this offer. +// Using BinPacking to pack large tasks into the given offers. func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { for _, offer := range offers { select { @@ -221,7 +221,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) } } -// Using first fit to spread large tasks into these offers. +// Using First-Fit to spread small tasks among the given offers. 
func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { for _, offer := range offers { select { @@ -282,6 +282,7 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes offersLightPowerClasses := []*mesos.Offer{} for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") @@ -292,13 +293,19 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes default: } - if constants.PowerClasses["A"][*offer.Hostname] || - constants.PowerClasses["B"][*offer.Hostname] { + if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok { offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } else if constants.PowerClasses["C"][*offer.Hostname] || - constants.PowerClasses["D"][*offer.Hostname] { + } + if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok { + offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) + } + if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok { offersLightPowerClasses = append(offersLightPowerClasses, offer) } + if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok { + offersLightPowerClasses = append(offersLightPowerClasses, offer) + } + } log.Println("Packing Large tasks into ClassAB offers:") diff --git a/schedulers/bpswMaxMin.go b/schedulers/bpswMaxMin.go index 656048f..60a80ee 100644 --- a/schedulers/bpswMaxMin.go +++ b/schedulers/bpswMaxMin.go @@ -164,6 +164,7 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers [] log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index 28964fd..53a7200 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -261,6 +261,7 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index 8f05596..a0ac947 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() { // updating cap value bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue if bpMaxMinProacCCCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding cap value to nearest int if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil { log.Println(err) @@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() { bpMaxMinProacCCMutex.Lock() // If stopped performing cluster-wide capping, then we need to recap. 
if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding the recap value to the nearest int if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil { log.Println(err) @@ -300,6 +300,7 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 864a208..db59e24 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -119,6 +119,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index 01c163e..51a466e 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -164,7 +164,7 @@ func (s *FirstFitProacCC) startCapping() { // Need to cap the cluster to the fcfsCurrentCapValue. fcfsMutex.Lock() if fcfsCurrentCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding curreCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil { log.Println(err) @@ -188,7 +188,7 @@ func (s *FirstFitProacCC) startRecapping() { fcfsMutex.Lock() // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. if s.isRecapping && fcfsRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding curreCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil { log.Println(err) @@ -233,6 +233,7 @@ func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index 48a9ec1..d3fdb5f 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -126,6 +126,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + offerUtils.UpdateEnvironment(offer) offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index 1792cfd..2610084 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -177,7 +177,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() { // Need to cap the cluster to the rankedCurrentCapValue. 
rankedMutex.Lock() if rankedCurrentCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding currentCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil { log.Println(err) @@ -201,7 +201,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() { rankedMutex.Lock() // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. if s.isRecapping && rankedRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding currentCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil { log.Println(err) @@ -246,6 +246,7 @@ func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index d473a09..b8b1eef 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -128,6 +128,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + offerUtils.UpdateEnvironment(offer) offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index dfb7bed..1f71411 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -122,6 +122,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index e2225ff..d2b13b6 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -112,6 +112,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/helpers.go b/schedulers/helpers.go index e6ba7fb..1e39c20 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -18,7 +18,7 @@ func coLocated(tasks map[string]bool) { // Get the powerClass of the given hostname func hostToPowerClass(hostName string) string { for powerClass, hosts := range constants.PowerClasses { - if ok := hosts[hostName]; ok { + if _, ok := hosts[hostName]; ok { return powerClass } } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 63c11e0..1b62336 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -19,8 +19,8 @@ import ( /* Tasks are categorized into small and large tasks based on the watts requirement. 
-All the large tasks are packed into offers from agents belonging to power class A and power class B, using BinPacking. -All the small tasks are spread among the offers from agents belonging to power class C and power class D, using FirstFit. +All the small tasks are packed into offers from agents belonging to power class C and power class D, using BinPacking. +All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit. This was done to give a little more room for the large tasks (power intensive) for execution and reduce the possibility of starvation of power intensive tasks. @@ -281,6 +281,7 @@ func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. offersLightPowerClasses := []*mesos.Offer{} for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") @@ -291,11 +292,16 @@ func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. default: } - if constants.PowerClasses["A"][*offer.Hostname] || - constants.PowerClasses["B"][*offer.Hostname] { + if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok { offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } else if constants.PowerClasses["C"][*offer.Hostname] || - constants.PowerClasses["D"][*offer.Hostname] { + } + if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok { + offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) + } + if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok { + offersLightPowerClasses = append(offersLightPowerClasses, offer) + } + if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok { offersLightPowerClasses = append(offersLightPowerClasses, offer) } } diff --git a/utilities/offerUtils/offerUtils.go b/utilities/offerUtils/offerUtils.go index 6f5dc81..f90480e 100644 --- a/utilities/offerUtils/offerUtils.go +++ b/utilities/offerUtils/offerUtils.go @@ -1,7 +1,10 @@ package offerUtils import ( + "bitbucket.org/sunybingcloud/electron-archive/utilities/offerUtils" + "bitbucket.org/sunybingcloud/electron/constants" mesos "github.com/mesos/mesos-go/mesosproto" + "log" "strings" ) @@ -60,3 +63,26 @@ func HostMismatch(offerHost string, taskHost string) bool { } return false } + +// If the host in the offer is a new host, add the host to the set of Hosts and +// register the powerclass of this host. +func UpdateEnvironment(offer *mesos.Offer) { + var host = offer.GetHostname() + // If this host is not present in the set of hosts. + if _, ok := constants.Hosts[host]; !ok { + log.Printf("New host detected. Adding host [%s]", host) + // Add this host. + constants.Hosts[host] = struct{}{} + // Get the power class of this host. + class := offerUtils.PowerClass(offer) + log.Printf("Registering the power class... Host [%s] --> PowerClass [%s]", host, class) + // If new power class, register the power class. + if _, ok := constants.PowerClasses[class]; !ok { + constants.PowerClasses[class] = make(map[string]struct{}) + } + // If the host of this class is not yet present in PowerClasses[class], add it. + if _, ok := constants.PowerClasses[class][host]; !ok { + constants.PowerClasses[class][host] = struct{}{} + } + } +}
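The change above replaces the hardcoded `Hosts` list and `PowerClasses` table with empty set-style maps (`map[string]struct{}`) that `offerUtils.UpdateEnvironment` fills in lazily as offers arrive, and that `hostToPowerClass` walks for reverse lookups. The sketch below is a minimal, self-contained illustration of that registration-and-lookup pattern only: `registerHost` and `powerClassOf` are hypothetical stand-ins for `UpdateEnvironment` and `hostToPowerClass`, the host names and classes are invented, and in the real schedulers the class comes from the offer via `offerUtils.PowerClass(offer)` rather than from a caller-supplied string.

```go
package main

import (
	"fmt"
	"log"
)

// Set-style maps mirroring constants.Hosts and constants.PowerClasses after this
// change: the empty struct{} value carries no data, so the maps serve purely as
// membership sets.
var hosts = make(map[string]struct{})
var powerClasses = make(map[string]map[string]struct{})

// registerHost mimics what offerUtils.UpdateEnvironment does with an offer: on the
// first sighting of a host, record it and file it under its power class.
func registerHost(host, class string) {
	if _, ok := hosts[host]; ok {
		return // already registered
	}
	log.Printf("New host detected. Adding host [%s]", host)
	hosts[host] = struct{}{}
	if _, ok := powerClasses[class]; !ok {
		powerClasses[class] = make(map[string]struct{})
	}
	powerClasses[class][host] = struct{}{}
}

// powerClassOf mimics schedulers.hostToPowerClass: a reverse lookup from a host to
// the class it was registered under. An empty result means the host has not been
// seen (or its power class is not configured) yet.
func powerClassOf(host string) string {
	for class, members := range powerClasses {
		if _, ok := members[host]; ok {
			return class
		}
	}
	return ""
}

func main() {
	// Hypothetical hosts and classes, for illustration only.
	registerHost("node-001.example.edu", "A")
	registerHost("node-002.example.edu", "C")

	fmt.Println(powerClassOf("node-001.example.edu")) // "A"
	fmt.Println(powerClassOf("node-003.example.edu")) // "" -> never offered, class unknown
}
```

The same membership test, `if _, ok := constants.PowerClasses[class][host]; ok`, is what the BottomHeavy and TopHeavy schedulers now use to split incoming offers into heavy (class A/B) and light (class C/D) groups, and an empty reverse-lookup result corresponds to the "powerclass not configured on a node" case flagged in the To Do list.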