From bf6c5eded9d9169707c23234af8e7ff375f33dce Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 20 Dec 2016 14:56:07 -0500 Subject: [PATCH] used the generic running average calculator --- schedulers/proactiveclusterwidecappers.go | 102 ++++++---------------- 1 file changed, 25 insertions(+), 77 deletions(-) diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go index 418980e..275aaa8 100644 --- a/schedulers/proactiveclusterwidecappers.go +++ b/schedulers/proactiveclusterwidecappers.go @@ -1,12 +1,5 @@ /* Cluster wide dynamic capping -Step1. Compute the running average of watts of tasks in window. -Step2. Compute what percentage of total power of each node, is the running average. -Step3. Compute the median of the percetages and this is the percentage that the cluster needs to be capped at. - -1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled. -2. Ranked based scheduling -- Sort the tasks to be scheduled, in ascending order, and then determine the cluster wide cap. - This is not a scheduler but a scheduling scheme that schedulers can use. */ package schedulers @@ -14,26 +7,32 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "container/list" + "bitbucket.org/sunybingcloud/electron/utilities/runAvg" "errors" "github.com/montanaflynn/stats" "log" "sort" ) -// Structure containing utility data structures used to compute cluster-wide dynamic cap. -type clusterwideCapper struct { - // window of tasks. - windowOfTasks list.List - // The current sum of requested powers of the tasks in the window. - currentSum float64 - // The current number of tasks in the window. - numberOfTasksInWindow int +// wrapper around def.Task that implements runAvg.Interface +type taskWrapper struct { + task def.Task } -// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance(). +func (tw taskWrapper) Val() float64 { + return tw.task.Watts * constants.CapMargin +} + +func (tw taskWrapper) ID() string { + return tw.task.TaskID +} + +// Cluster wide capper. Contains a type that implements runAvg.Interface +type clusterwideCapper struct {} + +// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance() func newClusterwideCapper() *clusterwideCapper { - return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0} + return &clusterwideCapper{} } // Singleton instance of clusterwideCapper @@ -49,41 +48,9 @@ func getClusterwideCapperInstance() *clusterwideCapper { return singletonCapper } -// Clear and initialize all the members of clusterwideCapper. +// Clear and initialize the runAvg calculator func (capper clusterwideCapper) clear() { - capper.windowOfTasks.Init() - capper.currentSum = 0 - capper.numberOfTasksInWindow = 0 -} - -// Compute the average of watts of all the tasks in the window. -func (capper clusterwideCapper) average() float64 { - return capper.currentSum / float64(capper.windowOfTasks.Len()) -} - -/* -Compute the running average. - -Using clusterwideCapper#windowOfTasks to store the tasks. -Task at position 0 (oldest task) is removed when the window is full and new task arrives. -*/ -func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 { - var average float64 - if capper.numberOfTasksInWindow < constants.WindowSize { - capper.windowOfTasks.PushBack(tsk) - capper.numberOfTasksInWindow++ - capper.currentSum += float64(tsk.Watts) * constants.CapMargin - } else { - taskToRemoveElement := capper.windowOfTasks.Front() - if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok { - capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin - capper.windowOfTasks.Remove(taskToRemoveElement) - } - capper.windowOfTasks.PushBack(tsk) - capper.currentSum += float64(tsk.Watts) * constants.CapMargin - } - average = capper.average() - return average + runAvg.Init() } /* @@ -265,33 +232,14 @@ func (capper clusterwideCapper) recap(totalPower map[string]float64, } /* -Remove entry for finished task. -This function is called when a task completes. +Remove entry for finished task from the window + +This function is called when a task completes. This completed task needs to be removed from the window of tasks (if it is still present) - so that it doesn't contribute to the computation of the cap value. + so that it doesn't contribute to the computation of the next cap value. */ func (capper clusterwideCapper) taskFinished(taskID string) { - // If the window is empty the just return. This condition should technically return false. - if capper.windowOfTasks.Len() == 0 { - return - } - - // Checking whether the task with the given taskID is currently present in the window of tasks. - var taskElementToRemove *list.Element - for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() { - if tsk, ok := taskElement.Value.(*def.Task); ok { - if tsk.TaskID == taskID { - taskElementToRemove = taskElement - } - } - } - - // we need to remove the task from the window. - if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok { - capper.windowOfTasks.Remove(taskElementToRemove) - capper.numberOfTasksInWindow -= 1 - capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin - } + runAvg.Remove(taskID) } // First come first serve scheduling. @@ -302,7 +250,7 @@ func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64, return 100, errors.New("Invalid argument: totalPower") } else { // Need to calculate the running average - runningAverage := capper.runningAverageOfWatts(newTask) + runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize) // For each node, calculate the percentage of the running average to the total power. ratios := make(map[string]float64) for host, tpower := range totalPower {