From aed4fd1073d74073737ee5553419b1696bf96065 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Thu, 23 Mar 2017 22:16:05 -0400 Subject: [PATCH] Retrofitted all schedulers to now pick the hostname from the offer and add it to constants.Hosts --- schedulers/binPackSortedWattsSortedOffers.go | 5 +++++ schedulers/binpackedpistoncapping.go | 4 ++++ schedulers/binpacksortedwatts.go | 5 +++++ schedulers/bottomHeavy.go | 4 ++++ schedulers/bpswMaxMin.go | 5 +++++ schedulers/bpswMaxMinPistonCapping.go | 4 ++++ schedulers/bpswMaxMinProacCC.go | 8 ++++++-- schedulers/firstfit.go | 5 +++++ schedulers/firstfitProacCC.go | 8 ++++++-- schedulers/firstfitSortedOffers.go | 5 +++++ schedulers/firstfitSortedWattsProacCC.go | 8 ++++++-- schedulers/firstfitSortedWattsSortedOffers.go | 5 +++++ schedulers/firstfitsortedwatts.go | 5 +++++ schedulers/firstfitwattsonly.go | 5 +++++ 14 files changed, 70 insertions(+), 6 deletions(-) diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 6cc67bb..93e564a 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -127,6 +128,10 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index 69764a9..3022d6c 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -210,6 +210,10 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off // retrieving the total power for each host in the offers for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } if _, ok := s.totalPower[*offer.Hostname]; !ok { _, _, offerWatts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offerWatts diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index 9ff1c80..45d2488 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -120,6 +121,10 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index e1b5567..eef8bd6 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -282,6 +282,10 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes offersLightPowerClasses := []*mesos.Offer{} for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bpswMaxMin.go b/schedulers/bpswMaxMin.go index 656048f..940439a 100644 --- a/schedulers/bpswMaxMin.go +++ b/schedulers/bpswMaxMin.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -164,6 +165,10 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers [] log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index 28964fd..4031e88 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -261,6 +261,10 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index 8f05596..d80f05f 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() { // updating cap value bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue if bpMaxMinProacCCCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding cap value to nearest int if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil { log.Println(err) @@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() { bpMaxMinProacCCMutex.Lock() // If stopped performing cluster-wide capping, then we need to recap. if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding the recap value to the nearest int if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil { log.Println(err) @@ -300,6 +300,10 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 864a208..e07839c 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -119,6 +120,10 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index 01c163e..51011e5 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -164,7 +164,7 @@ func (s *FirstFitProacCC) startCapping() { // Need to cap the cluster to the fcfsCurrentCapValue. fcfsMutex.Lock() if fcfsCurrentCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding curreCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil { log.Println(err) @@ -188,7 +188,7 @@ func (s *FirstFitProacCC) startRecapping() { fcfsMutex.Lock() // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. if s.isRecapping && fcfsRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding curreCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil { log.Println(err) @@ -233,6 +233,10 @@ func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index 48a9ec1..a9ab1d8 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -126,6 +127,10 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index 1792cfd..4173ae6 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -177,7 +177,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() { // Need to cap the cluster to the rankedCurrentCapValue. rankedMutex.Lock() if rankedCurrentCapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding currentCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil { log.Println(err) @@ -201,7 +201,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() { rankedMutex.Lock() // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. if s.isRecapping && rankedRecapValue > 0.0 { - for _, host := range constants.Hosts { + for host, _ := range constants.Hosts { // Rounding currentCapValue to the nearest int. if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil { log.Println(err) @@ -246,6 +246,10 @@ func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver // retrieving the available power for all the hosts in the offers. for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index d473a09..5745ec6 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -128,6 +129,10 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD log.Println("Sorted Offers:") for i := 0; i < len(offers); i++ { offer := offers[i] + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } offerCPU, _, _ := offerUtils.OfferAgg(offer) log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) } diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index dfb7bed..f19bec0 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -122,6 +123,10 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index e2225ff..f7833b1 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -1,6 +1,7 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" @@ -112,6 +113,10 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { + if _, ok := constants.Hosts[offer.GetHostname()]; !ok { + log.Printf("New host found. Adding host [%s]", offer.GetHostname()) + constants.Hosts[offer.GetHostname()] = struct{}{} + } select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")