diff --git a/constants/constants.go b/constants/constants.go index 2d9b516..fb06a9d 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -16,71 +16,71 @@ var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} // Add a new host to the slice of hosts. -func AddNewHost(new_host string) bool { +func AddNewHost(newHost string) bool { // Validation - if new_host == "" { + if newHost == "" { return false } else { - Hosts = append(Hosts, new_host) + Hosts = append(Hosts, newHost) return true } } // Lower bound of the percentage of requested power, that can be allocated to a task. -var Power_threshold = 0.6 // Right now saying that a task will never be given lesser than 60% of the power it requested. +var PowerThreshold = 0.6 // Right now saying that a task will never be given lesser than 60% of the power it requested. /* Margin with respect to the required power for a job. So, if power required = 10W, the node would be capped to 75%*10W. This value can be changed upon convenience. */ -var Cap_margin = 0.50 +var CapMargin = 0.70 // Modify the cap margin. -func UpdateCapMargin(new_cap_margin float64) bool { +func UpdateCapMargin(newCapMargin float64) bool { // Checking if the new_cap_margin is less than the power threshold. - if new_cap_margin < Starvation_factor { + if newCapMargin < StarvationFactor { return false } else { - Cap_margin = new_cap_margin + CapMargin = newCapMargin return true } } // Threshold factor that would make (Cap_margin * task.Watts) equal to (60/100 * task.Watts). -var Starvation_factor = 0.8 +var StarvationFactor = 0.8 // Total power per node. -var Total_power map[string]float64 +var TotalPower map[string]float64 // Initialize the total power per node. This should be done before accepting any set of tasks for scheduling. -func AddTotalPowerForHost(host string, total_power float64) bool { +func AddTotalPowerForHost(host string, totalPower float64) bool { // Validation - is_correct_host := false - for _, existing_host := range Hosts { - if host == existing_host { - is_correct_host = true + isCorrectHost := false + for _, existingHost := range Hosts { + if host == existingHost { + isCorrectHost = true } } - if !is_correct_host { + if !isCorrectHost { return false } else { - Total_power[host] = total_power + TotalPower[host] = totalPower return true } } // Window size for running average -var Window_size = 10 +var WindowSize = 160 // Update the window size. -func UpdateWindowSize(new_window_size int) bool { +func UpdateWindowSize(newWindowSize int) bool { // Validation - if new_window_size == 0 { + if newWindowSize == 0 { return false } else { - Window_size = new_window_size + WindowSize = newWindowSize return true } } diff --git a/def/task.go b/def/task.go index 63668ad..9699812 100644 --- a/def/task.go +++ b/def/task.go @@ -38,18 +38,18 @@ func TasksFromJSON(uri string) ([]Task, error) { } // Update the host on which the task needs to be scheduled. -func (tsk *Task) UpdateHost(new_host string) bool { +func (tsk *Task) UpdateHost(newHost string) bool { // Validation - is_correct_host := false - for _, existing_host := range constants.Hosts { - if new_host == existing_host { - is_correct_host = true + isCorrectHost := false + for _, existingHost := range constants.Hosts { + if newHost == existingHost { + isCorrectHost = true } } - if !is_correct_host { + if !isCorrectHost { return false } else { - tsk.Host = new_host + tsk.Host = newHost return true } } diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go index 5de8a47..e943d37 100644 --- a/schedulers/proactiveclusterwidecappers.go +++ b/schedulers/proactiveclusterwidecappers.go @@ -24,63 +24,63 @@ import ( // Structure containing utility data structures used to compute cluster-wide dynamic cap. type clusterwideCapper struct { // window of tasks. - window_of_tasks list.List + windowOfTasks list.List // The current sum of requested powers of the tasks in the window. - current_sum float64 + currentSum float64 // The current number of tasks in the window. - number_of_tasks_in_window int + numberOfTasksInWindow int } // Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance(). func newClusterwideCapper() *clusterwideCapper { - return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0} + return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0} } // Singleton instance of clusterwideCapper -var singleton_capper *clusterwideCapper +var singletonCapper *clusterwideCapper // Retrieve the singleton instance of clusterwideCapper. func getClusterwideCapperInstance() *clusterwideCapper { - if singleton_capper == nil { - singleton_capper = newClusterwideCapper() + if singletonCapper == nil { + singletonCapper = newClusterwideCapper() } else { // Do nothing } - return singleton_capper + return singletonCapper } // Clear and initialize all the members of clusterwideCapper. func (capper clusterwideCapper) clear() { - capper.window_of_tasks.Init() - capper.current_sum = 0 - capper.number_of_tasks_in_window = 0 + capper.windowOfTasks.Init() + capper.currentSum = 0 + capper.numberOfTasksInWindow = 0 } // Compute the average of watts of all the tasks in the window. func (capper clusterwideCapper) average() float64 { - return capper.current_sum / float64(capper.window_of_tasks.Len()) + return capper.currentSum / float64(capper.windowOfTasks.Len()) } /* Compute the running average. -Using clusterwideCapper#window_of_tasks to store the tasks. +Using clusterwideCapper#windowOfTasks to store the tasks. Task at position 0 (oldest task) is removed when the window is full and new task arrives. */ -func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 { +func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 { var average float64 - if capper.number_of_tasks_in_window < constants.Window_size { - capper.window_of_tasks.PushBack(tsk) - capper.number_of_tasks_in_window++ - capper.current_sum += float64(tsk.Watts) * constants.Cap_margin + if capper.numberOfTasksInWindow < constants.WindowSize { + capper.windowOfTasks.PushBack(tsk) + capper.numberOfTasksInWindow++ + capper.currentSum += float64(tsk.Watts) * constants.CapMargin } else { - task_to_remove_element := capper.window_of_tasks.Front() - if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok { - capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin - capper.window_of_tasks.Remove(task_to_remove_element) + taskToRemoveElement := capper.windowOfTasks.Front() + if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok { + capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin + capper.windowOfTasks.Remove(taskToRemoveElement) } - capper.window_of_tasks.PushBack(tsk) - capper.current_sum += float64(tsk.Watts) * constants.Cap_margin + capper.windowOfTasks.PushBack(tsk) + capper.currentSum += float64(tsk.Watts) * constants.CapMargin } average = capper.average() return average @@ -89,17 +89,17 @@ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 /* Calculating cap value. -1. Sorting the values of running_average_to_total_power_percentage in ascending order. +1. Sorting the values of runningAverageToTotalPowerPercentage in ascending order. 2. Computing the median of above sorted values. 3. The median is now the cap. */ -func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 { +func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 { var values []float64 // Validation - if running_average_to_total_power_percentage == nil { + if runningAverageToTotalPowerPercentage == nil { return 100.0 } - for _, apower := range running_average_to_total_power_percentage { + for _, apower := range runningAverageToTotalPowerPercentage { values = append(values, apower) } // sorting the values in ascending order. @@ -122,51 +122,51 @@ The recap value picked the least among the two. The cleverRecap scheme works well when the cluster is relatively idle and until then, the primitive recapping scheme works better. */ -func (capper clusterwideCapper) cleverRecap(total_power map[string]float64, - task_monitor map[string][]def.Task, finished_taskId string) (float64, error) { +func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64, + taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { // Validation - if total_power == nil || task_monitor == nil { - return 100.0, errors.New("Invalid argument: total_power, task_monitor") + if totalPower == nil || taskMonitor == nil { + return 100.0, errors.New("Invalid argument: totalPower, taskMonitor") } // determining the recap value by calling the regular recap(...) toggle := false - recapValue, err := capper.recap(total_power, task_monitor, finished_taskId) + recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId) if err == nil { toggle = true } // watts usage on each node in the cluster. - watts_usages := make(map[string][]float64) - host_of_finished_task := "" - index_of_finished_task := -1 + wattsUsages := make(map[string][]float64) + hostOfFinishedTask := "" + indexOfFinishedTask := -1 for _, host := range constants.Hosts { - watts_usages[host] = []float64{0.0} + wattsUsages[host] = []float64{0.0} } - for host, tasks := range task_monitor { + for host, tasks := range taskMonitor { for i, task := range tasks { - if task.TaskID == finished_taskId { - host_of_finished_task = host - index_of_finished_task = i - // Not considering this task for the computation of total_allocated_power and total_running_tasks + if task.TaskID == finishedTaskId { + hostOfFinishedTask = host + indexOfFinishedTask = i + // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks continue } - watts_usages[host] = append(watts_usages[host], float64(task.Watts)*constants.Cap_margin) + wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin) } } // Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor, // then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here. - if host_of_finished_task != "" && index_of_finished_task != -1 { + if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { log.Printf("Removing task with task [%s] from the list of running tasks\n", - task_monitor[host_of_finished_task][index_of_finished_task].TaskID) - task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], - task_monitor[host_of_finished_task][index_of_finished_task+1:]...) + taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) + taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], + taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) } // Need to check whether there are still tasks running on the cluster. If not then we return an error. clusterIdle := true - for _, tasks := range task_monitor { + for _, tasks := range taskMonitor { if len(tasks) > 0 { clusterIdle = false } @@ -175,29 +175,29 @@ func (capper clusterwideCapper) cleverRecap(total_power map[string]float64, if !clusterIdle { // load on each node in the cluster. loads := []float64{0.0} - for host, usages := range watts_usages { - total_usage := 0.0 + for host, usages := range wattsUsages { + totalUsage := 0.0 for _, usage := range usages { - total_usage += usage + totalUsage += usage } - loads = append(loads, total_usage/total_power[host]) + loads = append(loads, totalUsage/totalPower[host]) } // Now need to compute the average load. - total_load := 0.0 + totalLoad := 0.0 for _, load := range loads { - total_load += load + totalLoad += load } - average_load := (total_load / float64(len(loads)) * 100.0) // this would be the cap value. + averageLoad := (totalLoad / float64(len(loads)) * 100.0) // this would be the cap value. // If toggle is true, then we need to return the least recap value. if toggle { - if average_load <= recapValue { - return average_load, nil + if averageLoad <= recapValue { + return averageLoad, nil } else { return recapValue, nil } } else { - return average_load, nil + return averageLoad, nil } } return 100.0, errors.New("No task running on the cluster.") @@ -213,46 +213,46 @@ Recapping the entire cluster. This needs to be called whenever a task finishes execution. */ -func (capper clusterwideCapper) recap(total_power map[string]float64, - task_monitor map[string][]def.Task, finished_taskId string) (float64, error) { +func (capper clusterwideCapper) recap(totalPower map[string]float64, + taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { // Validation - if total_power == nil || task_monitor == nil { - return 100.0, errors.New("Invalid argument: total_power, task_monitor") + if totalPower == nil || taskMonitor == nil { + return 100.0, errors.New("Invalid argument: totalPower, taskMonitor") } - total_allocated_power := 0.0 - total_running_tasks := 0 + totalAllocatedPower := 0.0 + totalRunningTasks := 0 - host_of_finished_task := "" - index_of_finished_task := -1 - for host, tasks := range task_monitor { + hostOfFinishedTask := "" + indexOfFinishedTask := -1 + for host, tasks := range taskMonitor { for i, task := range tasks { - if task.TaskID == finished_taskId { - host_of_finished_task = host - index_of_finished_task = i - // Not considering this task for the computation of total_allocated_power and total_running_tasks + if task.TaskID == finishedTaskId { + hostOfFinishedTask = host + indexOfFinishedTask = i + // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks continue } - total_allocated_power += (float64(task.Watts) * constants.Cap_margin) - total_running_tasks++ + totalAllocatedPower += (float64(task.Watts) * constants.CapMargin) + totalRunningTasks++ } } // Updating task monitor - if host_of_finished_task != "" && index_of_finished_task != -1 { + if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { log.Printf("Removing task with task [%s] from the list of running tasks\n", - task_monitor[host_of_finished_task][index_of_finished_task].TaskID) - task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task], - task_monitor[host_of_finished_task][index_of_finished_task+1:]...) + taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) + taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], + taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) } - // For the last task, total_allocated_power and total_running_tasks would be 0 - if total_allocated_power == 0 && total_running_tasks == 0 { + // For the last task, totalAllocatedPower and totalRunningTasks would be 0 + if totalAllocatedPower == 0 && totalRunningTasks == 0 { return 100, errors.New("No task running on the cluster.") } - average := total_allocated_power / float64(total_running_tasks) + average := totalAllocatedPower / float64(totalRunningTasks) ratios := []float64{} - for _, tpower := range total_power { + for _, tpower := range totalPower { ratios = append(ratios, (average/tpower)*100) } sort.Float64s(ratios) @@ -265,38 +265,38 @@ func (capper clusterwideCapper) recap(total_power map[string]float64, } /* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/ -func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort *[]def.Task) { +func (capper clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) { i := low j := high // calculating the pivot - pivot_index := low + (high-low)/2 - pivot := (*tasks_to_sort)[pivot_index] + pivotIndex := low + (high-low)/2 + pivot := (*tasksToSort)[pivotIndex] for i <= j { - for (*tasks_to_sort)[i].Watts < pivot.Watts { + for (*tasksToSort)[i].Watts < pivot.Watts { i++ } - for (*tasks_to_sort)[j].Watts > pivot.Watts { + for (*tasksToSort)[j].Watts > pivot.Watts { j-- } if i <= j { - temp := (*tasks_to_sort)[i] - (*tasks_to_sort)[i] = (*tasks_to_sort)[j] - (*tasks_to_sort)[j] = temp + temp := (*tasksToSort)[i] + (*tasksToSort)[i] = (*tasksToSort)[j] + (*tasksToSort)[j] = temp i++ j-- } } if low < j { - capper.quick_sort(low, j, tasks_to_sort) + capper.quickSort(low, j, tasksToSort) } if i < high { - capper.quick_sort(i, high, tasks_to_sort) + capper.quickSort(i, high, tasksToSort) } } // Sorting tasks in ascending order of requested watts. -func (capper clusterwideCapper) sort_tasks(tasks_to_sort *[]def.Task) { - capper.quick_sort(0, len(*tasks_to_sort)-1, tasks_to_sort) +func (capper clusterwideCapper) sortTasks(tasksToSort *[]def.Task) { + capper.quickSort(0, len(*tasksToSort)-1, tasksToSort) } /* @@ -307,51 +307,51 @@ This completed task needs to be removed from the window of tasks (if it is still */ func (capper clusterwideCapper) taskFinished(taskID string) { // If the window is empty the just return. This condition should technically return false. - if capper.window_of_tasks.Len() == 0 { + if capper.windowOfTasks.Len() == 0 { return } // Checking whether the task with the given taskID is currently present in the window of tasks. - var task_element_to_remove *list.Element - for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() { - if tsk, ok := task_element.Value.(*def.Task); ok { + var taskElementToRemove *list.Element + for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() { + if tsk, ok := taskElement.Value.(*def.Task); ok { if tsk.TaskID == taskID { - task_element_to_remove = task_element + taskElementToRemove = taskElement } } } // we need to remove the task from the window. - if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok { - capper.window_of_tasks.Remove(task_element_to_remove) - capper.number_of_tasks_in_window -= 1 - capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin + if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok { + capper.windowOfTasks.Remove(taskElementToRemove) + capper.numberOfTasksInWindow -= 1 + capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin } } // First come first serve scheduling. -func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64, - new_task *def.Task) (float64, error) { +func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64, + newTask *def.Task) (float64, error) { // Validation - if total_power == nil { - return 100, errors.New("Invalid argument: total_power") + if totalPower == nil { + return 100, errors.New("Invalid argument: totalPower") } else { // Need to calculate the running average - running_average := capper.running_average_of_watts(new_task) + runningAverage := capper.runningAverageOfWatts(newTask) // For each node, calculate the percentage of the running average to the total power. - running_average_to_total_power_percentage := make(map[string]float64) - for host, tpower := range total_power { - if tpower >= running_average { - running_average_to_total_power_percentage[host] = (running_average / tpower) * 100 + runningAverageToTotalPowerPercentage := make(map[string]float64) + for host, tpower := range totalPower { + if tpower >= runningAverage { + runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100 } else { // We don't consider this host for the computation of the cluster wide cap. } } // Determine the cluster wide cap value. - cap_value := capper.get_cap(running_average_to_total_power_percentage) + capValue := capper.getCap(runningAverageToTotalPowerPercentage) // Need to cap the cluster to this value. - return cap_value, nil + return capValue, nil } } diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index 5ff439f..49094cd 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -304,8 +304,8 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive log.Println(err) } log.Printf("Starting on [%s]\n", offer.GetHostname()) - to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) + toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)} + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) log.Printf("Inst: %d", *task.Instances) *task.Instances-- if *task.Instances <= 0 { diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index 3c9ef81..f6ea425 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -257,7 +257,7 @@ func (s *ProactiveClusterwideCapRanked) ResouceOffers(driver sched.SchedulerDriv } // sorting the tasks in ascending order of watts. - s.capper.sort_tasks(&s.tasks) + s.capper.sortTasks(&s.tasks) // displaying the ranked tasks. log.Println("The ranked tasks are:\n---------------------\n\t[") for rank, task := range s.tasks { diff --git a/utilities/utils.go b/utilities/utils.go index ede4f64..c53df74 100644 --- a/utilities/utils.go +++ b/utilities/utils.go @@ -37,11 +37,11 @@ func OrderedKeys(plist PairList) ([]string, error) { if plist == nil { return nil, errors.New("Invalid argument: plist") } - ordered_keys := make([]string, len(plist)) + orderedKeys := make([]string, len(plist)) for _, pair := range plist { - ordered_keys = append(ordered_keys, pair.Key) + orderedKeys = append(orderedKeys, pair.Key) } - return ordered_keys, nil + return orderedKeys, nil } // determine the max value