used the generic running average calculator

This commit is contained in:
Pradyumna Kaushik 2016-12-20 14:56:07 -05:00 committed by Renan DelValle
parent f304cd295a
commit bf6c5eded9


@@ -1,12 +1,5 @@
/*
Cluster wide dynamic capping
Step 1. Compute the running average of the watts of the tasks in the window.
Step 2. Compute what percentage of each node's total power the running average amounts to.
Step 3. Compute the median of these percentages; this is the percentage at which the cluster needs to be capped (see the sketch after this hunk).
1. First-fit scheduling -- Perform the above steps for each task that needs to be scheduled.
2. Rank-based scheduling -- Sort the tasks to be scheduled in ascending order, and then determine the cluster-wide cap.
This is not a scheduler but a scheduling scheme that schedulers can use.
*/
package schedulers
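For reference, Steps 2 and 3 of the (removed) comment above reduce to a per-node ratio followed by a median. The following is a minimal sketch, not part of this commit: clusterCapPercentage is a hypothetical helper, it assumes the running average (Step 1) has already been computed, and it uses stats.Median from the github.com/montanaflynn/stats package this file already imports.

// Hypothetical helper illustrating Steps 2 and 3; not part of this commit.
package schedulers

import "github.com/montanaflynn/stats"

// clusterCapPercentage returns the cluster-wide cap: the median, across nodes,
// of the running average expressed as a percentage of each node's total power.
func clusterCapPercentage(runningAverage float64, totalPower map[string]float64) (float64, error) {
	ratios := make([]float64, 0, len(totalPower))
	for _, tpower := range totalPower {
		// Step 2: the running average as a percentage of this node's total power.
		ratios = append(ratios, (runningAverage/tpower)*100)
	}
	// Step 3: the median of the percentages is the cluster-wide cap.
	return stats.Median(ratios)
}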
@@ -14,26 +7,32 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"bitbucket.org/sunybingcloud/electron/utilities/runAvg"
"errors"
"github.com/montanaflynn/stats"
"log"
"sort"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
// window of tasks.
windowOfTasks list.List
// The current sum of requested powers of the tasks in the window.
currentSum float64
// The current number of tasks in the window.
numberOfTasksInWindow int
// wrapper around def.Task that implements runAvg.Interface
type taskWrapper struct {
task def.Task
}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance().
func (tw taskWrapper) Val() float64 {
return tw.task.Watts * constants.CapMargin
}
func (tw taskWrapper) ID() string {
return tw.task.TaskID
}
// Cluster-wide capper. Works with the runAvg package through taskWrapper, which implements runAvg.Interface.
type clusterwideCapper struct {}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance()
func newClusterwideCapper() *clusterwideCapper {
return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0}
return &clusterwideCapper{}
}
// Singleton instance of clusterwideCapper
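taskWrapper above is an adapter: it exposes a def.Task's capped wattage (Watts * CapMargin) and its TaskID to the generic calculator. The runAvg package itself is not part of this diff; from the methods taskWrapper implements, its Interface presumably looks something like this sketch (inferred, not verified against the real package):

// Presumed shape of runAvg.Interface, inferred from taskWrapper's methods; the
// actual definition lives in bitbucket.org/sunybingcloud/electron/utilities/runAvg.
package runAvg

type Interface interface {
	Val() float64 // the value folded into the running average
	ID() string   // the identifier used to remove the entry later
}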
@@ -49,41 +48,9 @@ func getClusterwideCapperInstance() *clusterwideCapper {
return singletonCapper
}
// Clear and initialize all the members of clusterwideCapper.
// Clear and initialize the runAvg calculator
func (capper clusterwideCapper) clear() {
capper.windowOfTasks.Init()
capper.currentSum = 0
capper.numberOfTasksInWindow = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
return capper.currentSum / float64(capper.windowOfTasks.Len())
}
/*
Compute the running average.
Using clusterwideCapper#windowOfTasks to store the tasks.
The task at position 0 (the oldest task) is removed when the window is full and a new task arrives.
*/
func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 {
var average float64
if capper.numberOfTasksInWindow < constants.WindowSize {
capper.windowOfTasks.PushBack(tsk)
capper.numberOfTasksInWindow++
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
} else {
taskToRemoveElement := capper.windowOfTasks.Front()
if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok {
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
capper.windowOfTasks.Remove(taskToRemoveElement)
}
capper.windowOfTasks.PushBack(tsk)
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
}
average = capper.average()
return average
runAvg.Init()
}
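The windowed bookkeeping deleted above (windowOfTasks, currentSum, numberOfTasksInWindow) now lives behind the runAvg package. A minimal sketch of such a generic calculator, mirroring the removed logic, might look as follows; the actual package at bitbucket.org/sunybingcloud/electron/utilities/runAvg may differ in its details (e.g., Remove may report whether anything was removed).

// A sketch of a generic sliding-window running-average calculator; this
// illustrates the technique and is not the real runAvg implementation.
package runAvg

import "container/list"

// Interface is what entries in the window must implement (see taskWrapper).
type Interface interface {
	Val() float64
	ID() string
}

var (
	window     list.List // entries, oldest at the front
	currentSum float64   // sum of Val() over all entries in the window
)

// Init clears the window; used by clusterwideCapper#clear.
func Init() {
	window.Init()
	currentSum = 0.0
}

// Calc folds a new entry into the window, evicting the oldest entry when the
// window is full, and returns the updated running average.
func Calc(entry Interface, windowSize int) float64 {
	if window.Len() >= windowSize {
		oldest := window.Front()
		currentSum -= oldest.Value.(Interface).Val()
		window.Remove(oldest)
	}
	window.PushBack(entry)
	currentSum += entry.Val()
	return currentSum / float64(window.Len())
}

// Remove drops the entry with the given ID, if present, so that it no longer
// contributes to the average; used by taskFinished.
func Remove(id string) {
	for e := window.Front(); e != nil; e = e.Next() {
		if entry, ok := e.Value.(Interface); ok && entry.ID() == id {
			currentSum -= entry.Val()
			window.Remove(e)
			break
		}
	}
}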
/*
@@ -265,33 +232,14 @@ func (capper clusterwideCapper) recap(totalPower map[string]float64,
}
/*
Remove entry for finished task.
This function is called when a task completes.
Remove entry for finished task from the window
This function is called when a task completes.
This completed task needs to be removed from the window of tasks (if it is still present)
so that it doesn't contribute to the computation of the cap value.
so that it doesn't contribute to the computation of the next cap value.
*/
func (capper clusterwideCapper) taskFinished(taskID string) {
// If the window is empty, just return; this case should not normally arise.
if capper.windowOfTasks.Len() == 0 {
return
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var taskElementToRemove *list.Element
for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() {
if tsk, ok := taskElement.Value.(*def.Task); ok {
if tsk.TaskID == taskID {
taskElementToRemove = taskElement
}
}
}
// We need to remove the task from the window.
if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok {
capper.windowOfTasks.Remove(taskElementToRemove)
capper.numberOfTasksInWindow -= 1
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
}
runAvg.Remove(taskID)
}
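With the refactor, the scheduler's interaction with the calculator reduces to three calls: runAvg.Calc on task arrival, runAvg.Remove on task completion, and runAvg.Init on reset. A hypothetical usage sketch, assuming this file's package and imports (exampleLifecycle and its parameter are invented for illustration):

// Hypothetical glue showing the calculator's lifecycle; all identifiers other
// than exampleLifecycle and its parameter come from this diff.
func exampleLifecycle(newTask def.Task) {
	capper := getClusterwideCapperInstance()

	// Task arrival: fold the task into the window and read the new average.
	avg := runAvg.Calc(taskWrapper{task: newTask}, constants.WindowSize)
	log.Println("running average of watts:", avg)

	// Task completion: drop the task's contribution from the window.
	capper.taskFinished(newTask.TaskID) // calls runAvg.Remove internally

	// Reset: clear the window.
	capper.clear() // calls runAvg.Init internally
}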
// First-come, first-served scheduling.
@@ -302,7 +250,7 @@ func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64,
return 100, errors.New("Invalid argument: totalPower")
} else {
// Need to calculate the running average
runningAverage := capper.runningAverageOfWatts(newTask)
runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize)
// For each node, calculate the percentage of the running average to the total power.
ratios := make(map[string]float64)
for host, tpower := range totalPower {