From 0f305ab796df00d94526e27d4f5065caf8bb0a37 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Thu, 1 Mar 2018 21:57:50 +0000 Subject: [PATCH] Merged in updateRunningOnTaskRunningStatus (pull request #9) critical: baseScheduler#Running wasn't getting update properly. Approved-by: Akash Kothawale --- logging/utils/createLogDir.go | 2 +- schedulers/base.go | 21 ++++++++++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/logging/utils/createLogDir.go b/logging/utils/createLogDir.go index 6fae75d..0c698c0 100644 --- a/logging/utils/createLogDir.go +++ b/logging/utils/createLogDir.go @@ -30,7 +30,7 @@ func createLogDir(prefix string, startTime time.Time) string { logDirName += "-" logDirName += strconv.Itoa(startTime.Second()) if _, err := os.Stat(logDirName); os.IsNotExist(err) { - os.Mkdir(logDirName, 0700) + os.Mkdir(logDirName, 0755) } else { log.Println("Unable to create log directory: ", err) logDirName = "" diff --git a/schedulers/base.go b/schedulers/base.go index 3a624c2..191c745 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -105,15 +105,6 @@ func (s *BaseScheduler) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts } - // If this is our first time running into this Agent - s.TasksRunningMutex.Lock() - if _, ok := s.Running[offer.GetSlaveId().GoString()]; !ok { - s.Running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - // Add task to list of tasks running on node - s.Running[offer.GetSlaveId().GoString()][taskName] = true - s.TasksRunningMutex.Unlock() - resources := []*mesos.Resource{ mesosutil.NewScalarResource("cpus", task.CPU), mesosutil.NewScalarResource("mem", task.RAM), @@ -192,7 +183,7 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m utilities.RecordTotalResourceAvailability(offers) for _, offer := range offers { if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok { - s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString() + s.HostNameToSlaveID[offer.GetHostname()] = *offer.SlaveId.Value } } // If no resource offers have been received yet, and if scheduling policy switching has been enabled, @@ -215,13 +206,21 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { s.LogTaskStatusUpdate(status) if *status.State == mesos.TaskState_TASK_RUNNING { + // If this is our first time running into this Agent + s.TasksRunningMutex.Lock() + if _, ok := s.Running[*status.SlaveId.Value]; !ok { + s.Running[*status.SlaveId.Value] = make(map[string]bool) + } + // Add task to list of tasks running on node + s.Running[*status.SlaveId.Value][*status.TaskId.Value] = true + s.TasksRunningMutex.Unlock() s.tasksRunning++ } else if IsTerminal(status.State) { // Update resource availability. utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE", *status.TaskId, *status.SlaveId) s.TasksRunningMutex.Lock() - delete(s.Running[status.GetSlaveId().GoString()], *status.TaskId.Value) + delete(s.Running[*status.SlaveId.Value], *status.TaskId.Value) s.TasksRunningMutex.Unlock() s.tasksRunning-- if s.tasksRunning == 0 {