Synchronized operations on tasksRunning and hence prevented a previously occurring race condition.
This commit is contained in:
parent 24d2b89aa0
commit 7bd562cacb
1 changed file with 20 additions and 11 deletions
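
For context before the diff: s.tasksRunning++ and s.tasksRunning-- are read-modify-write operations, so two StatusUpdate callbacks running concurrently can interleave and lose an update. Below is a minimal, self-contained sketch of that failure mode and of the fix the commit applies; the counter and mutex here are stand-ins, not the scheduler's actual code.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex // stands in for rankedMutex in the commit
	tasksRunning := 0

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Unlocked, tasksRunning++ is a load-add-store sequence,
			// so concurrent updates can overwrite each other.
			mu.Lock()
			tasksRunning++
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(tasksRunning) // always 1000 with the lock; often less without it
}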
@@ -222,21 +222,26 @@ func (s *ProactiveClusterwideCapRanked) startRecapping() {
 
 // Stop cluster wide capping
 func (s *ProactiveClusterwideCapRanked) stopCapping() {
-	log.Println("Stopping the cluster wide capping.")
-	s.ticker.Stop()
-	rankedMutex.Lock()
-	s.isCapping = false
-	s.isRecapping = true
-	rankedMutex.Unlock()
+	if s.isCapping {
+		log.Println("Stopping the cluster wide capping.")
+		s.ticker.Stop()
+		fcfsMutex.Lock()
+		s.isCapping = false
+		s.isRecapping = true
+		fcfsMutex.Unlock()
+	}
 }
 
 // Stop cluster wide Recapping
 func (s *ProactiveClusterwideCapRanked) stopRecapping() {
-	log.Println("Stopping the cluster wide re-capping.")
-	s.recapTicker.Stop()
-	rankedMutex.Lock()
-	s.isRecapping = false
-	rankedMutex.Unlock()
+	// If not capping, then definitely recapping.
+	if !s.isCapping && s.isRecapping {
+		log.Println("Stopping the cluster wide re-capping.")
+		s.recapTicker.Stop()
+		fcfsMutex.Lock()
+		s.isRecapping = false
+		fcfsMutex.Unlock()
+	}
 }
 
 func (s *ProactiveClusterwideCapRanked) ResouceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
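
Two things are worth noting about the hunk above. First, it takes fcfsMutex here while the StatusUpdate hunk below takes rankedMutex, so the two code paths are serialized by different locks. Second, s.isCapping and s.isRecapping are read in the if conditions before the mutex is acquired, which leaves a small check-then-act window. A minimal sketch, with a hypothetical capper type standing in for the scheduler's flag-and-ticker pair, of doing the whole sequence under one lock:

package main

import (
	"sync"
	"time"
)

// capper is a hypothetical stand-in for the scheduler's state; the
// field names mirror the diff, but this is not code from the commit.
type capper struct {
	mu        sync.Mutex
	isCapping bool
	ticker    *time.Ticker
}

// stopCapping checks the flag and mutates state under the same lock,
// so two concurrent callers cannot both observe isCapping == true.
func (c *capper) stopCapping() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isCapping {
		c.ticker.Stop()
		c.isCapping = false
	}
}

func main() {
	c := &capper{isCapping: true, ticker: time.NewTicker(time.Second)}
	c.stopCapping()
	c.stopCapping() // second call is a safe no-op
}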
@@ -355,10 +360,14 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
 	log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
 
 	if *status.State == mesos.TaskState_TASK_RUNNING {
+		rankedMutex.Lock()
 		s.tasksRunning++
+		rankedMutex.Unlock()
 	} else if IsTerminal(status.State) {
 		delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
+		rankedMutex.Lock()
 		s.tasksRunning--
+		rankedMutex.Unlock()
 		if s.tasksRunning == 0 {
 			select {
 			case <-s.Shutdown:
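
A change like this is straightforward to verify with Go's built-in race detector: build or test with the -race flag while StatusUpdate is exercised concurrently, and any remaining unsynchronized access to tasksRunning is reported as a DATA RACE, while the locked version is silent. Assuming a standard Go toolchain and module layout:

go test -race ./...   # flags any remaining unsynchronized access to tasksRunning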