diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index 59c3ac5..c352cb1 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -82,7 +82,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti } // mutex -var mutex sync.Mutex +var fcfsMutex sync.Mutex func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) @@ -154,58 +154,58 @@ func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process. s.ticker.Stop() s.recapTicker.Stop() - mutex.Lock() + fcfsMutex.Lock() s.isCapping = false - mutex.Unlock() + fcfsMutex.Unlock() log.Println("Framework disconnected with master") } // go routine to cap the entire cluster in regular intervals of time. -var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. func (s *ProactiveClusterwideCapFCFS) startCapping() { go func() { for { select { case <-s.ticker.C: - // Need to cap the cluster to the currentCapValue. - mutex.Lock() - if currentCapValue > 0.0 { + // Need to cap the cluster to the fcfsCurrentCapValue. + fcfsMutex.Lock() + if fcfsCurrentCapValue > 0.0 { for _, host := range constants.Hosts { // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil { log.Println(err) } } - log.Printf("Capped the cluster to %d", int(math.Floor(currentCapValue+0.5))) + log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5))) } - mutex.Unlock() + fcfsMutex.Unlock() } } }() } // go routine to cap the entire cluster in regular intervals of time. -var recapValue = 0.0 // The cluster wide cap value when recapping. +var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping. func (s *ProactiveClusterwideCapFCFS) startRecapping() { go func() { for { select { case <-s.recapTicker.C: - mutex.Lock() + fcfsMutex.Lock() // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. - //if !s.isCapping && s.isRecapping && recapValue > 0.0 { - if s.isRecapping && recapValue > 0.0 { + //if !s.isCapping && s.isRecapping && fcfsRecapValue > 0.0 { + if s.isRecapping && fcfsRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil { log.Println(err) } } - log.Printf("Recapped the cluster to %d", int(math.Floor(recapValue+0.5))) + log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5))) } // setting recapping to false s.isRecapping = false - mutex.Unlock() + fcfsMutex.Unlock() } } }() @@ -216,10 +216,10 @@ func (s *ProactiveClusterwideCapFCFS) stopCapping() { if s.isCapping { log.Println("Stopping the cluster wide capping.") s.ticker.Stop() - mutex.Lock() + fcfsMutex.Lock() s.isCapping = false s.isRecapping = true - mutex.Unlock() + fcfsMutex.Unlock() } } @@ -229,9 +229,9 @@ func (s *ProactiveClusterwideCapFCFS) stopRecapping() { if !s.isCapping && s.isRecapping { log.Println("Stopping the cluster wide re-capping.") s.recapTicker.Stop() - mutex.Lock() + fcfsMutex.Lock() s.isRecapping = false - mutex.Unlock() + fcfsMutex.Unlock() } } @@ -270,7 +270,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive For each task in s.tasks, 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements). 2. If the tasks fits the offer, then I need to detemrine the cluster wide cap. - 3. currentCapValue is updated with the determined cluster wide cap. + 3. fcfsCurrentCapValue is updated with the determined cluster wide cap. Cluster wide capping is currently performed at regular intervals of time. TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap. @@ -288,18 +288,18 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive if s.takeOffer(offer, task) { // Capping the cluster if haven't yet started, if !s.isCapping { - mutex.Lock() + fcfsMutex.Lock() s.isCapping = true - mutex.Unlock() + fcfsMutex.Unlock() s.startCapping() } taken = true tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) if err == nil { - mutex.Lock() - currentCapValue = tempCap - mutex.Unlock() + fcfsMutex.Lock() + fcfsCurrentCapValue = tempCap + fcfsMutex.Unlock() } else { log.Printf("Failed to determine new cluster wide cap: ") log.Println(err) @@ -350,21 +350,22 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, s.capper.taskFinished(*status.TaskId.Value) // Determining the new cluster wide cap. tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + //tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) if err == nil { // if new determined cap value is different from the current recap value then we need to recap. - if int(math.Floor(tempCap+0.5)) != int(math.Floor(recapValue+0.5)) { - recapValue = tempCap - mutex.Lock() + if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) { + fcfsRecapValue = tempCap + fcfsMutex.Lock() s.isRecapping = true - mutex.Unlock() - log.Printf("Determined re-cap value: %f\n", recapValue) + fcfsMutex.Unlock() + log.Printf("Determined re-cap value: %f\n", fcfsRecapValue) } else { - mutex.Lock() + fcfsMutex.Lock() s.isRecapping = false - mutex.Unlock() + fcfsMutex.Unlock() } } else { - // Not updating currentCapValue + // Not updating fcfsCurrentCapValue log.Println(err) } @@ -372,7 +373,7 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, if s.tasksRunning == 0 { select { case <-s.Shutdown: - // Need to stop the capping process. + // Need to stop the recapping process. s.stopRecapping() close(s.Done) default: