From c1eaa453a2a334410bd60b6681fd243f288e5f54 Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik
Date: Thu, 17 Nov 2016 21:51:02 -0500
Subject: [PATCH] Synchronized operations that change the value of the cluster
 wide cap. Added cleverRecap(...) that determines the recap value of the
 cluster at a much finer level, taking into account the average load on each
 node in the cluster. Bug fix in cap.go -- closed the session once capping had
 been done. This prevented the scheduler from running out of file descriptors.

---
 constants/constants.go                        |  20 +--
 rapl/cap.go                                   |   1 +
 schedulers/proactiveclusterwidecappers.go     | 106 ++++++++++++--
 schedulers/proactiveclusterwidecappingfcfs.go | 135 +++++++++++++-----
 4 files changed, 194 insertions(+), 68 deletions(-)

diff --git a/constants/constants.go b/constants/constants.go
index 133d61f..cc6d705 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -34,7 +34,7 @@ var Power_threshold = 0.6 // Right now saying that a task will never be given le
 So, if power required = 10W, the node would be capped to 75%*10W.
 This value can be changed upon convenience.
 */
-var Cap_margin = 0.75
+var Cap_margin = 0.70
 
 // Modify the cap margin.
 func UpdateCapMargin(new_cap_margin float64) bool {
@@ -84,20 +84,4 @@ func UpdateWindowSize(new_window_size int) bool {
 		Window_size = new_window_size
 		return true
 	}
-}
-
-// // Time duration between successive cluster wide capping.
-// var Clusterwide_cap_interval = 10 // Right now capping the cluster at 10 second intervals.
-//
-// // Modify the cluster wide capping interval. We can update the interval depending on the workload.
-// // TODO: If the workload is heavy then we can set a longer interval, while on the other hand,
-// // if the workload is light then a smaller interval is sufficient.
-// func UpdateClusterwideCapInterval(new_interval int) bool {
-// 	// Validation
-// 	if new_interval == 0.0 {
-// 		return false
-// 	} else {
-// 		Clusterwide_cap_interval = new_interval
-// 		return true
-// 	}
-// }
+}
\ No newline at end of file
diff --git a/rapl/cap.go b/rapl/cap.go
index 20cd945..b15d352 100644
--- a/rapl/cap.go
+++ b/rapl/cap.go
@@ -26,6 +26,7 @@ func Cap(host, username string, percentage int) error {
 	}
 
 	session, err := connection.NewSession()
 	if err != nil {
 		return errors.Wrap(err, "Failed to create session")
 	}
+	defer session.Close()
diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go
index aa3eafa..38aaca0 100644
--- a/schedulers/proactiveclusterwidecappers.go
+++ b/schedulers/proactiveclusterwidecappers.go
@@ -16,6 +16,7 @@ import (
 	"container/list"
 	"errors"
 	"github.com/montanaflynn/stats"
+	"log"
 	"sort"
 )
 
@@ -110,6 +111,68 @@ func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentag
 	return 100.0
 }
 
+/*
+Recapping the entire cluster. Also, removing the finished task from the list of running tasks.
+
+We would, at this point, have better knowledge about the state of the cluster.
+
+1. Calculate the total allocated watts per node in the cluster.
+2. Compute the ratio of the total watts usage per node to the total power for that node.
+   This would give us the load on that node.
+3. Now, compute the average load across all the nodes in the cluster.
+   This would be the cap value.
+*/
+func (capper clusterwideCapper) cleverRecap(total_power map[string]float64,
+	task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
+	// Validation
+	if total_power == nil || task_monitor == nil {
+		return 100.0, errors.New("Invalid argument: total_power, task_monitor")
+	}
+	// watts usage on each node in the cluster.
+	watts_usages := make(map[string][]float64)
+	host_of_finished_task := ""
+	index_of_finished_task := -1
+	for _, host := range constants.Hosts {
+		watts_usages[host] = []float64{0.0}
+	}
+	for host, tasks := range task_monitor {
+		for i, task := range tasks {
+			if task.TaskID == finished_taskId {
+				host_of_finished_task = host
+				index_of_finished_task = i
+				// Not considering this task
				continue
+			}
+			watts_usages[host] = append(watts_usages[host], float64(task.Watts)*constants.Cap_margin)
+		}
+	}
+
+	// Updating task monitor
+	if host_of_finished_task != "" && index_of_finished_task != -1 {
+		log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
+			task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
+		task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
+			task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
+	}
+
+	// load on each node in the cluster.
+	loads := []float64{}
+	for host, usages := range watts_usages {
+		total_usage := 0.0
+		for _, usage := range usages {
+			total_usage += usage
+		}
+		loads = append(loads, total_usage/total_power[host])
+	}
+	// Now need to compute the average load.
+	total_load := 0.0
+	for _, load := range loads {
+		total_load += load
+	}
+	average_load := total_load / float64(len(loads)) // this would be the cap value.
+	return average_load, nil
+}
+
 /*
 Recapping the entire cluster.
@@ -128,18 +191,35 @@ func (capper clusterwideCapper) recap(total_power map[string]float64,
 	}
 	total_allocated_power := 0.0
 	total_running_tasks := 0
-	for _, tasks := range task_monitor {
-		index := 0
-		for i, task := range tasks {
-			if task.TaskID == finished_taskId {
-				index = i
-				continue
-			}
-			total_allocated_power += float64(task.Watts) * constants.Cap_margin
-			total_running_tasks++
-		}
-		tasks = append(tasks[:index], tasks[index+1:]...)
-	}
+
+	host_of_finished_task := ""
+	index_of_finished_task := -1
+	for host, tasks := range task_monitor {
+		for i, task := range tasks {
+			if task.TaskID == finished_taskId {
+				host_of_finished_task = host
+				index_of_finished_task = i
+				// Not considering this task for the computation of total_allocated_power and total_running_tasks
+				continue
+			}
+			total_allocated_power += (float64(task.Watts) * constants.Cap_margin)
+			total_running_tasks++
+		}
+	}
+
+	// Updating task monitor
+	if host_of_finished_task != "" && index_of_finished_task != -1 {
+		log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
+			task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
+		task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
+			task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
+	}
+
+	// For the last task, total_allocated_power and total_running_tasks would be 0
+	if total_allocated_power == 0 && total_running_tasks == 0 {
+		return 100, errors.New("No task running on the cluster.")
+	}
+
 	average := total_allocated_power / float64(total_running_tasks)
 	ratios := []float64{}
 	for _, tpower := range total_power {
@@ -211,7 +291,7 @@ func (capper clusterwideCapper) taskFinished(taskID string) {
 		}
 	}
 
-	// Ee need to remove the task from the window.
+	// We need to remove the task from the window.
 	if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
 		capper.window_of_tasks.Remove(task_element_to_remove)
 		capper.number_of_tasks_in_window -= 1
diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go
index b12cb7c..59c3ac5 100644
--- a/schedulers/proactiveclusterwidecappingfcfs.go
+++ b/schedulers/proactiveclusterwidecappingfcfs.go
@@ -12,14 +12,15 @@ import (
 	"log"
 	"math"
 	"strings"
+	"sync"
 	"time"
 )
 
 // Decides if to take an offer or not
 func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
-	offer_cpu, offer_mem, _ := OfferAgg(offer)
+	offer_cpu, offer_mem, offer_watts := OfferAgg(offer)
 
-	if offer_cpu >= task.CPU && offer_mem >= task.RAM {
+	if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts {
 		return true
 	}
 	return false
@@ -38,8 +39,9 @@ type ProactiveClusterwideCapFCFS struct {
 	ignoreWatts bool
 	capper      *clusterwideCapper
 	ticker      *time.Ticker
+	recapTicker *time.Ticker
 	isCapping   bool // indicate whether we are currently performing cluster wide capping.
-	//lock        *sync.Mutex
+	isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
 
 	// First set of PCP values are garbage values, signal to logger to start recording when we're
 	// about to schedule the new task.
@@ -71,13 +73,17 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
 		totalPower:  make(map[string]float64),
 		RecordPCP:   false,
 		capper:      getClusterwideCapperInstance(),
-		ticker:      time.NewTicker(5 * time.Second),
+		ticker:      time.NewTicker(10 * time.Second),
+		recapTicker: time.NewTicker(20 * time.Second),
 		isCapping:   false,
-		//lock:        new(sync.Mutex),
+		isRecapping: false,
 	}
 	return s
 }
 
+// mutex to synchronize operations that change the cluster wide cap values.
+var mutex sync.Mutex
+
 func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 	taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
 	s.tasksCreated++
@@ -95,10 +101,14 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 
 	// Setting the task ID to the task. This is done so that we can consider each task to be different,
 	// even though they have the same parameters.
-	task.SetTaskID(*proto.String(taskName))
+	task.SetTaskID(*proto.String("electron-" + taskName))
 	// Add task to the list of tasks running on the node.
 	s.running[offer.GetSlaveId().GoString()][taskName] = true
-	s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
+	if len(s.taskMonitor[offer.GetSlaveId().GoString()]) == 0 {
+		s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
+	} else {
+		s.taskMonitor[offer.GetSlaveId().GoString()] = append(s.taskMonitor[offer.GetSlaveId().GoString()], task)
+	}
 
 	resources := []*mesos.Resource{
 		mesosutil.NewScalarResource("cpus", task.CPU),
@@ -143,7 +153,10 @@ func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, mast
 func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
 	// Need to stop the capping process.
 	s.ticker.Stop()
+	s.recapTicker.Stop()
+	mutex.Lock()
 	s.isCapping = false
+	mutex.Unlock()
 	log.Println("Framework disconnected with master")
 }
 
@@ -155,20 +168,44 @@ func (s *ProactiveClusterwideCapFCFS) startCapping() {
 			select {
 			case <-s.ticker.C:
 				// Need to cap the cluster to the currentCapValue.
+				mutex.Lock()
 				if currentCapValue > 0.0 {
-					//mutex.Lock()
-					//s.lock.Lock()
 					for _, host := range constants.Hosts {
 						// Rounding curreCapValue to the nearest int.
 						if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
-							fmt.Println(err)
-						} else {
-							fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue)
+							log.Println(err)
 						}
 					}
-					//mutex.Unlock()
-					//s.lock.Unlock()
+					log.Printf("Capped the cluster to %d", int(math.Floor(currentCapValue+0.5)))
 				}
+				mutex.Unlock()
 			}
 		}
 	}()
 }
+
+// Go routine to re-cap the entire cluster at regular intervals of time.
+var recapValue = 0.0 // The cluster wide cap value when recapping.
+func (s *ProactiveClusterwideCapFCFS) startRecapping() {
+	go func() {
+		for {
+			select {
+			case <-s.recapTicker.C:
+				mutex.Lock()
+				// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
+				//if !s.isCapping && s.isRecapping && recapValue > 0.0 {
+				if s.isRecapping && recapValue > 0.0 {
+					for _, host := range constants.Hosts {
+						// Rounding recapValue to the nearest int.
+						if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil {
+							log.Println(err)
+						}
+					}
+					log.Printf("Recapped the cluster to %d", int(math.Floor(recapValue+0.5)))
+				}
+				// setting recapping to false
+				s.isRecapping = false
+				mutex.Unlock()
 			}
 		}
 	}()
@@ -179,7 +216,22 @@ func (s *ProactiveClusterwideCapFCFS) stopCapping() {
 	if s.isCapping {
 		log.Println("Stopping the cluster wide capping.")
 		s.ticker.Stop()
+		mutex.Lock()
 		s.isCapping = false
+		s.isRecapping = true
+		mutex.Unlock()
 	}
 }
+
+// Stop cluster wide Recapping
+func (s *ProactiveClusterwideCapFCFS) stopRecapping() {
+	// If not capping, then definitely recapping.
+	if !s.isCapping && s.isRecapping {
+		log.Println("Stopping the cluster wide re-capping.")
+		s.recapTicker.Stop()
+		mutex.Lock()
+		s.isRecapping = false
+		mutex.Unlock()
+	}
 }
 
@@ -198,10 +250,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 	}
 
 	for host, tpower := range s.totalPower {
-		fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
-	}
-	for host, apower := range s.availablePower {
-		fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
+		log.Printf("TotalPower[%s] = %f", host, tpower)
 	}
 
 	for _, offer := range offers {
@@ -227,10 +276,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 		TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
 			Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
 		*/
-		//offer_cpu, offer_ram, _ := OfferAgg(offer)
 		taken := false
-		//var mutex sync.Mutex
 
 		for i, task := range s.tasks {
 			// Don't take offer if it doesn't match our task's host requirement.
@@ -242,27 +288,26 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 			if s.takeOffer(offer, task) {
 				// Capping the cluster if haven't yet started,
 				if !s.isCapping {
-					s.startCapping()
+					mutex.Lock()
 					s.isCapping = true
+					mutex.Unlock()
+					s.startCapping()
 				}
 				taken = true
-				//mutex.Lock()
-				//s.lock.Lock()
-				//tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
 				tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
 				if err == nil {
+					mutex.Lock()
 					currentCapValue = tempCap
+					mutex.Unlock()
 				} else {
-					fmt.Printf("Failed to determine new cluster wide cap: ")
-					fmt.Println(err)
+					log.Printf("Failed to determine new cluster wide cap: ")
+					log.Println(err)
 				}
-				//mutex.Unlock()
-				//s.lock.Unlock()
-				fmt.Printf("Starting on [%s]\n", offer.GetHostname())
+				log.Printf("Starting on [%s]\n", offer.GetHostname())
 				to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
 				driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
-				fmt.Printf("Inst: %d", *task.Instances)
+				log.Printf("Inst: %d", *task.Instances)
 				*task.Instances--
 				if *task.Instances <= 0 {
 					// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
@@ -273,6 +318,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 					log.Println("Done scheduling all tasks")
 					// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
 					s.stopCapping()
+					s.startRecapping() // Load changes after every task finishes and hence we need to change the capping of the cluster.
 					close(s.Shutdown)
 				}
 			}
@@ -284,7 +330,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 
 		// If no task fit the offer, then declining the offer.
 		if !taken {
-			fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
+			log.Printf("There are not enough resources to launch a task on Host: %s\n", offer.GetHostname())
 			cpus, mem, watts := OfferAgg(offer)
 
 			log.Printf("\n", cpus, mem, watts)
@@ -294,7 +340,7 @@
 func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
-	log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
+	log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
 
 	if *status.State == mesos.TaskState_TASK_RUNNING {
 		s.tasksRunning++
@@ -302,17 +348,32 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
 		delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
 		// Need to remove the task from the window of tasks.
 		s.capper.taskFinished(*status.TaskId.Value)
-		//currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value) // Determining the new cluster wide cap.
-		currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
-		log.Printf("Recapping the cluster to %f\n", currentCapValue)
+		tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+		if err == nil {
+			// If the newly determined cap value is different from the current recap value, then we need to recap.
+			if int(math.Floor(tempCap+0.5)) != int(math.Floor(recapValue+0.5)) {
+				mutex.Lock()
+				recapValue = tempCap
+				s.isRecapping = true
+				mutex.Unlock()
+				log.Printf("Determined re-cap value: %f\n", recapValue)
+			} else {
+				mutex.Lock()
+				s.isRecapping = false
+				mutex.Unlock()
+			}
+		} else {
+			// Not updating the recap value.
+			log.Println(err)
+		}
 
 		s.tasksRunning--
 		if s.tasksRunning == 0 {
 			select {
 			case <-s.Shutdown:
 				// Need to stop the capping process.
-				s.stopCapping()
+				s.stopRecapping()
 				close(s.Done)
 			default:
 			}
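
Note (illustrative, not part of the commit above): the following self-contained Go sketch mirrors the idea behind cleverRecap(...) as described in the patch -- per host, sum the capped watts of the still-running tasks (task watts scaled by Cap_margin), divide by that host's total power to get its load, and average the loads across hosts to obtain the cluster wide re-cap value. The task type, host names, and numbers below are simplified stand-ins for def.Task, constants.Hosts, and real power data.

package main

import (
	"errors"
	"fmt"
)

// task is a simplified stand-in for def.Task.
type task struct {
	TaskID string
	Watts  float64
}

const capMargin = 0.70 // mirrors constants.Cap_margin after this patch

// cleverRecapSketch returns the average per-host load, ignoring the task that just finished.
// Load for a host = (sum of watts of its running tasks * capMargin) / total power of the host.
func cleverRecapSketch(totalPower map[string]float64, taskMonitor map[string][]task,
	finishedTaskID string) (float64, error) {
	if len(totalPower) == 0 || len(taskMonitor) == 0 {
		return 100.0, errors.New("invalid argument: totalPower, taskMonitor")
	}
	totalLoad, hosts := 0.0, 0
	for host, power := range totalPower {
		usage := 0.0
		for _, t := range taskMonitor[host] {
			if t.TaskID == finishedTaskID {
				continue // the finished task no longer draws power
			}
			usage += t.Watts * capMargin
		}
		totalLoad += usage / power
		hosts++
	}
	return totalLoad / float64(hosts), nil
}

func main() {
	totalPower := map[string]float64{"node1": 100, "node2": 120}
	taskMonitor := map[string][]task{
		"node1": {{TaskID: "electron-a-1", Watts: 50}},
		"node2": {{TaskID: "electron-b-1", Watts: 60}, {TaskID: "electron-c-1", Watts: 40}},
	}
	recap, err := cleverRecapSketch(totalPower, taskMonitor, "electron-c-1")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("cluster wide re-cap value (average load): %.2f\n", recap)
}

With this sample data both hosts sit at a load of 0.35, so the re-cap value is 0.35; the schedulers round whatever cap value they settle on before handing it to rapl.Cap.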
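
A second sketch (again illustrative; rapl.Cap is stubbed out and the host names are made up) shows the synchronization pattern the patch introduces: one package-level sync.Mutex guards the cluster wide cap value, so the ticker-driven capping goroutine and the scheduler callbacks that change the cap never touch it at the same time.

package main

import (
	"log"
	"math"
	"sync"
	"time"
)

var (
	mutex           sync.Mutex // guards currentCapValue, as in the patch
	currentCapValue float64
)

// capHost is a stand-in for rapl.Cap(host, "rapl", percentage).
func capHost(host string, percentage int) {
	log.Printf("capping %s to %d%%", host, percentage)
}

// startCapping launches a goroutine that, on every tick, caps all hosts to the
// current cluster wide cap value while holding the mutex.
func startCapping(ticker *time.Ticker, hosts []string) {
	go func() {
		for range ticker.C {
			mutex.Lock()
			if currentCapValue > 0.0 {
				for _, host := range hosts {
					// Round the cap to the nearest int, as the patch does.
					capHost(host, int(math.Floor(currentCapValue+0.5)))
				}
			}
			mutex.Unlock()
		}
	}()
}

func main() {
	hosts := []string{"node1", "node2"} // illustrative host list
	ticker := time.NewTicker(1 * time.Second)
	startCapping(ticker, hosts)

	// A scheduler callback that changes the cap takes the same lock.
	mutex.Lock()
	currentCapValue = 75.0
	mutex.Unlock()

	time.Sleep(3 * time.Second)
	ticker.Stop()
}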