/*
Cluster-wide dynamic capping

Step 1. Compute the running average of the watts of the tasks in the window.
Step 2. For each node, compute what percentage of the node's total power the running average is.
Step 3. Compute the median of these percentages; this is the percentage at which the cluster needs to be capped.

1. First-fit scheduling -- Perform the above steps for each task that needs to be scheduled.
2. Rank-based scheduling -- Sort the tasks to be scheduled in ascending order of requested watts, and then determine the cluster-wide cap.

This is not a scheduler but a scheduling scheme that schedulers can use.
*/
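/*
An illustrative walk-through of the three steps (hypothetical numbers, ignoring constants.CapMargin):
if the window holds tasks requesting 40W, 60W and 80W, the running average (step 1) is 60W. With two
nodes whose total powers are 200W and 240W, the percentages (step 2) are 30% and 25%. The median of
{25, 30} (step 3) is 27.5, so the cluster would be capped at 27.5% of each node's total power.
*/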
package schedulers
import (
	"bitbucket.org/sunybingcloud/electron/constants"
	"bitbucket.org/sunybingcloud/electron/def"
	"container/list"
	"errors"
	"github.com/montanaflynn/stats"
	"log"
	"sort"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
	// Window of tasks.
	windowOfTasks list.List
	// The current sum of the requested watts of the tasks in the window.
	currentSum float64
	// The current number of tasks in the window.
	numberOfTasksInWindow int
}
// Constructor for clusterwideCapper. Please don't call this directly; use getClusterwideCapperInstance() instead.
func newClusterwideCapper() *clusterwideCapper {
	return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0}
}
// Singleton instance of clusterwideCapper
var singletonCapper *clusterwideCapper
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
	if singletonCapper == nil {
		singletonCapper = newClusterwideCapper()
	}
	return singletonCapper
}
// Clear and initialize all the members of clusterwideCapper.
// Note: pointer receiver so that the mutations persist on the singleton.
func (capper *clusterwideCapper) clear() {
	capper.windowOfTasks.Init()
	capper.currentSum = 0
	capper.numberOfTasksInWindow = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper *clusterwideCapper) average() float64 {
	return capper.currentSum / float64(capper.windowOfTasks.Len())
}
/*
Compute the running average.

Uses clusterwideCapper#windowOfTasks to store the tasks.
The task at position 0 (the oldest task) is removed when the window is full and a new task arrives.
*/
func (capper *clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 {
	var average float64
	if capper.numberOfTasksInWindow < constants.WindowSize {
		capper.windowOfTasks.PushBack(tsk)
		capper.numberOfTasksInWindow++
		capper.currentSum += float64(tsk.Watts) * constants.CapMargin
	} else {
		// Window is full: evict the oldest task before adding the new one.
		taskToRemoveElement := capper.windowOfTasks.Front()
		if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok {
			capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
			capper.windowOfTasks.Remove(taskToRemoveElement)
		}
		capper.windowOfTasks.PushBack(tsk)
		capper.currentSum += float64(tsk.Watts) * constants.CapMargin
	}
	average = capper.average()
	return average
}
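// An illustrative trace (hypothetical, with constants.WindowSize = 2 and constants.CapMargin = 1.0):
// tasks requesting 40W and 60W fill the window, giving an average of 50W. When a third task
// requesting 80W arrives, the oldest task (40W) is evicted and the average becomes (60 + 80) / 2 = 70W.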
/*
Calculating the cap value.

1. Sort the values of runningAverageToTotalPowerPercentage in ascending order.
2. Compute the median of the sorted values.
3. The median is the new cap.
*/
func (capper *clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 {
	var values []float64
	// Validation
	if runningAverageToTotalPowerPercentage == nil {
		return 100.0
	}
	for _, apower := range runningAverageToTotalPowerPercentage {
		values = append(values, apower)
	}
	// Sorting the values in ascending order.
	sort.Float64s(values)
	// Calculating the median.
	if median, err := stats.Median(values); err == nil {
		return median
	}
	// Should never reach here. If we do, just set the cap value to 100.
	return 100.0
}
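// For example (hypothetical percentages): given {"host1": 30.0, "host2": 25.0, "host3": 40.0},
// the sorted values are {25.0, 30.0, 40.0} and the cap returned is their median, 30.0.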
/*
A recapping strategy that decides between two different recapping schemes:
1. The regular scheme, based on the average power usage across the cluster.
2. A scheme based on the average of the loads on each node in the cluster.

The recap value picked is the lesser of the two.

The cleverRecap scheme works well when the cluster is relatively idle; until then,
the primitive recapping scheme works better.
*/
func (capper *clusterwideCapper) cleverRecap(totalPower map[string]float64,
	taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
	// Validation
	if totalPower == nil || taskMonitor == nil {
		return 100.0, errors.New("Invalid argument: totalPower, taskMonitor")
	}

	// Determining the recap value by calling the regular recap(...).
	toggle := false
	recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId)
	if err == nil {
		toggle = true
	}

	// Watts usage on each node in the cluster.
	wattsUsages := make(map[string][]float64)
	hostOfFinishedTask := ""
	indexOfFinishedTask := -1
	for _, host := range constants.Hosts {
		wattsUsages[host] = []float64{0.0}
	}
	for host, tasks := range taskMonitor {
		for i, task := range tasks {
			if task.TaskID == finishedTaskId {
				hostOfFinishedTask = host
				indexOfFinishedTask = i
				// Not considering this task for the computation of wattsUsages.
				continue
			}
			wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin)
		}
	}

	// Updating the task monitor. If recap(...) has already deleted the finished task from taskMonitor,
	// then this is skipped. Else (only when an error occurred with recap(...)), we remove it here.
	if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
		log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
			taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
		taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
			taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
	}

	// Need to check whether there are still tasks running on the cluster. If not, we return an error.
	clusterIdle := true
	for _, tasks := range taskMonitor {
		if len(tasks) > 0 {
			clusterIdle = false
		}
	}

	if !clusterIdle {
		// Load on each node in the cluster (one entry per host; no phantom entries,
		// which would otherwise skew the average).
		loads := []float64{}
		for host, usages := range wattsUsages {
			totalUsage := 0.0
			for _, usage := range usages {
				totalUsage += usage
			}
			loads = append(loads, totalUsage/totalPower[host])
		}
		// Now need to compute the average load.
		totalLoad := 0.0
		for _, load := range loads {
			totalLoad += load
		}
		averageLoad := (totalLoad / float64(len(loads))) * 100.0 // This would be the cap value.
		// If toggle is true, then we need to return the lesser of the two recap values.
		if toggle {
			if averageLoad <= recapValue {
				return averageLoad, nil
			} else {
				return recapValue, nil
			}
		} else {
			return averageLoad, nil
		}
	}
	return 100.0, errors.New("No task running on the cluster.")
}
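// As a hypothetical illustration: if the regular recap(...) returns 50.0, but after the finished
// task is removed the nodes' loads average out to 10% of their total power, cleverRecap picks the
// lesser value and returns 10.0, capping the mostly idle cluster much more aggressively.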
/*
Recapping the entire cluster.

1. Remove the task that finished from the list of running tasks.
2. Compute the average allocated power of the tasks that are currently running.
3. For each host, determine the ratio of this average to the host's total power.
4. Determine the median of the ratios; this is the new cluster-wide cap.

This needs to be called whenever a task finishes execution.
*/
func (capper *clusterwideCapper) recap(totalPower map[string]float64,
	taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
	// Validation
	if totalPower == nil || taskMonitor == nil {
		return 100.0, errors.New("Invalid argument: totalPower, taskMonitor")
	}
	totalAllocatedPower := 0.0
	totalRunningTasks := 0

	hostOfFinishedTask := ""
	indexOfFinishedTask := -1
	for host, tasks := range taskMonitor {
		for i, task := range tasks {
			if task.TaskID == finishedTaskId {
				hostOfFinishedTask = host
				indexOfFinishedTask = i
				// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks.
				continue
			}
			totalAllocatedPower += (float64(task.Watts) * constants.CapMargin)
			totalRunningTasks++
		}
	}

	// Updating the task monitor.
	if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
		log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
			taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
		taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
			taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
	}

	// If the finished task was the last one running, totalAllocatedPower and totalRunningTasks would be 0.
	if totalAllocatedPower == 0 && totalRunningTasks == 0 {
		return 100, errors.New("No task running on the cluster.")
	}

	average := totalAllocatedPower / float64(totalRunningTasks)
	ratios := []float64{}
	for _, tpower := range totalPower {
		ratios = append(ratios, (average/tpower)*100)
	}
	sort.Float64s(ratios)
	median, err := stats.Median(ratios)
	if err == nil {
		return median, nil
	} else {
		return 100, err
	}
}
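// A hypothetical illustration: two running tasks allocated 40W and 60W (with constants.CapMargin = 1.0)
// give an average of 50W. With node total powers of 200W and 250W, the ratios are 25.0 and 20.0,
// and the new cluster-wide cap is their median, 22.5.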
/* Quicksort algorithm to sort tasks, in place, in ascending order of power. */
func (capper *clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) {
	i := low
	j := high
	// Calculating the pivot.
	pivotIndex := low + (high-low)/2
	pivot := (*tasksToSort)[pivotIndex]
	for i <= j {
		for (*tasksToSort)[i].Watts < pivot.Watts {
			i++
		}
		for (*tasksToSort)[j].Watts > pivot.Watts {
			j--
		}
		if i <= j {
			temp := (*tasksToSort)[i]
			(*tasksToSort)[i] = (*tasksToSort)[j]
			(*tasksToSort)[j] = temp
			i++
			j--
		}
	}
	if low < j {
		capper.quickSort(low, j, tasksToSort)
	}
	if i < high {
		capper.quickSort(i, high, tasksToSort)
	}
}
// Sorting tasks in ascending order of requested watts.
func (capper *clusterwideCapper) sortTasks(tasksToSort *[]def.Task) {
	capper.quickSort(0, len(*tasksToSort)-1, tasksToSort)
}
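// A minimal usage sketch (hypothetical tasks; only the Name and Watts fields are shown):
//
//	tasks := []def.Task{{Name: "t1", Watts: 75}, {Name: "t2", Watts: 30}}
//	capper.sortTasks(&tasks)
//	// tasks is now ordered t2 (30W), then t1 (75W).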
/*
Remove the entry for a finished task.

This function is called when a task completes.
The completed task needs to be removed from the window of tasks (if it is still present)
so that it doesn't contribute to the computation of the cap value.
*/
func (capper *clusterwideCapper) taskFinished(taskID string) {
	// If the window is empty then just return. Technically, this condition should never hold.
	if capper.windowOfTasks.Len() == 0 {
		return
	}

	// Checking whether the task with the given taskID is currently present in the window of tasks.
	var taskElementToRemove *list.Element
	for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() {
		if tsk, ok := taskElement.Value.(*def.Task); ok {
			if tsk.TaskID == taskID {
				taskElementToRemove = taskElement
			}
		}
	}

	// If the task was not found in the window, there is nothing to remove
	// (guards against a nil-pointer dereference below).
	if taskElementToRemove == nil {
		return
	}

	// We need to remove the task from the window.
	if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok {
		capper.windowOfTasks.Remove(taskElementToRemove)
		capper.numberOfTasksInWindow -= 1
		capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
	}
}
// First-come, first-served scheduling.
func (capper *clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64,
	newTask *def.Task) (float64, error) {
	// Validation
	if totalPower == nil {
		return 100, errors.New("Invalid argument: totalPower")
	}
	// Need to calculate the running average.
	runningAverage := capper.runningAverageOfWatts(newTask)
	// For each node, calculate the percentage of the running average to the total power.
	runningAverageToTotalPowerPercentage := make(map[string]float64)
	for host, tpower := range totalPower {
		if tpower >= runningAverage {
			runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100
		}
		// Else we don't consider this host for the computation of the cluster-wide cap.
	}

	// Determine the cluster-wide cap value.
	capValue := capper.getCap(runningAverageToTotalPowerPercentage)
	// Need to cap the cluster to this value.
	return capValue, nil
}
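// A minimal sketch of how a scheduler might use this (hypothetical host names and power values):
//
//	capper := getClusterwideCapperInstance()
//	totalPower := map[string]float64{"stratos-001": 220.0, "stratos-002": 240.0}
//	task := def.Task{Name: "minife", Watts: 50}
//	if capValue, err := capper.fcfsDetermineCap(totalPower, &task); err == nil {
//		log.Printf("Capping the cluster at %f percent", capValue)
//	}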
// Stringer for an instance of clusterwideCapper.
func (capper *clusterwideCapper) string() string {
	return "Cluster Capper -- Proactively cap the entire cluster."
}