Proactive cluster wide capper that defines two types of schedulers. First fit scheduling, that uses running average of task.Watts to calculate the cluster wide cap, and Ranked based cluster wide capper that ranks the tasks (sorts) based on the watts required and then performs fcfs in the sorted order.

This commit is contained in:
Pradyumna Kaushik 2016-11-10 19:57:36 -05:00 committed by Renan DelValle
parent 10824a8520
commit d8710385be

View file

@ -0,0 +1,244 @@
/*
Cluster wide dynamic capping
Step1. Compute running average of tasks in window.
Step2. Compute what percentage of available power of each node, is the running average.
Step3. Compute the median of the percentages and this is the percentage that the cluster needs to be cpaped at.
1. First Fit Scheduling -- Perform the above steps for each task that needs to be scheduled.
2. Rank based Scheduling -- Sort a set of tasks to be scheduled, in ascending order of power, and then perform the above steps for each of them in the sorted order.
This is not a scheduler but a scheduling scheme that schedulers can use.
*/
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"errors"
"github.com/montanaflynn/stats"
"sort"
"sync"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
// window of tasks.
window_of_tasks list.list
// The current sum of requested powers of the tasks in the window.
current_sum float64
// The current number of tasks in the window.
number_of_tasks_in_window int
}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance().
func newClusterwideCapper() *clusterwideCapper {
return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
}
// For locking on operations that may result in race conditions.
var clusterwide_capper_mutex sync.Mutex
// Singleton instance of clusterwideCapper
var singleton_capper *clusterwideCapper
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
if singleton_capper == nil {
clusterwide_capper_mutex.Lock()
singleton_capper = newClusterwideCapper()
clusterwide_capper_mutex.Unlock()
} else {
// Do nothing
}
return singleton_capper
}
// Clear and initialize all the members of clusterwideCapper.
func (capper clusterwideCapper) clear() {
capper.window_of_tasks.Init()
capper.current_sum = 0
capper.number_of_tasks_in_window = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
return capper.current_sum / float64(capper.window_of_tasks.Len())
}
/*
Compute the running average.
Using clusterwideCapper#window_of_tasks to store the tasks.
Task at position 0 (oldest task) is removed when the window is full and new task arrives.
*/
func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 {
var average float64
if capper.number_of_tasks_in_window < constants.Window_size {
capper.window_of_tasks.PushBack(tsk)
capper.number_of_tasks_in_window++
capper.current_sum += float64(tsk.Watts)
} else {
task_to_remove_element := capper.window_of_tasks.Front()
if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
capper.current_sum -= float64(task_to_remove.Watts)
capper.window_of_tasks.Remove(task_to_remove_element)
}
capper.window_of_tasks.PushBack(tsk)
capper.current_sum += float64(tsk.Watts)
}
average = capper.average()
return average
}
/*
Calculating cap value.
1. Sorting the values of running_average_available_power_percentage in ascending order.
2. Computing the median of the above sorted values.
3. The median is now the cap value.
*/
func (capper clusterwideCapper) get_cap(running_average_available_power_percentage map[string]float64) float64 {
var values []float64
// Validation
if running_average_available_power_percentage == nil {
return 100.0
}
for _, apower := range running_average_available_power_percentage {
values = append(values, apower)
}
// sorting the values in ascending order
sort.Float64s(values)
// Calculating the median
if median, err := stats.Median(values); err == nil {
return median
}
// should never reach here. If here, then just setting the cap value to be 100
return 100.0
}
/* Quick sort algorithm to sort tasks, in place,
in ascending order of power.*/
func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) {
i := low
j := high
// calculating the pivot
pivot_index := low + (high - low)/2
pivot := tasks_to_sort[pivot_index]
for i <= j {
for tasks_to_sort[i].Watts < pivot.Watts {
i++
}
for tasks_to_sort[j].Watts > pivot.Watts {
j--
}
if i <= j {
temp := tasks_to_sort[i]
tasks_to_sort[i] = tasks_to_sort[j]
tasks_to_sort[j] = temp
i++
j--
}
}
if low < j {
capper.quick_sort(low, j, tasks_to_sort)
}
if i < high {
capper.quick_sort(i, high, tasks_to_sort)
}
}
// Sorting tasks in ascending order of requested watts.
func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) {
capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort)
}
/*
Remove entry for finished task.
This function is called when a task completes. This completed task needs to be removed from the window of tasks (if it is still present)
so that it doesn't contribute to the computation of the cap value.
*/
func (capper clusterwideCapper) taskFinished(taskID string) {
// If the window is empty the just return. This condition should technically return false.
if capper.window_of_tasks.Len() == 0 {
return
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var task_element_to_remove *list.Element
for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() {
if tsk, ok := task_element.Value.(*def.Task); ok {
if task.TaskID == taskID {
task_element_to_remove = task_element
}
}
}
// If finished task is there in the window of tasks, then we need to remove the task from the same and modify the members of clusterwideCapper accordingly.
if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
capper.window_of_tasks.Remove(task_element_to_remove)
capper.number_of_tasks_in_window -= 1
capper.current_sum -= float64(task_to_remove.Watts)
}
}
// Ranked based scheduling.
func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]float64,
tasks_to_schedule []*def.Task) ([]*def.Task, map[string]float64, error) {
// Validation
if available_power == nil || len(tasks_to_schedule) == 0 {
return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule")
} else {
// Need to sort the tasks in ascending order of requested power.
capper.sort_tasks(tasks_to_schedule)
// Now, for each task in the sorted set of tasks, we need to use the Fcfs_determine_cap logic.
cluster_wide_cap_values := make(map[int]float64)
index := 0
for _, tsk := range tasks_to_schedule {
/*
Note that even though Fcfs_determine_cap is called, we have sorted the tasks aprior and thus, the tasks are scheduled in the sorted fashion.
Calling Fcfs_determine_cap(...) just to avoid redundant code.
*/
if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil {
cluster_wide_cap_values[index] = cap
} else {
return nil, nil, err
}
index++
}
// Now returning the sorted set of tasks and the cluster wide cap values for each task that is launched.
return tasks_to_schedule, cluster_wide_cap_values, nil
}
}
// First come first serve shceduling.
func (capper clusterwideCapper) fcfsDetermineCap(available_power map[string]float64, new_task *def.Task) (float64, error) {
// Validation
if available_power == nil {
return 100, errors.New("Invalid argument: available_power")
} else {
clusterwide_capper_mutex.Lock()
// Need to calculate the running average
running_average := capper.running_average_of_watts(new_task)
// What percent of available_power for each node is the running average.
running_average_available_power_percentage := make(map[string]float64)
for host, apower := range available_power {
if apower >= running_average {
running_average_available_power_percentage[host] = (running_average/apower) * 100
} else {
// We don't consider this host in the offers.
}
}
// Determine the cluster wide cap value.
cap_value := capper.get_cap(running_average_available_power_percentage)
// Need to cap the cluster to this value before launching the next task.
clusterwide_capper_mutex.Unlock()
return cap_value, nil
}
}
// Stringer for an instance of clusterwideCapper
func (capper clusterwideCapper) string() string {
return "Clusterwide Capper -- Proactively cap the entire cluster."
}