diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go
index 65bdf2e..8d7a55e 100644
--- a/schedulers/proactiveclusterwidecappers.go
+++ b/schedulers/proactiveclusterwidecappers.go
@@ -1,24 +1,22 @@
 /*
 Cluster wide dynamic capping
-Step1. Compute running average of tasks in window.
-Step2. Compute what percentage of available power of each node, is the running average.
-Step3. Compute the median of the percentages and this is the percentage that the cluster needs to be cpaped at.
+Step1. Compute the running average of the watts of the tasks in the window.
+Step2. For each node, compute what percentage of its total power the running average is.
+Step3. Compute the median of the percentages; this is the percentage that the cluster needs to be capped at.
 
-1. First Fit Scheduling -- Perform the above steps for each task that needs to be scheduled.
-2. Rank based Scheduling -- Sort a set of tasks to be scheduled, in ascending order of power, and then perform the above steps for each of them in the sorted order.
+1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled.
 
 This is not a scheduler but a scheduling scheme that schedulers can use.
 */
 package schedulers
 
 import (
-  "bitbucket.org/sunybingcloud/electron/constants"
-  "bitbucket.org/sunybingcloud/electron/def"
-  "container/list"
-  "errors"
-  "github.com/montanaflynn/stats"
-  "sort"
-  "sync"
+	"bitbucket.org/sunybingcloud/electron/constants"
+	"bitbucket.org/sunybingcloud/electron/def"
+	"container/list"
+	"errors"
+	"github.com/montanaflynn/stats"
+	"sort"
 )
 
 // Structure containing utility data structures used to compute cluster-wide dynamic cap.
@@ -36,17 +34,12 @@ func newClusterwideCapper() *clusterwideCapper {
 	return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
 }
 
-// For locking on operations that may result in race conditions.
-var clusterwide_capper_mutex sync.Mutex
-
 // Singleton instance of clusterwideCapper
 var singleton_capper *clusterwideCapper
 // Retrieve the singleton instance of clusterwideCapper.
 func getClusterwideCapperInstance() *clusterwideCapper {
 	if singleton_capper == nil {
-		clusterwide_capper_mutex.Lock()
 		singleton_capper = newClusterwideCapper()
-		clusterwide_capper_mutex.Unlock()
 	} else {
 		// Do nothing
 	}
@@ -76,15 +69,15 @@ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64
 	if capper.number_of_tasks_in_window < constants.Window_size {
 		capper.window_of_tasks.PushBack(tsk)
 		capper.number_of_tasks_in_window++
-		capper.current_sum += float64(tsk.Watts)
+		capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
 	} else {
 		task_to_remove_element := capper.window_of_tasks.Front()
 		if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
-			capper.current_sum -= float64(task_to_remove.Watts)
+			capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
 			capper.window_of_tasks.Remove(task_to_remove_element)
 		}
 		capper.window_of_tasks.PushBack(tsk)
-		capper.current_sum += float64(tsk.Watts)
+		capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
 	}
 	average = capper.average()
 	return average
@@ -93,20 +86,20 @@ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64
 /*
 Calculating cap value.
 
-1. Sorting the values of running_average_available_power_percentage in ascending order.
-2. Computing the median of the above sorted values.
-3. The median is now the cap value.
+1. Sorting the values of running_average_to_total_power_percentage in ascending order.
+2. Computing the median of the above sorted values.
+3. The median is now the cap.
 */
-func (capper clusterwideCapper) get_cap(running_average_available_power_percentage map[string]float64) float64 {
+func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 {
 	var values []float64
 	// Validation
-	if running_average_available_power_percentage == nil {
+	if running_average_to_total_power_percentage == nil {
 		return 100.0
 	}
-	for _, apower := range running_average_available_power_percentage {
+	for _, apower := range running_average_to_total_power_percentage {
 		values = append(values, apower)
 	}
-	// sorting the values in ascending order
+	// Sorting the values in ascending order.
 	sort.Float64s(values)
 	// Calculating the median
 	if median, err := stats.Median(values); err == nil {
@@ -116,8 +109,51 @@ func (capper clusterwideCapper) get_cap(running_average_available_power_percenta
 	return 100.0
 }
 
-/* Quick sort algorithm to sort tasks, in place,
-in ascending order of power.*/
+/*
+Recapping the entire cluster.
+
+1. Remove the task that finished from the list of running tasks.
+2. Compute the average allocated power across the tasks that are still running.
+3. For each host, determine the ratio of this average to the host's total power.
+4. The median of these ratios is the new cluster-wide cap.
+
+This needs to be called whenever a task finishes execution.
+*/
+func (capper clusterwideCapper) recap(total_power map[string]float64,
+	task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
+	// Validation
+	if total_power == nil || task_monitor == nil {
+		return 100.0, errors.New("Invalid argument: total_power, task_monitor")
+	}
+	total_allocated_power := 0.0
+	total_running_tasks := 0
+	for _, tasks := range task_monitor {
+		index := 0
+		for i, task := range tasks {
+			if task.TaskID == finished_taskId {
+				index = i
+				continue
+			}
+			total_allocated_power += float64(task.Watts) * constants.Cap_margin
+			total_running_tasks++
+		}
+		tasks = append(tasks[:index], tasks[index+1:]...)
+	}
+	average := total_allocated_power / float64(total_running_tasks)
+	ratios := []float64{}
+	for _, tpower := range total_power {
+		ratios = append(ratios, (average/tpower) * 100)
+	}
+	sort.Float64s(ratios)
+	median, err := stats.Median(ratios)
+	if err == nil {
+		return median, nil
+	} else {
+		return 100, err
+	}
+}
+
+/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/
 func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) {
 	i := low
 	j := high
@@ -154,7 +190,8 @@ func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) {
 
 /*
 Remove entry for finished task.
-This function is called when a task completes. This completed task needs to be removed from the window of tasks (if it is still present)
+This function is called when a task completes.
+This completed task needs to be removed from the window of tasks (if it is still present)
 so that it doesn't contribute to the computation of the cap value.
 */
 func (capper clusterwideCapper) taskFinished(taskID string) {
@@ -173,11 +210,11 @@ func (capper clusterwideCapper) taskFinished(taskID string) {
 		}
 	}
 
-	// If finished task is there in the window of tasks, then we need to remove the task from the same and modify the members of clusterwideCapper accordingly.
+	// We need to remove the task from the window.
 	if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
 		capper.window_of_tasks.Remove(task_element_to_remove)
 		capper.number_of_tasks_in_window -= 1
-		capper.current_sum -= float64(task_to_remove.Watts)
+		capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
 	}
 }
 
@@ -211,34 +248,33 @@ func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]fl
 	}
 }
 
-// First come first serve shceduling.
-func (capper clusterwideCapper) fcfsDetermineCap(available_power map[string]float64, new_task *def.Task) (float64, error) {
+// First come first serve scheduling.
+func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64,
+	new_task *def.Task) (float64, error) {
 	// Validation
-	if available_power == nil {
-		return 100, errors.New("Invalid argument: available_power")
+	if total_power == nil {
+		return 100, errors.New("Invalid argument: total_power")
 	} else {
-		clusterwide_capper_mutex.Lock()
 		// Need to calculate the running average
 		running_average := capper.running_average_of_watts(new_task)
 
-		// What percent of available_power for each node is the running average.
-		running_average_available_power_percentage := make(map[string]float64)
-		for host, apower := range available_power {
-			if apower >= running_average {
-				running_average_available_power_percentage[host] = (running_average/apower) * 100
+		// For each node, calculate the running average as a percentage of the total power.
+		running_average_to_total_power_percentage := make(map[string]float64)
+		for host, tpower := range total_power {
+			if tpower >= running_average {
+				running_average_to_total_power_percentage[host] = (running_average/tpower) * 100
 			} else {
-				// We don't consider this host in the offers.
+				// We don't consider this host for the computation of the cluster wide cap.
 			}
 		}
 		// Determine the cluster wide cap value.
-		cap_value := capper.get_cap(running_average_available_power_percentage)
-		// Need to cap the cluster to this value before launching the next task.
-		clusterwide_capper_mutex.Unlock()
+		cap_value := capper.get_cap(running_average_to_total_power_percentage)
+		// Need to cap the cluster to this value.
		return cap_value, nil
 	}
 }
 
 // Stringer for an instance of clusterwideCapper
 func (capper clusterwideCapper) string() string {
-	return "Clusterwide Capper -- Proactively cap the entire cluster."
+	return "Cluster Capper -- Proactively cap the entire cluster."
 }
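
The sliding window maintained by running_average_of_watts can be exercised on its own. The sketch below is illustrative only: windowSize and capMargin are hypothetical stand-ins for constants.Window_size and constants.Cap_margin, and the runningAverage type is not part of this patch.

```go
package main

import (
	"container/list"
	"fmt"
)

// Assumed stand-ins for constants.Window_size and constants.Cap_margin.
const (
	windowSize = 3
	capMargin  = 0.70
)

// runningAverage keeps a fixed-size window of allocated task wattages and
// maintains their sum incrementally, mirroring running_average_of_watts.
type runningAverage struct {
	window *list.List
	sum    float64
}

func newRunningAverage() *runningAverage {
	return &runningAverage{window: list.New()}
}

// add records the allocated power (watts scaled by the cap margin) of a newly
// scheduled task, evicts the oldest entry once the window is full, and returns
// the current running average.
func (r *runningAverage) add(watts float64) float64 {
	allocated := watts * capMargin
	if r.window.Len() >= windowSize {
		oldest := r.window.Front()
		r.sum -= oldest.Value.(float64)
		r.window.Remove(oldest)
	}
	r.window.PushBack(allocated)
	r.sum += allocated
	return r.sum / float64(r.window.Len())
}

func main() {
	r := newRunningAverage()
	for _, w := range []float64{100, 150, 200, 250} { // hypothetical task wattages
		fmt.Printf("running average after a %.0f W task: %.2f W\n", w, r.add(w))
	}
}
```

Keeping the sum updated incrementally makes each scheduling decision O(1) rather than requiring a re-scan of the whole window.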
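
The cap itself (Steps 2 and 3 of the header comment, implemented by get_cap and fcfsDetermineCap) reduces to the median of the running average expressed as a percentage of each host's total power. A minimal standalone sketch, assuming a hypothetical computeClusterCap helper and made-up host wattages, with the median computed inline instead of through github.com/montanaflynn/stats:

```go
package main

import (
	"fmt"
	"sort"
)

// computeClusterCap returns the median of the per-host percentages, which is
// the value the cluster would be capped at.
func computeClusterCap(runningAverage float64, totalPower map[string]float64) float64 {
	percentages := []float64{}
	for _, power := range totalPower {
		// Hosts whose total power is below the running average are skipped,
		// matching the behaviour of fcfsDetermineCap.
		if power >= runningAverage {
			percentages = append(percentages, (runningAverage/power)*100)
		}
	}
	if len(percentages) == 0 {
		return 100.0 // fall back to no cap
	}
	sort.Float64s(percentages)
	mid := len(percentages) / 2
	if len(percentages)%2 == 1 {
		return percentages[mid]
	}
	return (percentages[mid-1] + percentages[mid]) / 2.0
}

func main() {
	// Hypothetical total power (in watts) of three hosts.
	totalPower := map[string]float64{"host1": 300.0, "host2": 350.0, "host3": 400.0}
	// Suppose the running average of allocated watts in the window is 120 W.
	fmt.Printf("cluster-wide cap: %.2f%%\n", computeClusterCap(120.0, totalPower))
}
```

With 300 W, 350 W and 400 W hosts and a 120 W running average, the percentages are 40%, about 34.3% and 30%, so the cluster would be capped at roughly 34.3% of peak power.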
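
The recap path follows the same pattern when a task finishes, but derives the average from every task still listed in the task monitor instead of from the window. The sketch below is a rough approximation under the same assumptions; task, recapSketch, capMargin and the sample numbers are illustrative stand-ins, not identifiers from the patch.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a pared-down stand-in for def.Task.
type task struct {
	taskID string
	watts  float64
}

const capMargin = 0.70 // assumed stand-in for constants.Cap_margin

// recapSketch skips the task that just finished, averages the allocated power
// of everything still running, and returns the median of that average
// expressed as a percentage of each host's total power.
func recapSketch(totalPower map[string]float64, taskMonitor map[string][]task, finishedTaskID string) float64 {
	totalAllocated, running := 0.0, 0
	for _, tasks := range taskMonitor {
		for _, t := range tasks {
			if t.taskID == finishedTaskID {
				continue
			}
			totalAllocated += t.watts * capMargin
			running++
		}
	}
	if running == 0 {
		return 100.0 // nothing left running: no cap
	}
	average := totalAllocated / float64(running)
	ratios := []float64{}
	for _, power := range totalPower {
		ratios = append(ratios, (average/power)*100)
	}
	sort.Float64s(ratios)
	mid := len(ratios) / 2
	if len(ratios)%2 == 1 {
		return ratios[mid]
	}
	return (ratios[mid-1] + ratios[mid]) / 2.0
}

func main() {
	totalPower := map[string]float64{"host1": 300.0, "host2": 400.0}
	taskMonitor := map[string][]task{
		"host1": {{"t1", 100}, {"t2", 150}},
		"host2": {{"t3", 200}},
	}
	// Recompute the cap after task t2 finishes.
	fmt.Printf("new cluster-wide cap: %.2f%%\n", recapSketch(totalPower, taskMonitor, "t2"))
}
```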