From b7394b876283ac46bd67966f2eb8dcdf80f302ee Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 14 Nov 2016 22:53:06 -0500 Subject: [PATCH] formatted the code --- schedulers/proactiveclusterwidecappers.go | 359 +++++++-------- schedulers/proactiveclusterwidecappingfcfs.go | 434 +++++++++--------- 2 files changed, 397 insertions(+), 396 deletions(-) diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go index 8d7a55e..aa3eafa 100644 --- a/schedulers/proactiveclusterwidecappers.go +++ b/schedulers/proactiveclusterwidecappers.go @@ -11,51 +11,52 @@ This is not a scheduler but a scheduling scheme that schedulers can use. package schedulers import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - "container/list" - "errors" - "github.com/montanaflynn/stats" - "sort" + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "container/list" + "errors" + "github.com/montanaflynn/stats" + "sort" ) // Structure containing utility data structures used to compute cluster-wide dynamic cap. type clusterwideCapper struct { - // window of tasks. - window_of_tasks list.List - // The current sum of requested powers of the tasks in the window. - current_sum float64 - // The current number of tasks in the window. - number_of_tasks_in_window int + // window of tasks. + window_of_tasks list.List + // The current sum of requested powers of the tasks in the window. + current_sum float64 + // The current number of tasks in the window. + number_of_tasks_in_window int } // Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance(). func newClusterwideCapper() *clusterwideCapper { - return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0} + return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0} } // Singleton instance of clusterwideCapper var singleton_capper *clusterwideCapper + // Retrieve the singleton instance of clusterwideCapper. func getClusterwideCapperInstance() *clusterwideCapper { - if singleton_capper == nil { - singleton_capper = newClusterwideCapper() - } else { - // Do nothing - } - return singleton_capper + if singleton_capper == nil { + singleton_capper = newClusterwideCapper() + } else { + // Do nothing + } + return singleton_capper } // Clear and initialize all the members of clusterwideCapper. func (capper clusterwideCapper) clear() { - capper.window_of_tasks.Init() - capper.current_sum = 0 - capper.number_of_tasks_in_window = 0 + capper.window_of_tasks.Init() + capper.current_sum = 0 + capper.number_of_tasks_in_window = 0 } // Compute the average of watts of all the tasks in the window. func (capper clusterwideCapper) average() float64 { - return capper.current_sum / float64(capper.window_of_tasks.Len()) + return capper.current_sum / float64(capper.window_of_tasks.Len()) } /* @@ -65,22 +66,22 @@ Using clusterwideCapper#window_of_tasks to store the tasks. Task at position 0 (oldest task) is removed when the window is full and new task arrives. 
*/ func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 { - var average float64 - if capper.number_of_tasks_in_window < constants.Window_size { - capper.window_of_tasks.PushBack(tsk) - capper.number_of_tasks_in_window++ - capper.current_sum += float64(tsk.Watts) * constants.Cap_margin - } else { - task_to_remove_element := capper.window_of_tasks.Front() - if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok { - capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin - capper.window_of_tasks.Remove(task_to_remove_element) - } - capper.window_of_tasks.PushBack(tsk) - capper.current_sum += float64(tsk.Watts) * constants.Cap_margin - } - average = capper.average() - return average + var average float64 + if capper.number_of_tasks_in_window < constants.Window_size { + capper.window_of_tasks.PushBack(tsk) + capper.number_of_tasks_in_window++ + capper.current_sum += float64(tsk.Watts) * constants.Cap_margin + } else { + task_to_remove_element := capper.window_of_tasks.Front() + if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok { + capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin + capper.window_of_tasks.Remove(task_to_remove_element) + } + capper.window_of_tasks.PushBack(tsk) + capper.current_sum += float64(tsk.Watts) * constants.Cap_margin + } + average = capper.average() + return average } /* @@ -91,22 +92,22 @@ Calculating cap value. 3. The median is now the cap. */ func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 { - var values []float64 - // Validation - if running_average_to_total_power_percentage == nil { - return 100.0 - } - for _, apower := range running_average_to_total_power_percentage { - values = append(values, apower) - } - // sorting the values in ascending order. - sort.Float64s(values) - // Calculating the median - if median, err := stats.Median(values); err == nil { - return median - } - // should never reach here. If here, then just setting the cap value to be 100 - return 100.0 + var values []float64 + // Validation + if running_average_to_total_power_percentage == nil { + return 100.0 + } + for _, apower := range running_average_to_total_power_percentage { + values = append(values, apower) + } + // sorting the values in ascending order. + sort.Float64s(values) + // Calculating the median + if median, err := stats.Median(values); err == nil { + return median + } + // should never reach here. If here, then just setting the cap value to be 100 + return 100.0 } /* @@ -120,72 +121,72 @@ Recapping the entire cluster. This needs to be called whenever a task finishes execution. */ func (capper clusterwideCapper) recap(total_power map[string]float64, - task_monitor map[string][]def.Task, finished_taskId string) (float64, error) { - // Validation - if total_power == nil || task_monitor == nil { - return 100.0, errors.New("Invalid argument: total_power, task_monitor") - } - total_allocated_power := 0.0 - total_running_tasks := 0 - for _, tasks := range task_monitor { - index := 0 - for i, task := range tasks { - if task.TaskID == finished_taskId { - index = i - continue - } - total_allocated_power += float64(task.Watts) * constants.Cap_margin - total_running_tasks++ - } - tasks = append(tasks[:index], tasks[index+1:]...) 
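Note (illustrative sketch, not part of this patch): the scheme above caps the cluster at the median of the per-host ratio of the running average of capped task watts to the host's total power, as computed by running_average_of_watts and get_cap. A minimal standalone program mirroring that arithmetic, with invented host names and wattages:

package main

import (
	"fmt"
	"sort"
)

// capFromRunningAverage mirrors the get_cap logic above: for every host whose total
// power can accommodate the running average, compute the percentage, then take the median.
func capFromRunningAverage(runningAverage float64, totalPower map[string]float64) float64 {
	percentages := []float64{}
	for _, tp := range totalPower {
		if tp >= runningAverage {
			percentages = append(percentages, (runningAverage/tp)*100)
		}
	}
	if len(percentages) == 0 {
		return 100.0 // nothing to cap against; leave the cluster uncapped
	}
	sort.Float64s(percentages)
	n := len(percentages)
	if n%2 == 1 {
		return percentages[n/2]
	}
	return (percentages[n/2-1] + percentages[n/2]) / 2
}

func main() {
	// Hypothetical cluster: running average of 120 W against hosts with 250 W and 300 W
	// of total power. The ratios are 48% and 40%, so the cluster-wide cap is their median, 44%.
	fmt.Println(capFromRunningAverage(120.0, map[string]float64{"host-1": 250.0, "host-2": 300.0}))
}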
- } - average := total_allocated_power / float64(total_running_tasks) - ratios := []float64{} - for _, tpower := range total_power { - ratios = append(ratios, (average/tpower) * 100) - } - sort.Float64s(ratios) - median, err := stats.Median(ratios) - if err == nil { - return median, nil - } else { - return 100, err - } + task_monitor map[string][]def.Task, finished_taskId string) (float64, error) { + // Validation + if total_power == nil || task_monitor == nil { + return 100.0, errors.New("Invalid argument: total_power, task_monitor") + } + total_allocated_power := 0.0 + total_running_tasks := 0 + for _, tasks := range task_monitor { + index := 0 + for i, task := range tasks { + if task.TaskID == finished_taskId { + index = i + continue + } + total_allocated_power += float64(task.Watts) * constants.Cap_margin + total_running_tasks++ + } + tasks = append(tasks[:index], tasks[index+1:]...) + } + average := total_allocated_power / float64(total_running_tasks) + ratios := []float64{} + for _, tpower := range total_power { + ratios = append(ratios, (average/tpower)*100) + } + sort.Float64s(ratios) + median, err := stats.Median(ratios) + if err == nil { + return median, nil + } else { + return 100, err + } } /* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/ func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) { - i := low - j := high - // calculating the pivot - pivot_index := low + (high - low)/2 - pivot := tasks_to_sort[pivot_index] - for i <= j { - for tasks_to_sort[i].Watts < pivot.Watts { - i++ - } - for tasks_to_sort[j].Watts > pivot.Watts { - j-- - } - if i <= j { - temp := tasks_to_sort[i] - tasks_to_sort[i] = tasks_to_sort[j] - tasks_to_sort[j] = temp - i++ - j-- - } - } - if low < j { - capper.quick_sort(low, j, tasks_to_sort) - } - if i < high { - capper.quick_sort(i, high, tasks_to_sort) - } + i := low + j := high + // calculating the pivot + pivot_index := low + (high-low)/2 + pivot := tasks_to_sort[pivot_index] + for i <= j { + for tasks_to_sort[i].Watts < pivot.Watts { + i++ + } + for tasks_to_sort[j].Watts > pivot.Watts { + j-- + } + if i <= j { + temp := tasks_to_sort[i] + tasks_to_sort[i] = tasks_to_sort[j] + tasks_to_sort[j] = temp + i++ + j-- + } + } + if low < j { + capper.quick_sort(low, j, tasks_to_sort) + } + if i < high { + capper.quick_sort(i, high, tasks_to_sort) + } } // Sorting tasks in ascending order of requested watts. func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) { - capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort) + capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort) } /* @@ -195,86 +196,86 @@ This completed task needs to be removed from the window of tasks (if it is still so that it doesn't contribute to the computation of the cap value. */ func (capper clusterwideCapper) taskFinished(taskID string) { - // If the window is empty the just return. This condition should technically return false. - if capper.window_of_tasks.Len() == 0 { - return - } + // If the window is empty the just return. This condition should technically return false. + if capper.window_of_tasks.Len() == 0 { + return + } - // Checking whether the task with the given taskID is currently present in the window of tasks. 
- var task_element_to_remove *list.Element - for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() { - if tsk, ok := task_element.Value.(*def.Task); ok { - if tsk.TaskID == taskID { - task_element_to_remove = task_element - } - } - } + // Checking whether the task with the given taskID is currently present in the window of tasks. + var task_element_to_remove *list.Element + for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() { + if tsk, ok := task_element.Value.(*def.Task); ok { + if tsk.TaskID == taskID { + task_element_to_remove = task_element + } + } + } - // Ee need to remove the task from the window. - if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok { - capper.window_of_tasks.Remove(task_element_to_remove) - capper.number_of_tasks_in_window -= 1 - capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin - } + // Ee need to remove the task from the window. + if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok { + capper.window_of_tasks.Remove(task_element_to_remove) + capper.number_of_tasks_in_window -= 1 + capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin + } } // Ranked based scheduling. func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]float64, - tasks_to_schedule []*def.Task) ([]*def.Task, map[int]float64, error) { - // Validation - if available_power == nil || len(tasks_to_schedule) == 0 { - return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule") - } else { - // Need to sort the tasks in ascending order of requested power. - capper.sort_tasks(tasks_to_schedule) + tasks_to_schedule []*def.Task) ([]*def.Task, map[int]float64, error) { + // Validation + if available_power == nil || len(tasks_to_schedule) == 0 { + return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule") + } else { + // Need to sort the tasks in ascending order of requested power. + capper.sort_tasks(tasks_to_schedule) - // Now, for each task in the sorted set of tasks, we need to use the Fcfs_determine_cap logic. - cluster_wide_cap_values := make(map[int]float64) - index := 0 - for _, tsk := range tasks_to_schedule { - /* - Note that even though Fcfs_determine_cap is called, we have sorted the tasks aprior and thus, the tasks are scheduled in the sorted fashion. - Calling Fcfs_determine_cap(...) just to avoid redundant code. - */ - if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil { - cluster_wide_cap_values[index] = cap - } else { - return nil, nil, err - } - index++ - } - // Now returning the sorted set of tasks and the cluster wide cap values for each task that is launched. - return tasks_to_schedule, cluster_wide_cap_values, nil - } + // Now, for each task in the sorted set of tasks, we need to use the Fcfs_determine_cap logic. + cluster_wide_cap_values := make(map[int]float64) + index := 0 + for _, tsk := range tasks_to_schedule { + /* + Note that even though Fcfs_determine_cap is called, we have sorted the tasks aprior and thus, the tasks are scheduled in the sorted fashion. + Calling Fcfs_determine_cap(...) just to avoid redundant code. + */ + if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil { + cluster_wide_cap_values[index] = cap + } else { + return nil, nil, err + } + index++ + } + // Now returning the sorted set of tasks and the cluster wide cap values for each task that is launched. 
+ return tasks_to_schedule, cluster_wide_cap_values, nil + } } // First come first serve scheduling. func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64, - new_task *def.Task) (float64, error) { - // Validation - if total_power == nil { - return 100, errors.New("Invalid argument: total_power") - } else { - // Need to calculate the running average - running_average := capper.running_average_of_watts(new_task) - // For each node, calculate the percentage of the running average to the total power. - running_average_to_total_power_percentage := make(map[string]float64) - for host, tpower := range total_power { - if tpower >= running_average { - running_average_to_total_power_percentage[host] = (running_average/tpower) * 100 - } else { - // We don't consider this host for the computation of the cluster wide cap. - } - } + new_task *def.Task) (float64, error) { + // Validation + if total_power == nil { + return 100, errors.New("Invalid argument: total_power") + } else { + // Need to calculate the running average + running_average := capper.running_average_of_watts(new_task) + // For each node, calculate the percentage of the running average to the total power. + running_average_to_total_power_percentage := make(map[string]float64) + for host, tpower := range total_power { + if tpower >= running_average { + running_average_to_total_power_percentage[host] = (running_average / tpower) * 100 + } else { + // We don't consider this host for the computation of the cluster wide cap. + } + } - // Determine the cluster wide cap value. - cap_value := capper.get_cap(running_average_to_total_power_percentage) - // Need to cap the cluster to this value. - return cap_value, nil - } + // Determine the cluster wide cap value. + cap_value := capper.get_cap(running_average_to_total_power_percentage) + // Need to cap the cluster to this value. + return cap_value, nil + } } // Stringer for an instance of clusterwideCapper func (capper clusterwideCapper) string() string { - return "Cluster Capper -- Proactively cap the entire cluster." + return "Cluster Capper -- Proactively cap the entire cluster." 
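Note (illustrative sketch, not part of this patch): the hand-rolled quick_sort/sort_tasks earlier in this hunk can be expressed with the standard library's sort package. A sketch, assuming def.Task exposes the numeric Watts field already compared above; byWatts and sortTasksByWatts are suggested names only:

package schedulers

import (
	"sort"

	"bitbucket.org/sunybingcloud/electron/def"
)

// byWatts adapts a slice of *def.Task to sort.Interface so tasks can be ordered
// by ascending requested watts without a hand-rolled quicksort.
type byWatts []*def.Task

func (b byWatts) Len() int           { return len(b) }
func (b byWatts) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
func (b byWatts) Less(i, j int) bool { return b[i].Watts < b[j].Watts }

// sortTasksByWatts is what sort_tasks would reduce to with this adapter in place.
func sortTasksByWatts(tasksToSort []*def.Task) {
	sort.Sort(byWatts(tasksToSort))
}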
} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index 679686a..b12cb7c 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -1,111 +1,111 @@ package schedulers import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/rapl" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "strings" - "time" + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/rapl" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "strings" + "time" ) // Decides if to take an offer or not func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, _ := OfferAgg(offer) + offer_cpu, offer_mem, _ := OfferAgg(offer) - if offer_cpu >= task.CPU && offer_mem >= task.RAM { - return true - } - return false + if offer_cpu >= task.CPU && offer_mem >= task.RAM { + return true + } + return false } // electronScheduler implements the Scheduler interface. type ProactiveClusterwideCapFCFS struct { - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - ignoreWatts bool - capper *clusterwideCapper - ticker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - //lock *sync.Mutex + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + ignoreWatts bool + capper *clusterwideCapper + ticker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + //lock *sync.Mutex - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule the new task. - RecordPCP bool + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule the new task. + RecordPCP bool - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up. - Done chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up. + Done chan struct{} - // Controls when to shutdown pcp logging. 
- PCPLog chan struct{} + // Controls when to shutdown pcp logging. + PCPLog chan struct{} } // New electron scheduler. func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS { - s := &ProactiveClusterwideCapFCFS { - tasks: tasks, - ignoreWatts: ignoreWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: getClusterwideCapperInstance(), - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - //lock: new(sync.Mutex), - } - return s + s := &ProactiveClusterwideCapFCFS{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: getClusterwideCapperInstance(), + ticker: time.NewTicker(5 * time.Second), + isCapping: false, + //lock: new(sync.Mutex), + } + return s } func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ - if !s.RecordPCP { - // Turn on logging. - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } + if !s.RecordPCP { + // Turn on logging. + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } - // Setting the task ID to the task. This is done so that we can consider each task to be different, - // even though they have the same parameters. - task.SetTaskID(*proto.String(taskName)) - // Add task to the list of tasks running on the node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task} + // Setting the task ID to the task. This is done so that we can consider each task to be different, + // even though they have the same parameters. + task.SetTaskID(*proto.String(taskName)) + // Add task to the list of tasks running on the node. 
+ s.running[offer.GetSlaveId().GoString()][taskName] = true + s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task} - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } - if !s.ignoreWatts { + if !s.ignoreWatts { resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) } @@ -130,189 +130,189 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) } func (s *ProactiveClusterwideCapFCFS) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) } func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) + log.Printf("Framework re-registered with master %s", masterInfo) } func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) { - // Need to stop the capping process. - s.ticker.Stop() - s.isCapping = false - log.Println("Framework disconnected with master") + // Need to stop the capping process. + s.ticker.Stop() + s.isCapping = false + log.Println("Framework disconnected with master") } // go routine to cap the entire cluster in regular intervals of time. var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. func (s *ProactiveClusterwideCapFCFS) startCapping() { - go func() { - for { - select { - case <- s.ticker.C: - // Need to cap the cluster to the currentCapValue. - if currentCapValue > 0.0 { - //mutex.Lock() - //s.lock.Lock() - for _, host := range constants.Hosts { - // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil { - fmt.Println(err) - } else { - fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue) - } - } - //mutex.Unlock() - //s.lock.Unlock() - } - } - } - }() + go func() { + for { + select { + case <-s.ticker.C: + // Need to cap the cluster to the currentCapValue. + if currentCapValue > 0.0 { + //mutex.Lock() + //s.lock.Lock() + for _, host := range constants.Hosts { + // Rounding curreCapValue to the nearest int. + if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil { + fmt.Println(err) + } else { + fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue) + } + } + //mutex.Unlock() + //s.lock.Unlock() + } + } + } + }() } // Stop cluster wide capping func (s *ProactiveClusterwideCapFCFS) stopCapping() { - if s.isCapping { - log.Println("Stopping the cluster wide capping.") - s.ticker.Stop() - s.isCapping = false - } + if s.isCapping { + log.Println("Stopping the cluster wide capping.") + s.ticker.Stop() + s.isCapping = false + } } // TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?). 
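Note (illustrative sketch, not part of this patch): in startCapping above, the format verb needs a doubled percent sign (%f%%) to print a literal "%", and logging the rounded integer that was actually sent to RAPL may be more useful than the raw float. A small helper the ticker case could call instead; capHosts is a suggested name and the logging style is only a sketch:

package schedulers

import (
	"log"
	"math"

	"bitbucket.org/sunybingcloud/electron/constants"
	"bitbucket.org/sunybingcloud/electron/rapl"
)

// capHosts rounds the cluster-wide cap to the nearest integer percentage and applies
// it to every host, logging the value that was actually handed to rapl.Cap.
func capHosts(capValue float64) {
	rounded := int(math.Floor(capValue + 0.5))
	for _, host := range constants.Hosts {
		if err := rapl.Cap(host, "rapl", rounded); err != nil {
			log.Println("Error capping ", host, ": ", err)
		} else {
			log.Printf("Successfully capped %s to %d%%", host, rounded)
		}
	}
}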
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) + log.Printf("Received %d resource offers", len(offers)) - // retrieving the available power for all the hosts in the offers. - for _, offer := range offers { - _, _, offer_watts := OfferAgg(offer) - s.availablePower[*offer.Hostname] = offer_watts - // setting total power if the first time. - if _, ok := s.totalPower[*offer.Hostname]; !ok { - s.totalPower[*offer.Hostname] = offer_watts - } - } + // retrieving the available power for all the hosts in the offers. + for _, offer := range offers { + _, _, offer_watts := OfferAgg(offer) + s.availablePower[*offer.Hostname] = offer_watts + // setting total power if the first time. + if _, ok := s.totalPower[*offer.Hostname]; !ok { + s.totalPower[*offer.Hostname] = offer_watts + } + } - for host, tpower := range s.totalPower { - fmt.Printf("TotalPower[%s] = %f\n", host, tpower) - } - for host, apower := range s.availablePower { - fmt.Printf("AvailablePower[%s] = %f\n", host, apower) - } + for host, tpower := range s.totalPower { + fmt.Printf("TotalPower[%s] = %f\n", host, tpower) + } + for host, apower := range s.availablePower { + fmt.Printf("AvailablePower[%s] = %f\n", host, apower) + } - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } - /* - Clusterwide Capping strategy + /* + Clusterwide Capping strategy - For each task in s.tasks, - 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements). - 2. If the tasks fits the offer, then I need to detemrine the cluster wide cap. - 3. currentCapValue is updated with the determined cluster wide cap. + For each task in s.tasks, + 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements). + 2. If the tasks fits the offer, then I need to detemrine the cluster wide cap. + 3. currentCapValue is updated with the determined cluster wide cap. - Cluster wide capping is currently performed at regular intervals of time. - TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap. - Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar. - */ - //offer_cpu, offer_ram, _ := OfferAgg(offer) + Cluster wide capping is currently performed at regular intervals of time. + TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap. + Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar. + */ + //offer_cpu, offer_ram, _ := OfferAgg(offer) - taken := false - //var mutex sync.Mutex + taken := false + //var mutex sync.Mutex - for i, task := range s.tasks { - // Don't take offer if it doesn't match our task's host requirement. 
- if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + for i, task := range s.tasks { + // Don't take offer if it doesn't match our task's host requirement. + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } - // Does the task fit. - if s.takeOffer(offer, task) { - // Capping the cluster if haven't yet started, - if !s.isCapping { - s.startCapping() - s.isCapping = true - } - taken = true - //mutex.Lock() - //s.lock.Lock() - //tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task) - tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) + // Does the task fit. + if s.takeOffer(offer, task) { + // Capping the cluster if haven't yet started, + if !s.isCapping { + s.startCapping() + s.isCapping = true + } + taken = true + //mutex.Lock() + //s.lock.Lock() + //tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task) + tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) - if err == nil { - currentCapValue = tempCap - } else { - fmt.Printf("Failed to determine new cluster wide cap: ") - fmt.Println(err) - } - //mutex.Unlock() - //s.lock.Unlock() - fmt.Printf("Starting on [%s]\n", offer.GetHostname()) - to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) - fmt.Printf("Inst: %d", *task.Instances) - *task.Instances-- - if *task.Instances <= 0 { - // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. - s.tasks[i] = s.tasks[len(s.tasks)-1] + if err == nil { + currentCapValue = tempCap + } else { + fmt.Printf("Failed to determine new cluster wide cap: ") + fmt.Println(err) + } + //mutex.Unlock() + //s.lock.Unlock() + fmt.Printf("Starting on [%s]\n", offer.GetHostname()) + to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) + fmt.Printf("Inst: %d", *task.Instances) + *task.Instances-- + if *task.Instances <= 0 { + // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. + s.tasks[i] = s.tasks[len(s.tasks)-1] s.tasks = s.tasks[:len(s.tasks)-1] if len(s.tasks) <= 0 { log.Println("Done scheduling all tasks") - // Need to stop the cluster wide capping as there aren't any more tasks to schedule. - s.stopCapping() + // Need to stop the cluster wide capping as there aren't any more tasks to schedule. + s.stopCapping() close(s.Shutdown) } - } - break // Offer taken, move on. - } else { - // Task doesn't fit the offer. Move onto the next offer. - } - } + } + break // Offer taken, move on. + } else { + // Task doesn't fit the offer. Move onto the next offer. + } + } - // If no task fit the offer, then declining the offer. - if !taken { - fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := OfferAgg(offer) + // If no task fit the offer, then declining the offer. 
+ if !taken { + fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) + cpus, mem, watts := OfferAgg(offer) - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) - } - } + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + } } func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) if *status.State == mesos.TaskState_TASK_RUNNING { s.tasksRunning++ } else if IsTerminal(status.State) { delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - // Need to remove the task from the window of tasks. - s.capper.taskFinished(*status.TaskId.Value) - //currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value) - // Determining the new cluster wide cap. - currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - log.Printf("Recapping the cluster to %f\n", currentCapValue) + // Need to remove the task from the window of tasks. + s.capper.taskFinished(*status.TaskId.Value) + //currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value) + // Determining the new cluster wide cap. + currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + log.Printf("Recapping the cluster to %f\n", currentCapValue) s.tasksRunning-- if s.tasksRunning == 0 { select { case <-s.Shutdown: - // Need to stop the capping process. - s.stopCapping() + // Need to stop the capping process. + s.stopCapping() close(s.Done) default: } @@ -322,20 +322,20 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, } func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { log.Println("Getting a framework message: ", message) log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) } func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) + log.Printf("Offer %s rescinded", offerID) } func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) + log.Printf("Slave %s lost", slaveID) } func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
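Note (illustrative sketch, not part of this patch): currentCapValue is written from ResourceOffers and StatusUpdate (via recap) and read concurrently by the goroutine started in startCapping; the commented-out mutexes suggest this was already on the radar. One way to make the sharing explicit, assuming a package-level guard is acceptable (the accessor names are suggestions only):

package schedulers

import "sync"

// capValueMutex guards currentCapValue, which is shared between the scheduler
// callbacks and the capping goroutine started in startCapping.
var capValueMutex sync.Mutex

// setCapValue would replace direct writes such as `currentCapValue = tempCap`.
func setCapValue(value float64) {
	capValueMutex.Lock()
	defer capValueMutex.Unlock()
	currentCapValue = value
}

// getCapValue would replace the direct read inside the ticker loop.
func getCapValue() float64 {
	capValueMutex.Lock()
	defer capValueMutex.Unlock()
	return currentCapValue
}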