diff --git a/def/taskUtils.go b/def/taskUtils.go index 9505589..9bff28f 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -140,7 +140,7 @@ func initTaskResourceRequirements(tasks []Task) { baseTaskID := "electron-" for _, task := range tasks { for i := *task.Instances; i > 0; i-- { - taskID := fmt.Sprintf("%s-%d", baseTaskID + task.Name, *task.Instances) + taskID := fmt.Sprintf("%s-%d", baseTaskID + task.Name, i) taskResourceRequirement[taskID] = &TaskResources{ CPU: task.CPU, Ram: task.RAM, diff --git a/schedulers/helpers.go b/schedulers/helpers.go index ac791de..dbead7f 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -9,6 +9,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "log" ) func coLocated(tasks map[string]bool, s baseScheduler) { @@ -131,6 +132,9 @@ func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, dri var err error for _, task := range tasksToLaunch { err = utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", *task.TaskId, *task.SlaveId) + if err != nil { + log.Println(err) + } } return err } diff --git a/utilities/trackResourceUsage.go b/utilities/trackResourceUsage.go index b19e5c9..89a8e10 100644 --- a/utilities/trackResourceUsage.go +++ b/utilities/trackResourceUsage.go @@ -9,7 +9,7 @@ import ( ) type TrackResourceUsage struct { - perHostResourceAvailability map[string]ResourceCount + perHostResourceAvailability map[string]*ResourceCount sync.Mutex } @@ -17,28 +17,28 @@ type TrackResourceUsage struct { // This information is maintained for each node in the cluster. type ResourceCount struct { // Total resources available. - totalCPU float64 - totalRAM float64 - totalWatts float64 + TotalCPU float64 + TotalRAM float64 + TotalWatts float64 // Resources currently unused. - unusedCPU float64 - unusedRAM float64 - unusedWatts float64 + UnusedCPU float64 + UnusedRAM float64 + UnusedWatts float64 } // Increment unused resources. func (rc *ResourceCount) IncrUnusedResources(tr def.TaskResources) { - rc.unusedCPU += tr.CPU - rc.unusedRAM += tr.Ram - rc.unusedWatts += tr.Watts + rc.UnusedCPU += tr.CPU + rc.UnusedRAM += tr.Ram + rc.UnusedWatts += tr.Watts } // Decrement unused resources. func (rc *ResourceCount) DecrUnusedResources(tr def.TaskResources) { - rc.unusedCPU -= tr.CPU - rc.unusedRAM -= tr.Ram - rc.unusedWatts -= tr.Watts + rc.UnusedCPU -= tr.CPU + rc.UnusedRAM -= tr.Ram + rc.UnusedWatts -= tr.Watts } var truInstance *TrackResourceUsage @@ -52,7 +52,7 @@ func getTRUInstance() *TrackResourceUsage { func newResourceUsageTracker() *TrackResourceUsage { return &TrackResourceUsage{ - perHostResourceAvailability: make(map[string]ResourceCount), + perHostResourceAvailability: make(map[string]*ResourceCount), } } @@ -65,15 +65,15 @@ func RecordTotalResourceAvailability(offers []*mesos.Offer) { // If first offer received from Mesos Agent. if _, ok := tru.perHostResourceAvailability[*offer.SlaveId.Value]; !ok { cpu, mem, watts := offerUtils.OfferAgg(offer) - tru.perHostResourceAvailability[*offer.SlaveId.Value] = ResourceCount{ - totalCPU: cpu, - totalRAM: mem, - totalWatts: watts, + tru.perHostResourceAvailability[*offer.SlaveId.Value] = &ResourceCount{ + TotalCPU: cpu, + TotalRAM: mem, + TotalWatts: watts, // Initially, all resources are used. - unusedCPU: cpu, - unusedRAM: mem, - unusedWatts: watts, + UnusedCPU: cpu, + UnusedRAM: mem, + UnusedWatts: watts, } } } @@ -139,21 +139,21 @@ func GetClusterwideResourceAvailability() ResourceCount { clusterwideResourceCount := ResourceCount{} for _, resCount := range tru.perHostResourceAvailability { // Aggregating the total CPU, RAM and Watts. - clusterwideResourceCount.totalCPU += resCount.totalCPU - clusterwideResourceCount.totalRAM += resCount.totalRAM - clusterwideResourceCount.totalWatts += resCount.totalWatts + clusterwideResourceCount.TotalCPU += resCount.TotalCPU + clusterwideResourceCount.TotalRAM += resCount.TotalRAM + clusterwideResourceCount.TotalWatts += resCount.TotalWatts // Aggregating the total unused CPU, RAM and Watts. - clusterwideResourceCount.unusedCPU += resCount.unusedCPU - clusterwideResourceCount.unusedRAM += resCount.unusedRAM - clusterwideResourceCount.unusedWatts += resCount.unusedWatts + clusterwideResourceCount.UnusedCPU += resCount.UnusedCPU + clusterwideResourceCount.UnusedRAM += resCount.UnusedRAM + clusterwideResourceCount.UnusedWatts += resCount.UnusedWatts } return clusterwideResourceCount } // Retrieve resource availability for each host in the cluster. -func GetPerHostResourceAvailability() map[string]ResourceCount { +func GetPerHostResourceAvailability() map[string]*ResourceCount { tru := getTRUInstance() tru.Lock() defer tru.Unlock()