synchronized operations on tasksRunning and hence prevented the previously occuring race condition.
This commit is contained in:
parent
8767cc61da
commit
24d2b89aa0
1 changed files with 4 additions and 0 deletions
|
@ -339,7 +339,9 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
|
||||||
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
|
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
|
||||||
|
|
||||||
if *status.State == mesos.TaskState_TASK_RUNNING {
|
if *status.State == mesos.TaskState_TASK_RUNNING {
|
||||||
|
fcfsMutex.Lock()
|
||||||
s.tasksRunning++
|
s.tasksRunning++
|
||||||
|
fcfsMutex.Unlock()
|
||||||
} else if IsTerminal(status.State) {
|
} else if IsTerminal(status.State) {
|
||||||
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
|
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
|
||||||
// Need to remove the task from the window of tasks.
|
// Need to remove the task from the window of tasks.
|
||||||
|
@ -365,7 +367,9 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fcfsMutex.Lock()
|
||||||
s.tasksRunning--
|
s.tasksRunning--
|
||||||
|
fcfsMutex.Unlock()
|
||||||
if s.tasksRunning == 0 {
|
if s.tasksRunning == 0 {
|
||||||
select {
|
select {
|
||||||
case <-s.Shutdown:
|
case <-s.Shutdown:
|
||||||
|
|
Reference in a new issue