From a9f7ca5c917b30395781e7f8a7b31e97628ab4fd Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik
Date: Thu, 10 Nov 2016 21:18:05 -0500
Subject: [PATCH] Check whether cluster wide capping has started and, if not,
 start the go routine that performs cluster wide capping at regular
 intervals.

---
 schedulers/proactiveclusterwidecappingfcfs.go | 66 +++++++++++--------
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go
index ac77dee..ae116f8 100644
--- a/schedulers/proactiveclusterwidecappingfcfs.go
+++ b/schedulers/proactiveclusterwidecappingfcfs.go
@@ -4,17 +4,16 @@
 import (
     "bitbucket.org/sunybingcloud/electron/def"
     "bitbucket.org/sunybingcloud/electron/constants"
     "bitbucket.org/sunybingcloud/electron/rapl"
-    "errors"
     "fmt"
     "github.com/golang/protobuf/proto"
     mesos "github.com/mesos/mesos-go/mesosproto"
     "github.com/mesos/mesos-go/mesosutil"
     sched "github.com/mesos/mesos-go/scheduler"
     "log"
-    "sort"
     "strings"
     "sync"
     "time"
+    "math"
 )
 
 // electronScheduler implements the Scheduler interface.
@@ -53,11 +52,11 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
         Shutdown:  make(chan struct{}),
         Done:      make(chan struct{}),
         PCPLog:    make(chan struct{}),
-        running:   make(mapp[string]map[string]bool),
+        running:   make(map[string]map[string]bool),
         RecordPCP: false,
         capper:    getClusterwideCapperInstance(),
-        ticker:    time.NewTicker(constants.Clusterwide_cap_interval * time.Second),
-        isCapping: false
+        ticker:    time.NewTicker(10 * time.Second),
+        isCapping: false,
     }
     return s
 }
@@ -79,7 +78,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 
     // Setting the task ID to the task. This is done so that we can consider each task to be different,
     // even though they have the same parameters.
-    task.SetTaskID(proto.String(taskName))
+    task.SetTaskID(*proto.String(taskName))
     // Add task to the list of tasks running on the node.
     s.running[offer.GetSlaveId().GoString()][taskName] = true
 
@@ -114,7 +113,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 
 func (s *ProactiveClusterwideCapFCFS) Registered(
     _ sched.SchedulerDriver,
-    framewordID *mesos.FrameworkID,
+    frameworkID *mesos.FrameworkID,
     masterInfo *mesos.MasterInfo) {
     log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
 }
@@ -128,23 +127,27 @@ func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
 }
 
 // go routine to cap the entire cluster in regular intervals of time.
-func (s *ProactiveClusterwideCapFCFS) startCapping(currentCapValue float64, mutex sync.Mutex) {
+var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+func (s *ProactiveClusterwideCapFCFS) startCapping(mutex sync.Mutex) {
     go func() {
-        for tick := range s.ticker.C {
-            // Need to cap the cluster to the currentCapValue.
-            if currentCapValue > 0.0 {
-                mutex.Lock()
-                for _, host := range constants.Hosts {
-                    if err := rapl.Cap(host, int(math.Floor(currentCapValue + 0.5))); err != nil {
-                        fmt.Println(err)
-                    } else {
-                        fmt.Println("Successfully capped %s to %d\\%", host, currentCapValue)
+        for {
+            select {
+            case <-s.ticker.C:
+                // Need to cap the cluster to the currentCapValue.
+                if currentCapValue > 0.0 {
+                    mutex.Lock()
+                    for _, host := range constants.Hosts {
+                        if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil {
+                            fmt.Println(err)
+                        } else {
+                            fmt.Printf("Successfully capped %s to %f%%\n", host, currentCapValue)
+                        }
+                    }
+                    mutex.Unlock()
+                }
-                    }
-                }
-                mutex.Unlock()
-            }
             }
         }
-    }
+    }()
 }
 
 // TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?).
@@ -155,12 +158,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
     available_power := make(map[string]float64)
     for _, offer := range offers {
         _, _, offer_watts := OfferAgg(offer)
-        available_power[offer.Hostname] = offer_watts
+        available_power[*offer.Hostname] = offer_watts
     }
 
     for _, offer := range offers {
         select {
-        case <-s.Shutdown;
+        case <-s.Shutdown:
             log.Println("Done scheduling tasks: declining offerf on [", offer.GetHostname(), "]")
             driver.DeclineOffer(offer.Id, longFilter)
 
@@ -184,9 +187,14 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
         offer_cpu, offer_ram, _ := OfferAgg(offer)
 
         taken := false
-        currentCapValue := 0.0 // initial value to indicate that we haven't capped the cluster yet.
         var mutex sync.Mutex
 
+        // If cluster wide capping hasn't started yet, start it.
+        if !s.isCapping {
+            s.startCapping(mutex)
+            s.isCapping = true
+        }
+
         for _, task := range s.tasks {
             // Don't take offer if it doesn't match our task's host requirement.
             if !strings.HasPrefix(*offer.Hostname, task.Host) {
@@ -194,18 +202,20 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
             }
 
             // Does the task fit.
-            if (s.ignoreWatts || offer_cpu >= task.CPU ||| offer_ram >= task.RAM) {
+            if (s.ignoreWatts || offer_cpu >= task.CPU || offer_ram >= task.RAM) {
                 taken = true
                 mutex.Lock()
-                tempCap, err = s.capper.fcfsDetermineCap(available_power, task)
+                tempCap, err := s.capper.fcfsDetermineCap(available_power, &task)
                 if err == nil {
                     currentCapValue = tempCap
                 } else {
-                    fmt.Println("Failed to determine cluster wide cap: " + err.String())
+                    fmt.Printf("Failed to determine cluster wide cap: ")
+                    fmt.Println(err)
                 }
                 mutex.Unlock()
                 fmt.Printf("Starting on [%s]\n", offer.GetHostname())
-                driver.LaunchTasks([]*mesos.OfferID{offer.Id}, [s.newTask(offer, task)], defaultFilter)
+                to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
+                driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
             } else {
                 // Task doesn't fit the offer. Move onto the next offer.
             }
@@ -230,7 +240,7 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
     } else if IsTerminal(status.State) {
         delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
         // Need to remove the task from the window of tasks.
-        s.capper.taskFinished(status.TaskId.Value)
+        s.capper.taskFinished(*status.TaskId.Value)
        s.tasksRunning--
         if s.tasksRunning == 0 {
             select {
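
Not part of the patch: the sketch below is a minimal, self-contained illustration of the pattern the commit describes, namely a ticker-driven capping goroutine that is started at most once behind an isCapping-style guard, with a mutex protecting the shared cap value. The capper type and its capHost, setCap, and startCapping names are hypothetical stand-ins (capHost substitutes for rapl.Cap) and are not the project's API.

package main

import (
    "fmt"
    "math"
    "sync"
    "time"
)

// capper illustrates the commit's pattern: a ticker-driven goroutine,
// started at most once, that re-applies the current cluster wide cap
// to every host on each tick.
type capper struct {
    mutex           sync.Mutex
    currentCapValue float64 // 0.0 means no cap has been determined yet
    isCapping       bool
    ticker          *time.Ticker
    hosts           []string
    quit            chan struct{}
}

// capHost stands in for rapl.Cap; it only prints what would be applied.
func (c *capper) capHost(host string, percent int) error {
    fmt.Printf("capping %s to %d%%\n", host, percent)
    return nil
}

// startCapping launches the background capping loop exactly once,
// mirroring the !s.isCapping check added to ResourceOffers.
func (c *capper) startCapping() {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    if c.isCapping {
        return
    }
    c.isCapping = true
    go func() {
        for {
            select {
            case <-c.ticker.C:
                c.mutex.Lock()
                if c.currentCapValue > 0.0 {
                    // Round to the nearest whole percentage, as the patch
                    // does with int(math.Floor(currentCapValue + 0.5)).
                    capPct := int(math.Floor(c.currentCapValue + 0.5))
                    for _, host := range c.hosts {
                        if err := c.capHost(host, capPct); err != nil {
                            fmt.Println(err)
                        }
                    }
                }
                c.mutex.Unlock()
            case <-c.quit:
                return
            }
        }
    }()
}

// setCap plays the role of updating currentCapValue after a call
// like fcfsDetermineCap succeeds.
func (c *capper) setCap(v float64) {
    c.mutex.Lock()
    c.currentCapValue = v
    c.mutex.Unlock()
}

func main() {
    c := &capper{
        ticker: time.NewTicker(1 * time.Second),
        hosts:  []string{"host1", "host2"},
        quit:   make(chan struct{}),
    }
    c.startCapping()
    c.setCap(72.4)
    time.Sleep(2500 * time.Millisecond)
    close(c.quit)
}

One deliberate difference: the patch's startCapping(mutex sync.Mutex) receives the mutex by value, which copies the lock, so the sketch keeps a single mutex inside the struct so that the ticker goroutine and the updater synchronize on the same lock.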