From 7bd562cacb88f19f0151d50ee04f19c7a9393f44 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 29 Nov 2016 23:00:03 -0500 Subject: [PATCH] synchronized operations on tasksRunning and hence, prevented previously occuring race condition. --- .../proactiveclusterwidecappingranked.go | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index f6ea425..d2565f3 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -222,21 +222,26 @@ func (s *ProactiveClusterwideCapRanked) startRecapping() { // Stop cluster wide capping func (s *ProactiveClusterwideCapRanked) stopCapping() { - log.Println("Stopping the cluster wide capping.") - s.ticker.Stop() - rankedMutex.Lock() - s.isCapping = false - s.isRecapping = true - rankedMutex.Unlock() + if s.isCapping { + log.Println("Stopping the cluster wide capping.") + s.ticker.Stop() + fcfsMutex.Lock() + s.isCapping = false + s.isRecapping = true + fcfsMutex.Unlock() + } } // Stop cluster wide Recapping func (s *ProactiveClusterwideCapRanked) stopRecapping() { - log.Println("Stopping the cluster wide re-capping.") - s.recapTicker.Stop() - rankedMutex.Lock() - s.isRecapping = false - rankedMutex.Unlock() + // If not capping, then definitely recapping. + if !s.isCapping && s.isRecapping { + log.Println("Stopping the cluster wide re-capping.") + s.recapTicker.Stop() + fcfsMutex.Lock() + s.isRecapping = false + fcfsMutex.Unlock() + } } func (s *ProactiveClusterwideCapRanked) ResouceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { @@ -355,10 +360,14 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) if *status.State == mesos.TaskState_TASK_RUNNING { + rankedMutex.Lock() s.tasksRunning++ + rankedMutex.Unlock() } else if IsTerminal(status.State) { delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + rankedMutex.Lock() s.tasksRunning-- + rankedMutex.Unlock() if s.tasksRunning == 0 { select { case <-s.Shutdown: