diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go
index f6ea425..d2565f3 100644
--- a/schedulers/proactiveclusterwidecappingranked.go
+++ b/schedulers/proactiveclusterwidecappingranked.go
@@ -222,21 +222,26 @@ func (s *ProactiveClusterwideCapRanked) startRecapping() {
 
 // Stop cluster wide capping
 func (s *ProactiveClusterwideCapRanked) stopCapping() {
-	log.Println("Stopping the cluster wide capping.")
-	s.ticker.Stop()
-	rankedMutex.Lock()
-	s.isCapping = false
-	s.isRecapping = true
-	rankedMutex.Unlock()
+	if s.isCapping {
+		log.Println("Stopping the cluster wide capping.")
+		s.ticker.Stop()
+		fcfsMutex.Lock()
+		s.isCapping = false
+		s.isRecapping = true
+		fcfsMutex.Unlock()
+	}
 }
 
 // Stop cluster wide Recapping
 func (s *ProactiveClusterwideCapRanked) stopRecapping() {
-	log.Println("Stopping the cluster wide re-capping.")
-	s.recapTicker.Stop()
-	rankedMutex.Lock()
-	s.isRecapping = false
-	rankedMutex.Unlock()
+	// If not capping, then definitely recapping.
+	if !s.isCapping && s.isRecapping {
+		log.Println("Stopping the cluster wide re-capping.")
+		s.recapTicker.Stop()
+		fcfsMutex.Lock()
+		s.isRecapping = false
+		fcfsMutex.Unlock()
+	}
 }
 
 func (s *ProactiveClusterwideCapRanked) ResouceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
@@ -355,10 +360,14 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive
 	log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
 
 	if *status.State == mesos.TaskState_TASK_RUNNING {
+		rankedMutex.Lock()
 		s.tasksRunning++
+		rankedMutex.Unlock()
 	} else if IsTerminal(status.State) {
 		delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
+		rankedMutex.Lock()
 		s.tasksRunning--
+		rankedMutex.Unlock()
 		if s.tasksRunning == 0 {
 			select {
 			case <-s.Shutdown:
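
Reviewer note: after this change, stopCapping and stopRecapping take fcfsMutex while the StatusUpdate hunk takes rankedMutex, so the capping flags and the task counter end up guarded by two different locks, and the s.tasksRunning == 0 check is still read outside any lock. What follows is a minimal, self-contained sketch of the single-mutex pattern this diff moves toward, not the project's actual code: the capState type and the StopCapping and TaskCountChanged names are hypothetical, introduced only to illustrate putting the guard check, the flag updates, and the counter behind one mutex.

// mutexsketch.go: hypothetical illustration of guarding all capping state
// with a single mutex, under the assumptions stated above.
package main

import (
	"fmt"
	"sync"
	"time"
)

// capState bundles the shared scheduler state behind one mutex, so the
// "am I capping?" check and the flag flips cannot race with each other.
type capState struct {
	mu           sync.Mutex
	isCapping    bool
	isRecapping  bool
	tasksRunning int
	ticker       *time.Ticker
}

// StopCapping mirrors the guard the diff adds to stopCapping, but performs
// the check and the updates under the same lock.
func (c *capState) StopCapping() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isCapping {
		fmt.Println("Stopping the cluster wide capping.")
		c.ticker.Stop()
		c.isCapping = false
		c.isRecapping = true
	}
}

// TaskCountChanged adjusts the running-task counter under the same lock and
// returns the new value, so callers can test for zero without a separate
// unsynchronized read of tasksRunning.
func (c *capState) TaskCountChanged(delta int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tasksRunning += delta
	return c.tasksRunning
}

func main() {
	c := &capState{isCapping: true, ticker: time.NewTicker(time.Second)}
	c.TaskCountChanged(+1) // a task reached TASK_RUNNING
	c.StopCapping()
	if c.TaskCountChanged(-1) == 0 { // a task reached a terminal state
		fmt.Println("no tasks running")
	}
}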