Synchronized operations that change the value of the cluster-wide cap. Added cleverRecap(...), which determines the recap value of the cluster at a much finer level, taking into account the average load on each node in the cluster. Bug fix in cap.go -- closed the session once capping had been done; this prevents running out of file descriptors.
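A minimal sketch of the two changes summarized above that are not visible in the hunks below: synchronizing updates to the cluster-wide cap value, and closing the ssh session once a host has been capped. The package and identifier names here (capsketch, capValueMutex, currentCapValue, updateCapValue, capHost) are illustrative assumptions, not the names used in the actual commit.

// Hypothetical sketch, not the committed code.
package capsketch

import (
	"sync"

	"golang.org/x/crypto/ssh"
)

var (
	// Guard the cluster-wide cap value so concurrent recap decisions cannot race.
	capValueMutex   sync.Mutex
	currentCapValue float64
)

// updateCapValue serializes writes to the cluster-wide cap value.
func updateCapValue(newCap float64) {
	capValueMutex.Lock()
	defer capValueMutex.Unlock()
	currentCapValue = newCap
}

// capHost runs a capping command on a host over ssh and closes the session
// afterwards. Without the deferred Close, every capping call would leak a
// file descriptor and eventually exhaust the process's fd limit.
func capHost(conn *ssh.Client, command string) error {
	session, err := conn.NewSession()
	if err != nil {
		return err
	}
	defer session.Close()
	return session.Run(command)
}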

This commit is contained in:
Pradyumna Kaushik 2016-11-17 21:51:02 -05:00 committed by Renan DelValle
parent e562df0f5c
commit 3f90ccfe74
4 changed files with 194 additions and 68 deletions

@@ -16,6 +16,7 @@ import (
"container/list"
"errors"
"github.com/montanaflynn/stats"
"log"
"sort"
)
@@ -110,6 +111,68 @@ func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentag
return 100.0
}
/*
Recapping the entire cluster, and removing the finished task from the list of running tasks.
At this point, we would have better knowledge about the state of the cluster.
1. Calculate the total allocated watts per node in the cluster.
2. Compute the ratio of the total watts usage per node to the total power for that node.
This would give us the load on that node.
3. Now, compute the average load across all the nodes in the cluster.
This would be the cap value.
*/
func (capper clusterwideCapper) cleverRecap(total_power map[string]float64,
task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
// Validation
if total_power == nil || task_monitor == nil {
return 100.0, errors.New("Invalid argument: total_power, task_monitor")
}
// watts usage on each node in the cluster.
watts_usages := make(map[string][]float64)
host_of_finished_task := ""
index_of_finished_task := -1
for _, host := range constants.Hosts {
watts_usages[host] = []float64{0.0}
}
for host, tasks := range task_monitor {
for i, task := range tasks {
if task.TaskID == finished_taskId {
host_of_finished_task = host
index_of_finished_task = i
// Not considering this task for the computation of watts usage
continue
}
watts_usages[host] = append(watts_usages[host], float64(task.Watts) * constants.Cap_margin)
}
}
// Updating task monitor
if host_of_finished_task != "" && index_of_finished_task != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
}
// load on each node in the cluster.
loads := []float64{}
for host, usages := range watts_usages {
total_usage := 0.0
for _, usage := range usages {
total_usage += usage
}
loads = append(loads, total_usage / total_power[host])
}
// Now need to compute the average load.
total_load := 0.0
for _, load := range loads {
total_load += load
}
average_load := total_load / float64(len(loads)) // this would be the cap value.
return average_load, nil
}
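// Worked example for cleverRecap above (illustrative numbers, not part of the
// commit): suppose the cluster has two hosts, each with total_power = 100 W.
// After dropping the finished task, the remaining tasks on host A account for
// 60 W of allocated power (Watts * Cap_margin) and those on host B for 30 W.
// The per-host loads are then 60/100 = 0.6 and 30/100 = 0.3, and cleverRecap
// returns their average, (0.6 + 0.3) / 2 = 0.45, as the new recap value.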
/*
Recapping the entire cluster.
@@ -128,18 +191,35 @@ func (capper clusterwideCapper) recap(total_power map[string]float64,
}
total_allocated_power := 0.0
total_running_tasks := 0
for _, tasks := range task_monitor {
index := 0
for i, task := range tasks {
if task.TaskID == finished_taskId {
index = i
continue
}
total_allocated_power += float64(task.Watts) * constants.Cap_margin
total_running_tasks++
}
tasks = append(tasks[:index], tasks[index+1:]...)
}
host_of_finished_task := ""
index_of_finished_task := -1
for host, tasks := range task_monitor {
for i, task := range tasks {
if task.TaskID == finished_taskId {
host_of_finished_task = host
index_of_finished_task = i
// Not considering this task for the computation of total_allocated_power and total_running_tasks
continue
}
total_allocated_power += (float64(task.Watts) * constants.Cap_margin)
total_running_tasks++
}
}
// Updating task monitor
if host_of_finished_task != "" && index_of_finished_task != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
}
// If the finished task was the last running task on the cluster, total_allocated_power and total_running_tasks would both be 0
if total_allocated_power == 0 && total_running_tasks == 0 {
return 100, errors.New("No task running on the cluster.")
}
average := total_allocated_power / float64(total_running_tasks)
ratios := []float64{}
for _, tpower := range total_power {
@@ -211,7 +291,7 @@ func (capper clusterwideCapper) taskFinished(taskID string) {
}
}
// Ee need to remove the task from the window.
// We need to remove the task from the window.
if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
capper.window_of_tasks.Remove(task_element_to_remove)
capper.number_of_tasks_in_window -= 1