The computation of the cluster wide cap now considers the total power per node rather than the available power per node. Also added a function recap(...) that is called to recompute the cluster wide cap once a task completes; the returned value is then used to update the cluster wide cap.
parent
5dc3baab55
commit
3551de20da
1 changed file with 83 additions and 47 deletions
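For context, the sketch below (not part of this commit) illustrates how a scheduler in package schedulers might wire in the new recap(...) together with the existing taskFinished(...) once a task completes. capSketchScheduler, its fields, onTaskComplete, and the log call are assumed names for illustration only.

package schedulers

import (
    "bitbucket.org/sunybingcloud/electron/def"
    "log"
)

// capSketchScheduler is a hypothetical scheduler used only for illustration.
type capSketchScheduler struct {
    totalPower  map[string]float64    // total power (Watts) of each node
    taskMonitor map[string][]def.Task // tasks currently running on each node
}

// onTaskComplete sketches how recap(...) might be invoked when a task finishes.
func (s *capSketchScheduler) onTaskComplete(finishedTaskID string) {
    capper := getClusterwideCapperInstance()
    // Drop the finished task from the capper's window of tasks.
    capper.taskFinished(finishedTaskID)
    // Recompute the cluster wide cap from the tasks that are still running.
    if newCap, err := capper.recap(s.totalPower, s.taskMonitor, finishedTaskID); err == nil {
        // newCap is a percentage of each node's total power; the scheduler would
        // now cap every node at this value (e.g. via RAPL).
        log.Printf("new cluster wide cap: %f%%", newCap)
    }
}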
@@ -1,24 +1,22 @@
 /*
 Cluster wide dynamic capping
-Step1. Compute running average of tasks in window.
+Step1. Compute the running average of watts of tasks in window.
-Step2. Compute what percentage of available power of each node, is the running average.
+Step2. Compute what percentage of total power of each node, is the running average.
-Step3. Compute the median of the percentages and this is the percentage that the cluster needs to be cpaped at.
+Step3. Compute the median of the percentages and this is the percentage that the cluster needs to be capped at.

-1. First Fit Scheduling -- Perform the above steps for each task that needs to be scheduled.
+1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled.
-2. Rank based Scheduling -- Sort a set of tasks to be scheduled, in ascending order of power, and then perform the above steps for each of them in the sorted order.

 This is not a scheduler but a scheduling scheme that schedulers can use.
 */
 package schedulers

 import (
     "bitbucket.org/sunybingcloud/electron/constants"
     "bitbucket.org/sunybingcloud/electron/def"
     "container/list"
     "errors"
     "github.com/montanaflynn/stats"
     "sort"
-    "sync"
 )

 // Structure containing utility data structures used to compute cluster-wide dynamic cap.
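As an illustration of the first fit scheme described above (a sketch only, not part of this commit), a scheduler would run the three steps for each task by calling fcfsDetermineCap, which this commit changes to take the total power per node, and then cap the cluster at the returned percentage before launching the task. exampleFirstFitCap, totalPower, the task values, and the log call are assumptions; the snippet assumes package schedulers with the imports already in this file plus "log".

func exampleFirstFitCap() {
    // Assumed per-node total power in Watts and an assumed task definition.
    totalPower := map[string]float64{"hostA": 200, "hostB": 400}
    newTask := def.Task{TaskID: "task-1", Watts: 75}
    if capValue, err := getClusterwideCapperInstance().fcfsDetermineCap(totalPower, &newTask); err == nil {
        // capValue is the percentage of each node's total power at which the
        // cluster should be capped before this task is launched.
        log.Printf("cap the cluster at %f%%", capValue)
    }
}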
@@ -36,17 +34,12 @@ func newClusterwideCapper() *clusterwideCapper {
     return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
 }

-// For locking on operations that may result in race conditions.
-var clusterwide_capper_mutex sync.Mutex
-
 // Singleton instance of clusterwideCapper
 var singleton_capper *clusterwideCapper
 // Retrieve the singleton instance of clusterwideCapper.
 func getClusterwideCapperInstance() *clusterwideCapper {
     if singleton_capper == nil {
-        clusterwide_capper_mutex.Lock()
         singleton_capper = newClusterwideCapper()
-        clusterwide_capper_mutex.Unlock()
     } else {
         // Do nothing
     }
@@ -76,15 +69,15 @@ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64
     if capper.number_of_tasks_in_window < constants.Window_size {
         capper.window_of_tasks.PushBack(tsk)
         capper.number_of_tasks_in_window++
-        capper.current_sum += float64(tsk.Watts)
+        capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
     } else {
         task_to_remove_element := capper.window_of_tasks.Front()
         if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
-            capper.current_sum -= float64(task_to_remove.Watts)
+            capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
             capper.window_of_tasks.Remove(task_to_remove_element)
         }
         capper.window_of_tasks.PushBack(tsk)
-        capper.current_sum += float64(tsk.Watts)
+        capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
     }
     average = capper.average()
     return average
@@ -93,20 +86,20 @@ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64
 /*
 Calculating cap value.

-1. Sorting the values of running_average_available_power_percentage in ascending order.
+1. Sorting the values of running_average_to_total_power_percentage in ascending order.
 2. Computing the median of the above sorted values.
-3. The median is now the cap value.
+3. The median is now the cap.
 */
-func (capper clusterwideCapper) get_cap(running_average_available_power_percentage map[string]float64) float64 {
+func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 {
     var values []float64
     // Validation
-    if running_average_available_power_percentage == nil {
+    if running_average_to_total_power_percentage == nil {
         return 100.0
     }
-    for _, apower := range running_average_available_power_percentage {
+    for _, apower := range running_average_to_total_power_percentage {
         values = append(values, apower)
     }
-    // sorting the values in ascending order
+    // Sorting the values in ascending order.
     sort.Float64s(values)
     // Calculating the median
     if median, err := stats.Median(values); err == nil {
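A brief worked example (assumed numbers and names, not from this commit) may help make get_cap concrete: with a running average of 90 W and node total powers of 150 W, 200 W and 300 W, the percentages are 60, 45 and 30, and their median, 45, becomes the cluster wide cap. The sketch assumes package schedulers plus "fmt".

func exampleGetCap() {
    // Assumed values: a running average of 90 W against node total powers of 150, 200 and 300 W.
    percentages := map[string]float64{
        "hostA": (90.0 / 150.0) * 100, // 60
        "hostB": (90.0 / 200.0) * 100, // 45
        "hostC": (90.0 / 300.0) * 100, // 30
    }
    capValue := getClusterwideCapperInstance().get_cap(percentages)
    fmt.Println(capValue) // 45: the median of the percentages becomes the cluster wide cap
}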
@@ -116,8 +109,51 @@ func (capper clusterwideCapper) get_cap(running_average_available_power_percenta
     return 100.0
 }

-/* Quick sort algorithm to sort tasks, in place,
-in ascending order of power.*/
+/*
+Recapping the entire cluster.
+
+1. Remove the task that finished from the list of running tasks.
+2. Compute the average allocated power of each of the tasks that are currently running.
+3. For each host, determine the ratio of the average to the total power.
+4. Determine the median of the ratios and this would be the new cluster wide cap.
+
+This needs to be called whenever a task finishes execution.
+*/
+func (capper clusterwideCapper) recap(total_power map[string]float64,
+    task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
+    // Validation
+    if total_power == nil || task_monitor == nil {
+        return 100.0, errors.New("Invalid argument: total_power, task_monitor")
+    }
+    total_allocated_power := 0.0
+    total_running_tasks := 0
+    for _, tasks := range task_monitor {
+        index := 0
+        for i, task := range tasks {
+            if task.TaskID == finished_taskId {
+                index = i
+                continue
+            }
+            total_allocated_power += float64(task.Watts) * constants.Cap_margin
+            total_running_tasks++
+        }
+        tasks = append(tasks[:index], tasks[index+1:]...)
+    }
+    average := total_allocated_power / float64(total_running_tasks)
+    ratios := []float64{}
+    for _, tpower := range total_power {
+        ratios = append(ratios, (average/tpower) * 100)
+    }
+    sort.Float64s(ratios)
+    median, err := stats.Median(ratios)
+    if err == nil {
+        return median, nil
+    } else {
+        return 100, err
+    }
+}
+
+/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/
 func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) {
     i := low
     j := high
@@ -154,7 +190,8 @@ func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) {

 /*
 Remove entry for finished task.
-This function is called when a task completes. This completed task needs to be removed from the window of tasks (if it is still present)
+This function is called when a task completes.
+This completed task needs to be removed from the window of tasks (if it is still present)
 so that it doesn't contribute to the computation of the cap value.
 */
 func (capper clusterwideCapper) taskFinished(taskID string) {
@@ -173,11 +210,11 @@ func (capper clusterwideCapper) taskFinished(taskID string) {
         }
     }

-    // If finished task is there in the window of tasks, then we need to remove the task from the same and modify the members of clusterwideCapper accordingly.
+    // We need to remove the task from the window.
     if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
         capper.window_of_tasks.Remove(task_element_to_remove)
         capper.number_of_tasks_in_window -= 1
-        capper.current_sum -= float64(task_to_remove.Watts)
+        capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
     }
 }

@@ -211,34 +248,33 @@ func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]fl
     }
 }

-// First come first serve shceduling.
+// First come first serve scheduling.
-func (capper clusterwideCapper) fcfsDetermineCap(available_power map[string]float64, new_task *def.Task) (float64, error) {
+func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64,
+    new_task *def.Task) (float64, error) {
     // Validation
-    if available_power == nil {
+    if total_power == nil {
-        return 100, errors.New("Invalid argument: available_power")
+        return 100, errors.New("Invalid argument: total_power")
     } else {
-        clusterwide_capper_mutex.Lock()
         // Need to calculate the running average
         running_average := capper.running_average_of_watts(new_task)
-        // What percent of available_power for each node is the running average.
+        // For each node, calculate the percentage of the running average to the total power.
-        running_average_available_power_percentage := make(map[string]float64)
+        running_average_to_total_power_percentage := make(map[string]float64)
-        for host, apower := range available_power {
+        for host, tpower := range total_power {
-            if apower >= running_average {
+            if tpower >= running_average {
-                running_average_available_power_percentage[host] = (running_average/apower) * 100
+                running_average_to_total_power_percentage[host] = (running_average/tpower) * 100
             } else {
-                // We don't consider this host in the offers.
+                // We don't consider this host for the computation of the cluster wide cap.
             }
         }

         // Determine the cluster wide cap value.
-        cap_value := capper.get_cap(running_average_available_power_percentage)
+        cap_value := capper.get_cap(running_average_to_total_power_percentage)
-        // Need to cap the cluster to this value before launching the next task.
+        // Need to cap the cluster to this value.
-        clusterwide_capper_mutex.Unlock()
         return cap_value, nil
     }
 }

 // Stringer for an instance of clusterwideCapper
 func (capper clusterwideCapper) string() string {
-    return "Clusterwide Capper -- Proactively cap the entire cluster."
+    return "Cluster Capper -- Proactively cap the entire cluster."
 }