Merged in updateRunningOnTaskRunningStatus (pull request #9)

critical: baseScheduler#Running wasn't getting update properly.

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
This commit is contained in:
Pradyumna Kaushik 2018-03-01 21:57:50 +00:00
parent b877d31cb8
commit 0f305ab796
2 changed files with 11 additions and 12 deletions

View file

@ -30,7 +30,7 @@ func createLogDir(prefix string, startTime time.Time) string {
logDirName += "-"
logDirName += strconv.Itoa(startTime.Second())
if _, err := os.Stat(logDirName); os.IsNotExist(err) {
os.Mkdir(logDirName, 0700)
os.Mkdir(logDirName, 0755)
} else {
log.Println("Unable to create log directory: ", err)
logDirName = ""

View file

@ -105,15 +105,6 @@ func (s *BaseScheduler) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
s.TasksRunningMutex.Lock()
if _, ok := s.Running[offer.GetSlaveId().GoString()]; !ok {
s.Running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.Running[offer.GetSlaveId().GoString()][taskName] = true
s.TasksRunningMutex.Unlock()
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
@ -192,7 +183,7 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
utilities.RecordTotalResourceAvailability(offers)
for _, offer := range offers {
if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok {
s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString()
s.HostNameToSlaveID[offer.GetHostname()] = *offer.SlaveId.Value
}
}
// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
@ -215,13 +206,21 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
s.LogTaskStatusUpdate(status)
if *status.State == mesos.TaskState_TASK_RUNNING {
// If this is our first time running into this Agent
s.TasksRunningMutex.Lock()
if _, ok := s.Running[*status.SlaveId.Value]; !ok {
s.Running[*status.SlaveId.Value] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.Running[*status.SlaveId.Value][*status.TaskId.Value] = true
s.TasksRunningMutex.Unlock()
s.tasksRunning++
} else if IsTerminal(status.State) {
// Update resource availability.
utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE",
*status.TaskId, *status.SlaveId)
s.TasksRunningMutex.Lock()
delete(s.Running[status.GetSlaveId().GoString()], *status.TaskId.Value)
delete(s.Running[*status.SlaveId.Value], *status.TaskId.Value)
s.TasksRunningMutex.Unlock()
s.tasksRunning--
if s.tasksRunning == 0 {