From 657dc8df9373686a4a7ad8dfb85f4e80869bdfa9 Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik
Date: Fri, 26 Jan 2018 17:29:43 -0500
Subject: [PATCH] Track resource usage across the cluster.

Created a utility in utilities/ to track the total and the unused resources
for each host in the cluster. Added a utility to def/taskUtils.go to retrieve
the resource requirement for a given taskID. Moved the code that launches a
list of tasks on a set of offerIDs into schedulers/helpers.go and updated all
the scheduling policies to call this function instead of calling
mesos.SchedulerDriver#LaunchTasks directly.

The resource availability of the cluster is updated at two stages (see the
usage sketches after the diff):
1. when tasks are about to be launched (in schedulers/helpers.go#LaunchTasks),
   so that the scheduling policy switching logic can account for the resources
   consumed by the just launched tasks, and
2. when a terminal status update is received for a task
   (in schedulers/base.go#StatusUpdate).
---
 def/task.go                     |   1 +
 def/taskUtils.go                |  37 ++
 schedulers/MaxGreedyMins.go     |   4 +-
 schedulers/MaxMin.go            |   4 +-
 schedulers/base.go              |   6 ++
 schedulers/bin-packing.go       |   4 +-
 schedulers/first-fit.go         |   5 +-
 schedulers/helpers.go           |  15 +++
 utilities/trackResourceUsage.go | 161 ++++++++++++++++++++++++++++++++
 9 files changed, 232 insertions(+), 5 deletions(-)
 create mode 100644 utilities/trackResourceUsage.go

diff --git a/def/task.go b/def/task.go
index 8624841..539b0e7 100644
--- a/def/task.go
+++ b/def/task.go
@@ -36,6 +36,7 @@ func TasksFromJSON(uri string) ([]Task, error) {
 		return nil, errors.Wrap(err, "Error unmarshalling")
 	}
 
+	initTaskResourceRequirements(tasks)
 	return tasks, nil
 }
 
diff --git a/def/taskUtils.go b/def/taskUtils.go
index 602a006..9505589 100644
--- a/def/taskUtils.go
+++ b/def/taskUtils.go
@@ -4,6 +4,8 @@ import (
 	"github.com/mash/gokmeans"
 	"log"
 	"sort"
+	"errors"
+	"fmt"
 )
 
 // Information about a cluster of tasks.
@@ -122,3 +124,38 @@ func SortTasks(ts []Task, sb sortBy) {
 		return sb(&ts[i]) <= sb(&ts[j])
 	})
 }
+
+// TaskResources stores a task's CPU, RAM and Watts requirements.
+type TaskResources struct {
+	CPU   float64
+	Ram   float64
+	Watts float64
+}
+
+var taskResourceRequirement map[string]*TaskResources // Map from taskID to resource requirements.
+
+// Record the resource requirements for each instance of every task.
+func initTaskResourceRequirements(tasks []Task) {
+	taskResourceRequirement = make(map[string]*TaskResources)
+	baseTaskID := "electron-"
+	for _, task := range tasks {
+		for i := *task.Instances; i > 0; i-- {
+			taskID := fmt.Sprintf("%s-%d", baseTaskID+task.Name, i)
+			taskResourceRequirement[taskID] = &TaskResources{
+				CPU:   task.CPU,
+				Ram:   task.RAM,
+				Watts: task.Watts,
+			}
+		}
+	}
+}
+
+// GetResourceRequirement retrieves the resource requirement of the task with the given taskID.
+func GetResourceRequirement(taskID string) (TaskResources, error) {
+	if tr, ok := taskResourceRequirement[taskID]; ok {
+		return *tr, nil
+	} else {
+		// Shouldn't be here.
+ return TaskResources{}, errors.New("Invalid TaskID: " + taskID) + } +} \ No newline at end of file diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 31d2726..076fef2 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -160,7 +160,9 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched if offerTaken { baseSchedRef.LogTaskStarting(nil, offer) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + if err := LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, driver); err != nil { + baseSchedRef.LogElectronError(err) + } } else { // If there was no match for the task diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 7614d84..abcbbc6 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -155,7 +155,9 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri if offerTaken { baseSchedRef.LogTaskStarting(nil, offer) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + if err := LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, driver); err != nil { + baseSchedRef.LogElectronError(err) + } } else { // If there was no match for the task cpus, mem, watts := offerUtils.OfferAgg(offer) diff --git a/schedulers/base.go b/schedulers/base.go index 241371f..70f2b4c 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -12,6 +12,7 @@ import ( "log" "sync" "time" + "bitbucket.org/sunybingcloud/elektron/utilities" ) type baseScheduler struct { @@ -27,6 +28,7 @@ type baseScheduler struct { running map[string]map[string]bool wattsAsAResource bool classMapWatts bool + totalResourceAvailabilityRecorded bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -162,6 +164,7 @@ func (s *baseScheduler) Disconnected(sched.SchedulerDriver) { } func (s *baseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + utilities.RecordTotalResourceAvailability(offers) s.curSchedPolicy.ConsumeOffers(s, driver, offers) } @@ -170,6 +173,9 @@ func (s *baseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos if *status.State == mesos.TaskState_TASK_RUNNING { s.tasksRunning++ } else if IsTerminal(status.State) { + // Update resource availability. + utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE", + *status.TaskId, *status.SlaveId) delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) s.tasksRunning-- if s.tasksRunning == 0 { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index fe25185..1083e41 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -104,7 +104,9 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. 
 
 		if offerTaken {
 			baseSchedRef.LogTaskStarting(nil, offer)
-			driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
+			if err := LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, driver); err != nil {
+				baseSchedRef.LogElectronError(err)
+			}
 		} else {
 
 			// If there was no match for the task
diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go
index 6a934d4..b720979 100644
--- a/schedulers/first-fit.go
+++ b/schedulers/first-fit.go
@@ -71,8 +71,9 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
 			tasks = append(tasks, taskToSchedule)
 
 			baseSchedRef.LogTaskStarting(&task, offer)
-			driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
-
+			if err := LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, driver); err != nil {
+				baseSchedRef.LogElectronError(err)
+			}
 			offerTaken = true
 
 			baseSchedRef.LogSchedTrace(taskToSchedule, offer)
diff --git a/schedulers/helpers.go b/schedulers/helpers.go
index 2289014..ac791de 100644
--- a/schedulers/helpers.go
+++ b/schedulers/helpers.go
@@ -5,6 +5,10 @@ import (
 	"bitbucket.org/sunybingcloud/elektron/def"
 	"errors"
 	elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
+	"bitbucket.org/sunybingcloud/elektron/utilities"
+	"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
+	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
+	sched "github.com/mesos/mesos-go/api/v0/scheduler"
 )
 
 func coLocated(tasks map[string]bool, s baseScheduler) {
@@ -119,3 +123,14 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption {
 		return nil
 	}
 }
+
+// LaunchTasks launches the given tasks on the given offers and updates the resource availability for the launched tasks.
+func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) error {
+	driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter)
+	// Update the resource availability to account for the just launched tasks.
+	var err error
+	for _, task := range tasksToLaunch {
+		err = utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", *task.TaskId, *task.SlaveId)
+	}
+	return err
+}
diff --git a/utilities/trackResourceUsage.go b/utilities/trackResourceUsage.go
new file mode 100644
index 0000000..b19e5c9
--- /dev/null
+++ b/utilities/trackResourceUsage.go
@@ -0,0 +1,161 @@
+package utilities
+
+import (
+	"bitbucket.org/sunybingcloud/elektron/def"
+	"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
+	"errors"
+	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
+	"sync"
+)
+
+type TrackResourceUsage struct {
+	perHostResourceAvailability map[string]*ResourceCount // Keyed by slave ID; pointers so that in-place updates are reflected in the map.
+	sync.Mutex
+}
+
+// ResourceCount maintains information regarding the usage of the cluster resources.
+// This information is maintained for each node in the cluster.
+type ResourceCount struct {
+	// Total resources available.
+	totalCPU   float64
+	totalRAM   float64
+	totalWatts float64
+
+	// Resources currently unused.
+	unusedCPU   float64
+	unusedRAM   float64
+	unusedWatts float64
+}
+
+// IncrUnusedResources increments the unused resources by the given task's resource requirement.
+func (rc *ResourceCount) IncrUnusedResources(tr def.TaskResources) {
+	rc.unusedCPU += tr.CPU
+	rc.unusedRAM += tr.Ram
+	rc.unusedWatts += tr.Watts
+}
+
+// DecrUnusedResources decrements the unused resources by the given task's resource requirement.
+func (rc *ResourceCount) DecrUnusedResources(tr def.TaskResources) {
+	rc.unusedCPU -= tr.CPU
+	rc.unusedRAM -= tr.Ram
+	rc.unusedWatts -= tr.Watts
+}
+
+var truInstance *TrackResourceUsage
+
+func getTRUInstance() *TrackResourceUsage {
+	if truInstance == nil {
+		truInstance = newResourceUsageTracker()
+	}
+	return truInstance
+}
+
+func newResourceUsageTracker() *TrackResourceUsage {
+	return &TrackResourceUsage{
+		perHostResourceAvailability: make(map[string]*ResourceCount),
+	}
+}
+
+// RecordTotalResourceAvailability determines the total available resources for each host from the first round of Mesos resource offers.
+func RecordTotalResourceAvailability(offers []*mesos.Offer) {
+	tru := getTRUInstance()
+	tru.Lock()
+	defer tru.Unlock()
+	for _, offer := range offers {
+		// If this is the first offer received from this Mesos agent.
+		if _, ok := tru.perHostResourceAvailability[*offer.SlaveId.Value]; !ok {
+			cpu, mem, watts := offerUtils.OfferAgg(offer)
+			tru.perHostResourceAvailability[*offer.SlaveId.Value] = &ResourceCount{
+				totalCPU:   cpu,
+				totalRAM:   mem,
+				totalWatts: watts,
+
+				// Initially, all the resources are unused.
+				unusedCPU:   cpu,
+				unusedRAM:   mem,
+				unusedWatts: watts,
+			}
+		}
+	}
+}
+
+// Resource availability update scenarios.
+var resourceAvailabilityUpdateScenario = map[string]func(mesos.TaskID, mesos.SlaveID) error{
+	"ON_TASK_TERMINAL_STATE": func(taskID mesos.TaskID, slaveID mesos.SlaveID) error {
+		tru := getTRUInstance()
+		tru.Lock()
+		defer tru.Unlock()
+		if taskResources, err := def.GetResourceRequirement(*taskID.Value); err != nil {
+			return err
+		} else {
+			// Check whether the first resource offer from this agent has already been recorded.
+			if resCount, ok := tru.perHostResourceAvailability[*slaveID.Value]; ok {
+				resCount.IncrUnusedResources(taskResources)
+			} else {
+				// Shouldn't be here.
+				// First round of Mesos resource offers not yet recorded.
+				return errors.New("Resource availability not recorded for " + *slaveID.Value)
+			}
+			return nil
+		}
+	},
+	"ON_TASK_ACTIVE_STATE": func(taskID mesos.TaskID, slaveID mesos.SlaveID) error {
+		tru := getTRUInstance()
+		tru.Lock()
+		defer tru.Unlock()
+		if taskResources, err := def.GetResourceRequirement(*taskID.Value); err != nil {
+			return err
+		} else {
+			// Check whether the first resource offer from this agent has already been recorded.
+			if resCount, ok := tru.perHostResourceAvailability[*slaveID.Value]; ok {
+				resCount.DecrUnusedResources(taskResources)
+			} else {
+				// Shouldn't be here.
+				// First round of Mesos resource offers not yet recorded.
+				return errors.New("Resource availability not recorded for " + *slaveID.Value)
+			}
+			return nil
+		}
+	},
+}
+
+// ResourceAvailabilityUpdate updates the cluster resource availability based on the given scenario.
+func ResourceAvailabilityUpdate(scenario string, taskID mesos.TaskID, slaveID mesos.SlaveID) error {
+	if updateFunc, ok := resourceAvailabilityUpdateScenario[scenario]; ok {
+		// Apply the update function and propagate any error it returns.
+		err := updateFunc(taskID, slaveID)
+		return err
+	} else {
+		// Incorrect scenario specified.
+		return errors.New("Incorrect scenario specified for resource availability update: " + scenario)
+	}
+}
+
+// GetClusterwideResourceAvailability aggregates the resource availability (total and unused) across all hosts.
+func GetClusterwideResourceAvailability() ResourceCount {
+	tru := getTRUInstance()
+	tru.Lock()
+	defer tru.Unlock()
+	clusterwideResourceCount := ResourceCount{}
+	for _, resCount := range tru.perHostResourceAvailability {
+		// Aggregating the total CPU, RAM and Watts.
+		clusterwideResourceCount.totalCPU += resCount.totalCPU
+		clusterwideResourceCount.totalRAM += resCount.totalRAM
+		clusterwideResourceCount.totalWatts += resCount.totalWatts
+
+		// Aggregating the total unused CPU, RAM and Watts.
+		clusterwideResourceCount.unusedCPU += resCount.unusedCPU
+		clusterwideResourceCount.unusedRAM += resCount.unusedRAM
+		clusterwideResourceCount.unusedWatts += resCount.unusedWatts
+	}
+
+	return clusterwideResourceCount
+}
+
+// GetPerHostResourceAvailability retrieves the resource availability for each host in the cluster.
+func GetPerHostResourceAvailability() map[string]*ResourceCount {
+	tru := getTRUInstance()
+	tru.Lock()
+	defer tru.Unlock()
+	return tru.perHostResourceAvailability
+}
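Usage sketch (not part of the patch): the lookup added to def/taskUtils.go is keyed by the "electron-<task name>-<instance number>" scheme used in initTaskResourceRequirements. A minimal, hypothetical in-package test could exercise it as follows; the file name, the task name "minife" and its resource values are illustrative assumptions, and Instances is assumed to be *int as suggested by its use above.

// def/taskUtils_requirement_test.go (hypothetical file name).
package def

import "testing"

func TestGetResourceRequirement(t *testing.T) {
	instances := 2 // assumes Task.Instances is *int
	tasks := []Task{
		// Hypothetical workload entry; only the fields read by
		// initTaskResourceRequirements are set here.
		{Name: "minife", CPU: 3.0, RAM: 4096, Watts: 50.0, Instances: &instances},
	}
	initTaskResourceRequirements(tasks)

	// Each instance gets its own entry: electron-minife-1 and electron-minife-2.
	tr, err := GetResourceRequirement("electron-minife-2")
	if err != nil {
		t.Fatal(err)
	}
	if tr.CPU != 3.0 || tr.Ram != 4096 || tr.Watts != 50.0 {
		t.Errorf("unexpected resource requirement: %+v", tr)
	}

	// Unknown task IDs are reported as errors rather than zero values.
	if _, err := GetResourceRequirement("electron-unknown-1"); err == nil {
		t.Error("expected an error for an unregistered task ID")
	}
}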
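A second hedged sketch (also not part of the patch) of how the two update stages described in the commit message fit together when seen from outside the utilities package. RecordTotalResourceAvailability and ResourceAvailabilityUpdate are already wired into ResourceOffers, LaunchTasks and StatusUpdate by this change; the driver-less main function, the task/agent IDs and the empty offer slice below are placeholders for illustration only.

package main

import (
	"log"

	"bitbucket.org/sunybingcloud/elektron/utilities"
	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
)

func strPtr(s string) *string { return &s }

func main() {
	// Stage 0: the first offer seen from each agent fixes that host's totals
	// (done in schedulers/base.go#ResourceOffers).
	var offers []*mesos.Offer // placeholder for the offers handed to ResourceOffers
	utilities.RecordTotalResourceAvailability(offers)

	// Placeholder identifiers; real ones come from the Mesos status updates.
	taskID := mesos.TaskID{Value: strPtr("electron-minife-1")}
	slaveID := mesos.SlaveID{Value: strPtr("some-agent-id")}

	// Stage 1: the task is about to be launched (schedulers/helpers.go#LaunchTasks),
	// so its requirement is subtracted from the host's unused resources.
	if err := utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", taskID, slaveID); err != nil {
		log.Println(err)
	}

	// Stage 2: a terminal status update arrives (schedulers/base.go#StatusUpdate),
	// so the requirement is added back to the host's unused resources.
	if err := utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE", taskID, slaveID); err != nil {
		log.Println(err)
	}
}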