/*
Cluster wide dynamic capping.
This is not a scheduler but a scheduling scheme that schedulers can use.
*/
package schedulers
import (
2016-11-14 22:53:06 -05:00
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
2016-12-20 14:56:07 -05:00
"bitbucket.org/sunybingcloud/electron/utilities/runAvg"
2016-11-14 22:53:06 -05:00
"errors"
"github.com/montanaflynn/stats"
2016-11-25 17:42:08 -05:00
"log"
2016-11-14 22:53:06 -05:00
"sort"
2016-11-10 19:57:36 -05:00
)
2016-12-20 14:56:07 -05:00
// wrapper around def.Task that implements runAvg.Interface
type taskWrapper struct {
task def . Task
}
func ( tw taskWrapper ) Val ( ) float64 {
return tw . task . Watts * constants . CapMargin
2016-11-10 19:57:36 -05:00
}
2016-12-20 14:56:07 -05:00
func ( tw taskWrapper ) ID ( ) string {
return tw . task . TaskID
}
2016-12-20 15:07:31 -05:00
// clusterwideCapper computes cluster-wide power cap values. It carries no
// state of its own; window bookkeeping is delegated to the runAvg package.
type clusterwideCapper struct{}

// newClusterwideCapper constructs a clusterwideCapper. Please don't call
// this directly; use getClusterwideCapperInstance() instead.
func newClusterwideCapper() *clusterwideCapper {
	return &clusterwideCapper{}
}
// Singleton instance of clusterwideCapper
2016-11-28 17:18:33 -05:00
var singletonCapper * clusterwideCapper
2016-11-14 22:53:06 -05:00
2016-11-10 19:57:36 -05:00
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance ( ) * clusterwideCapper {
2016-11-28 17:18:33 -05:00
if singletonCapper == nil {
singletonCapper = newClusterwideCapper ( )
2016-11-14 22:53:06 -05:00
} else {
// Do nothing
}
2016-11-28 17:18:33 -05:00
return singletonCapper
2016-11-10 19:57:36 -05:00
}
2016-12-20 14:56:07 -05:00
// Clear and initialize the runAvg calculator
2016-11-10 19:57:36 -05:00
func ( capper clusterwideCapper ) clear ( ) {
2016-12-20 14:56:07 -05:00
runAvg . Init ( )
2016-11-10 19:57:36 -05:00
}
/ *
Calculating cap value .
2016-12-07 18:47:37 -05:00
1. Sorting the values of ratios ( ( running average / totalPower ) per node ) in ascending order .
2016-11-14 22:43:05 -05:00
2. Computing the median of above sorted values .
3. The median is now the cap .
2016-11-10 19:57:36 -05:00
* /
2016-12-07 18:47:37 -05:00
func ( capper clusterwideCapper ) getCap ( ratios map [ string ] float64 ) float64 {
2016-11-14 22:53:06 -05:00
var values [ ] float64
// Validation
2016-12-07 18:47:37 -05:00
if ratios == nil {
2016-11-14 22:53:06 -05:00
return 100.0
}
2016-12-07 18:47:37 -05:00
for _ , apower := range ratios {
2016-11-14 22:53:06 -05:00
values = append ( values , apower )
}
// sorting the values in ascending order.
sort . Float64s ( values )
// Calculating the median
if median , err := stats . Median ( values ) ; err == nil {
return median
}
// should never reach here. If here, then just setting the cap value to be 100
return 100.0
2016-11-10 19:57:36 -05:00
}
2016-11-17 21:51:02 -05:00
/ *
2016-11-25 16:05:55 -05:00
A recapping strategy which decides between 2 different recapping schemes .
1. the regular scheme based on the average power usage across the cluster .
2. A scheme based on the average of the loads on each node in the cluster .
2016-11-17 21:51:02 -05:00
2016-11-25 16:05:55 -05:00
The recap value picked the least among the two .
2016-11-25 17:42:08 -05:00
The cleverRecap scheme works well when the cluster is relatively idle and until then ,
the primitive recapping scheme works better .
2016-11-17 21:51:02 -05:00
* /
2016-11-28 17:18:33 -05:00
func ( capper clusterwideCapper ) cleverRecap ( totalPower map [ string ] float64 ,
taskMonitor map [ string ] [ ] def . Task , finishedTaskId string ) ( float64 , error ) {
2016-11-17 21:51:02 -05:00
// Validation
2016-11-28 17:18:33 -05:00
if totalPower == nil || taskMonitor == nil {
return 100.0 , errors . New ( "Invalid argument: totalPower, taskMonitor" )
2016-11-17 21:51:02 -05:00
}
2016-11-25 16:05:55 -05:00
// determining the recap value by calling the regular recap(...)
toggle := false
2016-11-28 17:18:33 -05:00
recapValue , err := capper . recap ( totalPower , taskMonitor , finishedTaskId )
2016-11-25 16:05:55 -05:00
if err == nil {
toggle = true
}
2016-11-17 21:51:02 -05:00
// watts usage on each node in the cluster.
2016-11-28 17:18:33 -05:00
wattsUsages := make ( map [ string ] [ ] float64 )
hostOfFinishedTask := ""
indexOfFinishedTask := - 1
2016-11-17 21:51:02 -05:00
for _ , host := range constants . Hosts {
2016-11-28 17:18:33 -05:00
wattsUsages [ host ] = [ ] float64 { 0.0 }
2016-11-17 21:51:02 -05:00
}
2016-11-28 17:18:33 -05:00
for host , tasks := range taskMonitor {
2016-11-17 21:51:02 -05:00
for i , task := range tasks {
2016-11-28 17:18:33 -05:00
if task . TaskID == finishedTaskId {
hostOfFinishedTask = host
indexOfFinishedTask = i
// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks
2016-11-25 17:42:08 -05:00
continue
}
2016-11-28 17:18:33 -05:00
wattsUsages [ host ] = append ( wattsUsages [ host ] , float64 ( task . Watts ) * constants . CapMargin )
2016-11-17 21:51:02 -05:00
}
}
2016-11-25 16:05:55 -05:00
// Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor,
2016-11-25 17:42:08 -05:00
// then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here.
2016-11-28 17:18:33 -05:00
if hostOfFinishedTask != "" && indexOfFinishedTask != - 1 {
2016-11-25 17:42:08 -05:00
log . Printf ( "Removing task with task [%s] from the list of running tasks\n" ,
2016-11-28 17:18:33 -05:00
taskMonitor [ hostOfFinishedTask ] [ indexOfFinishedTask ] . TaskID )
taskMonitor [ hostOfFinishedTask ] = append ( taskMonitor [ hostOfFinishedTask ] [ : indexOfFinishedTask ] ,
taskMonitor [ hostOfFinishedTask ] [ indexOfFinishedTask + 1 : ] ... )
2016-11-25 17:42:08 -05:00
}
2016-11-25 16:05:55 -05:00
2016-11-25 17:42:08 -05:00
// Need to check whether there are still tasks running on the cluster. If not then we return an error.
clusterIdle := true
2016-11-28 17:18:33 -05:00
for _ , tasks := range taskMonitor {
2016-11-25 17:42:08 -05:00
if len ( tasks ) > 0 {
clusterIdle = false
}
}
2016-11-17 21:51:02 -05:00
2016-11-25 17:42:08 -05:00
if ! clusterIdle {
// load on each node in the cluster.
2016-11-25 16:05:55 -05:00
loads := [ ] float64 { 0.0 }
2016-11-28 17:18:33 -05:00
for host , usages := range wattsUsages {
totalUsage := 0.0
2016-11-25 16:05:55 -05:00
for _ , usage := range usages {
2016-11-28 17:18:33 -05:00
totalUsage += usage
2016-11-25 16:05:55 -05:00
}
2016-11-28 17:18:33 -05:00
loads = append ( loads , totalUsage / totalPower [ host ] )
2016-11-25 16:05:55 -05:00
}
// Now need to compute the average load.
2016-11-28 17:18:33 -05:00
totalLoad := 0.0
2016-11-25 16:05:55 -05:00
for _ , load := range loads {
2016-11-28 17:18:33 -05:00
totalLoad += load
2016-11-25 16:05:55 -05:00
}
2016-11-28 17:18:33 -05:00
averageLoad := ( totalLoad / float64 ( len ( loads ) ) * 100.0 ) // this would be the cap value.
2016-11-25 16:05:55 -05:00
// If toggle is true, then we need to return the least recap value.
if toggle {
2016-11-28 17:18:33 -05:00
if averageLoad <= recapValue {
return averageLoad , nil
2016-11-25 16:05:55 -05:00
} else {
return recapValue , nil
}
} else {
2016-11-28 17:18:33 -05:00
return averageLoad , nil
2016-11-17 21:51:02 -05:00
}
}
2016-11-25 16:05:55 -05:00
return 100.0 , errors . New ( "No task running on the cluster." )
2016-11-17 21:51:02 -05:00
}
2016-11-14 22:43:05 -05:00
/ *
Recapping the entire cluster .
1. Remove the task that finished from the list of running tasks .
2. Compute the average allocated power of each of the tasks that are currently running .
3. For each host , determine the ratio of the average to the total power .
4. Determine the median of the ratios and this would be the new cluster wide cap .
This needs to be called whenever a task finishes execution .
* /
2016-11-28 17:18:33 -05:00
func ( capper clusterwideCapper ) recap ( totalPower map [ string ] float64 ,
taskMonitor map [ string ] [ ] def . Task , finishedTaskId string ) ( float64 , error ) {
2016-11-14 22:53:06 -05:00
// Validation
2016-11-28 17:18:33 -05:00
if totalPower == nil || taskMonitor == nil {
return 100.0 , errors . New ( "Invalid argument: totalPower, taskMonitor" )
2016-11-14 22:53:06 -05:00
}
2016-11-28 17:18:33 -05:00
totalAllocatedPower := 0.0
totalRunningTasks := 0
2016-11-17 21:51:02 -05:00
2016-11-28 17:18:33 -05:00
hostOfFinishedTask := ""
indexOfFinishedTask := - 1
for host , tasks := range taskMonitor {
2016-11-25 17:42:08 -05:00
for i , task := range tasks {
2016-11-28 17:18:33 -05:00
if task . TaskID == finishedTaskId {
hostOfFinishedTask = host
indexOfFinishedTask = i
// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks
2016-11-25 17:42:08 -05:00
continue
}
2016-11-28 17:18:33 -05:00
totalAllocatedPower += ( float64 ( task . Watts ) * constants . CapMargin )
totalRunningTasks ++
2016-11-25 17:42:08 -05:00
}
}
2016-11-17 21:51:02 -05:00
2016-11-25 17:42:08 -05:00
// Updating task monitor
2016-11-28 17:18:33 -05:00
if hostOfFinishedTask != "" && indexOfFinishedTask != - 1 {
2016-11-25 17:42:08 -05:00
log . Printf ( "Removing task with task [%s] from the list of running tasks\n" ,
2016-11-28 17:18:33 -05:00
taskMonitor [ hostOfFinishedTask ] [ indexOfFinishedTask ] . TaskID )
taskMonitor [ hostOfFinishedTask ] = append ( taskMonitor [ hostOfFinishedTask ] [ : indexOfFinishedTask ] ,
taskMonitor [ hostOfFinishedTask ] [ indexOfFinishedTask + 1 : ] ... )
2016-11-25 17:42:08 -05:00
}
2016-11-17 21:51:02 -05:00
2016-11-28 17:18:33 -05:00
// For the last task, totalAllocatedPower and totalRunningTasks would be 0
if totalAllocatedPower == 0 && totalRunningTasks == 0 {
2016-11-25 17:42:08 -05:00
return 100 , errors . New ( "No task running on the cluster." )
}
2016-11-17 21:51:02 -05:00
2016-11-28 17:18:33 -05:00
average := totalAllocatedPower / float64 ( totalRunningTasks )
2016-11-14 22:53:06 -05:00
ratios := [ ] float64 { }
2016-11-28 17:18:33 -05:00
for _ , tpower := range totalPower {
2016-11-14 22:53:06 -05:00
ratios = append ( ratios , ( average / tpower ) * 100 )
}
sort . Float64s ( ratios )
median , err := stats . Median ( ratios )
if err == nil {
return median , nil
} else {
return 100 , err
}
2016-11-14 22:43:05 -05:00
}
2016-11-10 19:57:36 -05:00
/ *
2016-12-20 14:56:07 -05:00
Remove entry for finished task from the window
This function is called when a task completes .
2016-11-14 22:43:05 -05:00
This completed task needs to be removed from the window of tasks ( if it is still present )
2016-12-20 14:56:07 -05:00
so that it doesn ' t contribute to the computation of the next cap value .
2016-11-10 19:57:36 -05:00
* /
func ( capper clusterwideCapper ) taskFinished ( taskID string ) {
2016-12-20 14:56:07 -05:00
runAvg . Remove ( taskID )
2016-11-10 19:57:36 -05:00
}
2016-11-14 22:43:05 -05:00
// First come first serve scheduling.
2016-11-28 17:18:33 -05:00
func ( capper clusterwideCapper ) fcfsDetermineCap ( totalPower map [ string ] float64 ,
newTask * def . Task ) ( float64 , error ) {
2016-11-14 22:53:06 -05:00
// Validation
2016-11-28 17:18:33 -05:00
if totalPower == nil {
return 100 , errors . New ( "Invalid argument: totalPower" )
2016-11-14 22:53:06 -05:00
} else {
// Need to calculate the running average
2016-12-20 14:56:07 -05:00
runningAverage := runAvg . Calc ( taskWrapper { task : * newTask } , constants . WindowSize )
2016-11-14 22:53:06 -05:00
// For each node, calculate the percentage of the running average to the total power.
2016-12-07 18:47:37 -05:00
ratios := make ( map [ string ] float64 )
2016-11-28 17:18:33 -05:00
for host , tpower := range totalPower {
if tpower >= runningAverage {
2016-12-07 18:47:37 -05:00
ratios [ host ] = ( runningAverage / tpower ) * 100
2016-11-14 22:53:06 -05:00
} else {
// We don't consider this host for the computation of the cluster wide cap.
}
}
2016-11-10 19:57:36 -05:00
2016-11-14 22:53:06 -05:00
// Determine the cluster wide cap value.
2016-12-07 18:47:37 -05:00
capValue := capper . getCap ( ratios )
2016-11-14 22:53:06 -05:00
// Need to cap the cluster to this value.
2016-11-28 17:18:33 -05:00
return capValue , nil
2016-11-14 22:53:06 -05:00
}
2016-11-10 19:57:36 -05:00
}
// Stringer for an instance of clusterwideCapper
func ( capper clusterwideCapper ) string ( ) string {
2016-11-14 22:53:06 -05:00
return "Cluster Capper -- Proactively cap the entire cluster."
2016-11-10 19:57:36 -05:00
}