diff --git a/constants/constants.go b/constants/constants.go index 5dc98ea..2d9b516 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -11,19 +11,19 @@ Also, exposing functions to update or initialize some of the constants. package constants var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton.edu", - "stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu", - "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", - "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} + "stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu", + "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", + "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} // Add a new host to the slice of hosts. func AddNewHost(new_host string) bool { - // Validation - if new_host == "" { - return false - } else { - Hosts = append(Hosts, new_host) - return true - } + // Validation + if new_host == "" { + return false + } else { + Hosts = append(Hosts, new_host) + return true + } } // Lower bound of the percentage of requested power, that can be allocated to a task. @@ -38,16 +38,15 @@ var Cap_margin = 0.50 // Modify the cap margin. func UpdateCapMargin(new_cap_margin float64) bool { - // Checking if the new_cap_margin is less than the power threshold. - if new_cap_margin < Starvation_factor { - return false - } else { - Cap_margin = new_cap_margin - return true - } + // Checking if the new_cap_margin is less than the power threshold. + if new_cap_margin < Starvation_factor { + return false + } else { + Cap_margin = new_cap_margin + return true + } } - // Threshold factor that would make (Cap_margin * task.Watts) equal to (60/100 * task.Watts). var Starvation_factor = 0.8 @@ -56,32 +55,32 @@ var Total_power map[string]float64 // Initialize the total power per node. This should be done before accepting any set of tasks for scheduling. func AddTotalPowerForHost(host string, total_power float64) bool { - // Validation - is_correct_host := false - for _, existing_host := range Hosts { - if host == existing_host { - is_correct_host = true - } - } + // Validation + is_correct_host := false + for _, existing_host := range Hosts { + if host == existing_host { + is_correct_host = true + } + } - if !is_correct_host { - return false - } else { - Total_power[host] = total_power - return true - } + if !is_correct_host { + return false + } else { + Total_power[host] = total_power + return true + } } // Window size for running average -var Window_size = 160 +var Window_size = 10 // Update the window size. func UpdateWindowSize(new_window_size int) bool { - // Validation - if new_window_size == 0 { - return false - } else{ - Window_size = new_window_size - return true - } -} \ No newline at end of file + // Validation + if new_window_size == 0 { + return false + } else { + Window_size = new_window_size + return true + } +} diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go index 505ac76..5de8a47 100644 --- a/schedulers/proactiveclusterwidecappers.go +++ b/schedulers/proactiveclusterwidecappers.go @@ -17,7 +17,7 @@ import ( "container/list" "errors" "github.com/montanaflynn/stats" - "log" + "log" "sort" ) @@ -118,8 +118,11 @@ A recapping strategy which decides between 2 different recapping schemes. 2. A scheme based on the average of the loads on each node in the cluster. The recap value picked the least among the two. + +The cleverRecap scheme works well when the cluster is relatively idle and until then, + the primitive recapping scheme works better. */ -func (capper clusterwideCapper) cleverRecap(total_power map[string]float64, +func (capper clusterwideCapper) cleverRecap(total_power map[string]float64, task_monitor map[string][]def.Task, finished_taskId string) (float64, error) { // Validation if total_power == nil || task_monitor == nil { @@ -136,48 +139,48 @@ func (capper clusterwideCapper) cleverRecap(total_power map[string]float64, // watts usage on each node in the cluster. watts_usages := make(map[string][]float64) host_of_finished_task := "" - index_of_finished_task := -1 + index_of_finished_task := -1 for _, host := range constants.Hosts { watts_usages[host] = []float64{0.0} } for host, tasks := range task_monitor { for i, task := range tasks { if task.TaskID == finished_taskId { - host_of_finished_task = host - index_of_finished_task = i - // Not considering this task for the computation of total_allocated_power and total_running_tasks - continue - } - watts_usages[host] = append(watts_usages[host], float64(task.Watts) * constants.Cap_margin) + host_of_finished_task = host + index_of_finished_task = i + // Not considering this task for the computation of total_allocated_power and total_running_tasks + continue + } + watts_usages[host] = append(watts_usages[host], float64(task.Watts)*constants.Cap_margin) } } // Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor, - // then this will be ignored. - if host_of_finished_task != "" && index_of_finished_task != -1 { - log.Printf("Removing task with task [%s] from the list of running tasks\n", - task_monitor[host_of_finished_task][index_of_finished_task].TaskID) - task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], - task_monitor[host_of_finished_task][index_of_finished_task+1:]...) - } + // then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here. + if host_of_finished_task != "" && index_of_finished_task != -1 { + log.Printf("Removing task with task [%s] from the list of running tasks\n", + task_monitor[host_of_finished_task][index_of_finished_task].TaskID) + task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], + task_monitor[host_of_finished_task][index_of_finished_task+1:]...) + } - // Need to check whether there are still tasks running on the cluster. If not then we return an error. - clusterIdle := true - for _, tasks := range task_monitor { - if len(tasks) > 0 { - clusterIdle = false - } - } + // Need to check whether there are still tasks running on the cluster. If not then we return an error. + clusterIdle := true + for _, tasks := range task_monitor { + if len(tasks) > 0 { + clusterIdle = false + } + } - if !clusterIdle { - // load on each node in the cluster. + if !clusterIdle { + // load on each node in the cluster. loads := []float64{0.0} for host, usages := range watts_usages { total_usage := 0.0 for _, usage := range usages { total_usage += usage } - loads = append(loads, total_usage / total_power[host]) + loads = append(loads, total_usage/total_power[host]) } // Now need to compute the average load. @@ -219,33 +222,33 @@ func (capper clusterwideCapper) recap(total_power map[string]float64, total_allocated_power := 0.0 total_running_tasks := 0 - host_of_finished_task := "" - index_of_finished_task := -1 - for host, tasks := range task_monitor { - for i, task := range tasks { - if task.TaskID == finished_taskId { - host_of_finished_task = host - index_of_finished_task = i - // Not considering this task for the computation of total_allocated_power and total_running_tasks - continue - } - total_allocated_power += (float64(task.Watts) * constants.Cap_margin) - total_running_tasks++ - } - } + host_of_finished_task := "" + index_of_finished_task := -1 + for host, tasks := range task_monitor { + for i, task := range tasks { + if task.TaskID == finished_taskId { + host_of_finished_task = host + index_of_finished_task = i + // Not considering this task for the computation of total_allocated_power and total_running_tasks + continue + } + total_allocated_power += (float64(task.Watts) * constants.Cap_margin) + total_running_tasks++ + } + } - // Updating task monitor - if host_of_finished_task != "" && index_of_finished_task != -1 { - log.Printf("Removing task with task [%s] from the list of running tasks\n", - task_monitor[host_of_finished_task][index_of_finished_task].TaskID) - task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], - task_monitor[host_of_finished_task][index_of_finished_task+1:]...) - } + // Updating task monitor + if host_of_finished_task != "" && index_of_finished_task != -1 { + log.Printf("Removing task with task [%s] from the list of running tasks\n", + task_monitor[host_of_finished_task][index_of_finished_task].TaskID) + task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], + task_monitor[host_of_finished_task][index_of_finished_task+1:]...) + } - // For the last task, total_allocated_power and total_running_tasks would be 0 - if total_allocated_power == 0 && total_running_tasks == 0 { - return 100, errors.New("No task running on the cluster.") - } + // For the last task, total_allocated_power and total_running_tasks would be 0 + if total_allocated_power == 0 && total_running_tasks == 0 { + return 100, errors.New("No task running on the cluster.") + } average := total_allocated_power / float64(total_running_tasks) ratios := []float64{} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index c749071..4a13574 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -76,7 +76,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, - isRecapping: false, + isRecapping: false, } return s } diff --git a/utilities/utils.go b/utilities/utils.go index d6406d6..ede4f64 100644 --- a/utilities/utils.go +++ b/utilities/utils.go @@ -9,8 +9,8 @@ https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw // Utility struct that helps in sorting the available power by value. type Pair struct { - Key string - Value float64 + Key string + Value float64 } // A slice of pairs that implements the sort.Interface to sort by value. @@ -18,37 +18,37 @@ type PairList []Pair // Swap pairs in the PairList func (plist PairList) Swap(i, j int) { - plist[i], plist[j] = plist[j], plist[i] + plist[i], plist[j] = plist[j], plist[i] } // function to return the length of the pairlist. func (plist PairList) Len() int { - return len(plist) + return len(plist) } // function to compare two elements in pairlist. func (plist PairList) Less(i, j int) bool { - return plist[i].Value < plist[j].Value + return plist[i].Value < plist[j].Value } // convert a PairList to a map[string]float64 func OrderedKeys(plist PairList) ([]string, error) { - // Validation - if plist == nil { - return nil, errors.New("Invalid argument: plist") - } - ordered_keys := make([]string, len(plist)) - for _, pair := range plist { - ordered_keys = append(ordered_keys, pair.Key) - } - return ordered_keys, nil + // Validation + if plist == nil { + return nil, errors.New("Invalid argument: plist") + } + ordered_keys := make([]string, len(plist)) + for _, pair := range plist { + ordered_keys = append(ordered_keys, pair.Key) + } + return ordered_keys, nil } // determine the max value func Max(a, b float64) float64 { - if a > b { - return a - } else { - return b - } + if a > b { + return a + } else { + return b + } }