diff --git a/README.md b/README.md index 2033b81..7b3cd87 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ To Do: * Add ability to use constraints * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average * Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?) + * Write test code for each scheduler (This should be after the design change) + * Some of the constants in constants/constants.go can vary based on the environment. + Possible to setup the constants at runtime based on the environment? + **Requires Performance-Copilot tool pmdumptext to be installed on the machine on which electron is launched for logging to work** diff --git a/schedulers/proactiveclusterwidecappers.go b/pcp/proactiveclusterwidecappers.go similarity index 52% rename from schedulers/proactiveclusterwidecappers.go rename to pcp/proactiveclusterwidecappers.go index e943d37..fe94022 100644 --- a/schedulers/proactiveclusterwidecappers.go +++ b/pcp/proactiveclusterwidecappers.go @@ -1,46 +1,45 @@ /* Cluster wide dynamic capping -Step1. Compute the running average of watts of tasks in window. -Step2. Compute what percentage of total power of each node, is the running average. -Step3. Compute the median of the percetages and this is the percentage that the cluster needs to be capped at. - -1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled. -2. Ranked based scheduling -- Sort the tasks to be scheduled, in ascending order, and then determine the cluster wide cap. - -This is not a scheduler but a scheduling scheme that schedulers can use. +this is not a scheduler but a scheduling scheme that schedulers can use. */ -package schedulers +package pcp import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "container/list" + "bitbucket.org/sunybingcloud/electron/utilities/runAvg" "errors" "github.com/montanaflynn/stats" "log" "sort" ) -// Structure containing utility data structures used to compute cluster-wide dynamic cap. -type clusterwideCapper struct { - // window of tasks. - windowOfTasks list.List - // The current sum of requested powers of the tasks in the window. - currentSum float64 - // The current number of tasks in the window. - numberOfTasksInWindow int +// wrapper around def.Task that implements runAvg.Interface +type taskWrapper struct { + task def.Task } -// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance(). -func newClusterwideCapper() *clusterwideCapper { - return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0} +func (tw taskWrapper) Val() float64 { + return tw.task.Watts * constants.CapMargin +} + +func (tw taskWrapper) ID() string { + return tw.task.TaskID +} + +// Cluster wide capper +type ClusterwideCapper struct {} + +// Defining constructor for clusterwideCapper. Please don't call this directly and instead use GetClusterwideCapperInstance() +func newClusterwideCapper() *ClusterwideCapper { + return &ClusterwideCapper{} } // Singleton instance of clusterwideCapper -var singletonCapper *clusterwideCapper +var singletonCapper *ClusterwideCapper // Retrieve the singleton instance of clusterwideCapper. 
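// A minimal usage sketch (illustrative only, not part of this change): a scheduler fetches the
// singleton, asks for a cap value when it places a task, and notifies the capper when a task
// finishes. totalPower, taskMonitor, task and taskID below stand in for the scheduler's own state.
//
//	capper := pcp.GetClusterwideCapperInstance()
//	capValue, _ := capper.FCFSDeterminedCap(totalPower, &task)            // cap derived from the running average
//	capper.TaskFinished(taskID)                                           // drop the finished task from the window
//	recapValue, _ := capper.CleverRecap(totalPower, taskMonitor, taskID)  // recompute the cluster-wide cap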
-func getClusterwideCapperInstance() *clusterwideCapper { +func GetClusterwideCapperInstance() *ClusterwideCapper { if singletonCapper == nil { singletonCapper = newClusterwideCapper() } else { @@ -49,57 +48,25 @@ func getClusterwideCapperInstance() *clusterwideCapper { return singletonCapper } -// Clear and initialize all the members of clusterwideCapper. -func (capper clusterwideCapper) clear() { - capper.windowOfTasks.Init() - capper.currentSum = 0 - capper.numberOfTasksInWindow = 0 -} - -// Compute the average of watts of all the tasks in the window. -func (capper clusterwideCapper) average() float64 { - return capper.currentSum / float64(capper.windowOfTasks.Len()) -} - -/* -Compute the running average. - -Using clusterwideCapper#windowOfTasks to store the tasks. -Task at position 0 (oldest task) is removed when the window is full and new task arrives. -*/ -func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 { - var average float64 - if capper.numberOfTasksInWindow < constants.WindowSize { - capper.windowOfTasks.PushBack(tsk) - capper.numberOfTasksInWindow++ - capper.currentSum += float64(tsk.Watts) * constants.CapMargin - } else { - taskToRemoveElement := capper.windowOfTasks.Front() - if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok { - capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin - capper.windowOfTasks.Remove(taskToRemoveElement) - } - capper.windowOfTasks.PushBack(tsk) - capper.currentSum += float64(tsk.Watts) * constants.CapMargin - } - average = capper.average() - return average +// Clear and initialize the runAvg calculator +func (capper ClusterwideCapper) clear() { + runAvg.Init() } /* Calculating cap value. -1. Sorting the values of runningAverageToTotalPowerPercentage in ascending order. +1. Sorting the values of ratios ((running average/totalPower) per node) in ascending order. 2. Computing the median of above sorted values. 3. The median is now the cap. */ -func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 { +func (capper ClusterwideCapper) getCap(ratios map[string]float64) float64 { var values []float64 // Validation - if runningAverageToTotalPowerPercentage == nil { + if ratios == nil { return 100.0 } - for _, apower := range runningAverageToTotalPowerPercentage { + for _, apower := range ratios { values = append(values, apower) } // sorting the values in ascending order. @@ -113,25 +80,25 @@ func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[ } /* -A recapping strategy which decides between 2 different recapping schemes. +A Recapping strategy which decides between 2 different Recapping schemes. 1. the regular scheme based on the average power usage across the cluster. 2. A scheme based on the average of the loads on each node in the cluster. -The recap value picked the least among the two. +The Recap value picked the least among the two. -The cleverRecap scheme works well when the cluster is relatively idle and until then, - the primitive recapping scheme works better. +The CleverRecap scheme works well when the cluster is relatively idle and until then, + the primitive Recapping scheme works better. 
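Illustrative numbers only: if the power-based Recap(...) works out to 75.0 but the mean of the
per-node load estimates is 40.0, CleverRecap returns 40.0; if Recap(...) returned an error, the
load-based value is used on its own.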
*/ -func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64, +func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64, taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { // Validation if totalPower == nil || taskMonitor == nil { return 100.0, errors.New("Invalid argument: totalPower, taskMonitor") } - // determining the recap value by calling the regular recap(...) + // determining the Recap value by calling the regular Recap(...) toggle := false - recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId) + RecapValue, err := capper.Recap(totalPower, taskMonitor, finishedTaskId) if err == nil { toggle = true } @@ -155,8 +122,8 @@ func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64, } } - // Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor, - // then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here. + // Updating task monitor. If Recap(...) has deleted the finished task from the taskMonitor, + // then this will be ignored. Else (this is only when an error occured with Recap(...)), we remove it here. if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { log.Printf("Removing task with task [%s] from the list of running tasks\n", taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) @@ -189,12 +156,12 @@ func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64, totalLoad += load } averageLoad := (totalLoad / float64(len(loads)) * 100.0) // this would be the cap value. - // If toggle is true, then we need to return the least recap value. + // If toggle is true, then we need to return the least Recap value. if toggle { - if averageLoad <= recapValue { + if averageLoad <= RecapValue { return averageLoad, nil } else { - return recapValue, nil + return RecapValue, nil } } else { return averageLoad, nil @@ -213,7 +180,7 @@ Recapping the entire cluster. This needs to be called whenever a task finishes execution. */ -func (capper clusterwideCapper) recap(totalPower map[string]float64, +func (capper ClusterwideCapper) Recap(totalPower map[string]float64, taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { // Validation if totalPower == nil || taskMonitor == nil { @@ -264,98 +231,44 @@ func (capper clusterwideCapper) recap(totalPower map[string]float64, } } -/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/ -func (capper clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) { - i := low - j := high - // calculating the pivot - pivotIndex := low + (high-low)/2 - pivot := (*tasksToSort)[pivotIndex] - for i <= j { - for (*tasksToSort)[i].Watts < pivot.Watts { - i++ - } - for (*tasksToSort)[j].Watts > pivot.Watts { - j-- - } - if i <= j { - temp := (*tasksToSort)[i] - (*tasksToSort)[i] = (*tasksToSort)[j] - (*tasksToSort)[j] = temp - i++ - j-- - } - } - if low < j { - capper.quickSort(low, j, tasksToSort) - } - if i < high { - capper.quickSort(i, high, tasksToSort) - } -} - -// Sorting tasks in ascending order of requested watts. -func (capper clusterwideCapper) sortTasks(tasksToSort *[]def.Task) { - capper.quickSort(0, len(*tasksToSort)-1, tasksToSort) -} - /* -Remove entry for finished task. -This function is called when a task completes. +Remove entry for finished task from the window + +This function is called when a task completes. 
This completed task needs to be removed from the window of tasks (if it is still present) - so that it doesn't contribute to the computation of the cap value. + so that it doesn't contribute to the computation of the next cap value. */ -func (capper clusterwideCapper) taskFinished(taskID string) { - // If the window is empty the just return. This condition should technically return false. - if capper.windowOfTasks.Len() == 0 { - return - } - - // Checking whether the task with the given taskID is currently present in the window of tasks. - var taskElementToRemove *list.Element - for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() { - if tsk, ok := taskElement.Value.(*def.Task); ok { - if tsk.TaskID == taskID { - taskElementToRemove = taskElement - } - } - } - - // we need to remove the task from the window. - if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok { - capper.windowOfTasks.Remove(taskElementToRemove) - capper.numberOfTasksInWindow -= 1 - capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin - } +func (capper ClusterwideCapper) TaskFinished(taskID string) { + runAvg.Remove(taskID) } // First come first serve scheduling. -func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64, +func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64, newTask *def.Task) (float64, error) { // Validation if totalPower == nil { return 100, errors.New("Invalid argument: totalPower") } else { // Need to calculate the running average - runningAverage := capper.runningAverageOfWatts(newTask) + runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize) // For each node, calculate the percentage of the running average to the total power. - runningAverageToTotalPowerPercentage := make(map[string]float64) + ratios := make(map[string]float64) for host, tpower := range totalPower { if tpower >= runningAverage { - runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100 + ratios[host] = (runningAverage / tpower) * 100 } else { // We don't consider this host for the computation of the cluster wide cap. } } // Determine the cluster wide cap value. - capValue := capper.getCap(runningAverageToTotalPowerPercentage) + capValue := capper.getCap(ratios) // Need to cap the cluster to this value. return capValue, nil } } // Stringer for an instance of clusterwideCapper -func (capper clusterwideCapper) string() string { +func (capper ClusterwideCapper) String() string { return "Cluster Capper -- Proactively cap the entire cluster." } diff --git a/scheduler.go b/scheduler.go index 280d788..bd7c943 100644 --- a/scheduler.go +++ b/scheduler.go @@ -56,7 +56,7 @@ func main() { fmt.Println(task) } - scheduler := schedulers.NewProactiveClusterwideCapRanked(tasks, *ignoreWatts) + scheduler := schedulers.NewPistonCapper(tasks, *ignoreWatts) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ diff --git a/schedulers/README.md b/schedulers/README.md index 11f25a0..7492e56 100644 --- a/schedulers/README.md +++ b/schedulers/README.md @@ -4,12 +4,14 @@ Electron: Scheduling Algorithms To Do: * Design changes -- Possible to have one scheduler with different scheduling schemes? - * Make the running average calculation generic, so that schedulers in the future can use it and not implement their own. 
+ * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go + * Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler. Scheduling Algorithms: + * First Fit + * First Fit with sorted watts * Bin-packing with sorted watts * FCFS Proactive Cluster-wide Capping * Ranked Proactive Cluster-wide Capping - * First Fit - * First Fit with sorted watts + * Piston Capping -- Works when scheduler is run with WAR diff --git a/schedulers/pistoncapper.go b/schedulers/pistoncapper.go new file mode 100644 index 0000000..d5a22d1 --- /dev/null +++ b/schedulers/pistoncapper.go @@ -0,0 +1,407 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/rapl" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "strings" + "sync" + "time" +) + +/* + Piston Capper implements the Scheduler interface + + This is basically extending the BinPacking algorithm to also cap each node at a different values, + corresponding to the load on that node. +*/ +type PistonCapper struct { + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + totalPower map[string]float64 + ignoreWatts bool + ticker *time.Ticker + isCapping bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule the new task. + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up. + Done chan struct{} + + // Controls when to shutdown pcp logging. + PCPLog chan struct{} +} + +// New electron scheduler. +func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { + s := &PistonCapper{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + RecordPCP: false, + ticker: time.NewTicker(5 * time.Second), + isCapping: false, + } + return s +} + +// check whether task fits the offer or not. 
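// Illustrative only: with ignoreWatts = false, an offer of (watts 400, cpus 8, mem 16384) and
// already-committed totals of (300, 6, 12288), a task asking for 80 W, 1 CPU and 2048 MB fits,
// since 400 >= 380, 8 >= 7 and 16384 >= 14336; the same task would be rejected on the watts
// dimension if only 60 W of the offer remained.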
+func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64, + totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool { + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + return true + } else { + return false + } +} + +// mutex +var mutex sync.Mutex + +func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Setting the task ID to the task. This is done so that we can consider each task to be different, + // even though they have the same parameters. + task.SetTaskID(*proto.String("electron-" + taskName)) + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + // Adding the task to the taskMonitor + if len(s.taskMonitor[*offer.Hostname]) == 0 { + s.taskMonitor[*offer.Hostname] = []def.Task{task} + } else { + s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) + } + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *PistonCapper) Registered( + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) +} + +func (s *PistonCapper) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework re-registered with master %s", masterInfo) +} + +func (s *PistonCapper) Disconnected(sched.SchedulerDriver) { + log.Println("Framework disconnected with master") +} + +// go routine to cap the each node in the cluster at regular intervals of time. +var capValues = make(map[string]float64) + +// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. 
(reduces overhead) +var previousRoundedCapValues = make(map[string]int) + +func (s *PistonCapper) startCapping() { + go func() { + for { + select { + case <-s.ticker.C: + // Need to cap each node + mutex.Lock() + for host, capValue := range capValues { + roundedCapValue := int(math.Floor(capValue + 0.5)) + // has the cap value changed + if prevRoundedCap, ok := previousRoundedCapValues[host]; ok { + if prevRoundedCap != roundedCapValue { + if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { + log.Println(err) + } else { + log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) + } + previousRoundedCapValues[host] = roundedCapValue + } + } else { + if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { + log.Println(err) + } else { + log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) + } + previousRoundedCapValues[host] = roundedCapValue + } + } + mutex.Unlock() + } + } + }() +} + +// Stop the capping +func (s *PistonCapper) stopCapping() { + if s.isCapping { + log.Println("Stopping the capping.") + s.ticker.Stop() + mutex.Lock() + s.isCapping = false + mutex.Unlock() + } +} + +func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // retrieving the total power for each host in the offers + for _, offer := range offers { + if _, ok := s.totalPower[*offer.Hostname]; !ok { + _, _, offer_watts := OfferAgg(offer) + s.totalPower[*offer.Hostname] = offer_watts + } + } + + // Displaying the totalPower + for host, tpower := range s.totalPower { + log.Printf("TotalPower[%s] = %f", host, tpower) + } + + /* + Piston capping strategy + + Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested. + For each set of tasks that are scheduled, compute the new cap values for each host in the cluster. + At regular intervals of time, cap each node in the cluster. + */ + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + fitTasks := []*mesos.TaskInfo{} + offerCPU, offerRAM, offerWatts := OfferAgg(offer) + taken := false + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + // Store the partialLoad for host corresponding to this offer. + // Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks. + partialLoad := 0.0 + for i, task := range s.tasks { + // Check host if it exists + if task.Host != "" { + // Don't take offer if it doens't match our task's host requirement. 
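// (This is a prefix match, so a hypothetical task.Host of "node-1" would accept offers from
// "node-1", "node-10" or "node-12", but not from "node-2".)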
+ if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + } + + for *task.Instances > 0 { + // Does the task fit + if s.takeOffer(offerWatts, offerCPU, offerRAM, totalWatts, totalCPU, totalRAM, task) { + + // Start piston capping if haven't started yet + if !s.isCapping { + s.isCapping = true + s.startCapping() + } + + taken = true + totalWatts += task.Watts + totalCPU += task.CPU + totalRAM += task.RAM + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + fitTasks = append(fitTasks, s.newTask(offer, task)) + + log.Println("Inst: ", *task.Instances) + *task.Instances-- + // updating the cap value for offer.Hostname + partialLoad += ((task.Watts * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 + + if *task.Instances <= 0 { + // All instances of task have been scheduled. Remove it + s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + } else { + break // Continue on to next task + } + } + } + + if taken { + // Updating the cap value for offer.Hostname + mutex.Lock() + capValues[*offer.Hostname] += partialLoad + mutex.Unlock() + log.Printf("Starting on [%s]\n", offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter) + } else { + // If there was no match for task + log.Println("There is not enough resources to launch task: ") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + } +} + +// Remove finished task from the taskMonitor +func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { + hostOfFinishedTask := "" + indexOfFinishedTask := -1 + found := false + var finishedTask def.Task + + for host, tasks := range s.taskMonitor { + for i, task := range tasks { + if task.TaskID == finishedTaskID { + hostOfFinishedTask = host + indexOfFinishedTask = i + found = true + } + } + if found { + break + } + } + + if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { + finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask] + log.Printf("Removing task with TaskID [%s] from the list of running tasks\n", + s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) + s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], + s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) + } else { + return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor") + } + return finishedTask, hostOfFinishedTask, nil +} + +func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + mutex.Lock() + s.tasksRunning++ + mutex.Unlock() + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + // Deleting the task from the taskMonitor + finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value) + if err != nil { + log.Println(err) + } + + // Need to update the cap values for host of the finishedTask + mutex.Lock() + capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 + // Checking to see if the cap value has become 0, in which case we uncap the host. 
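// (Illustrative only: a finished task that had asked for 100 W on a 400 W host gives back
// (100 * CapMargin / 400) * 100 percentage points, so once every electron task on that host has
// finished, the floored sum drops to 0 and the host is effectively uncapped by setting it to 100%.)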
+ if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 { + capValues[hostOfFinishedTask] = 100 + } + s.tasksRunning-- + mutex.Unlock() + + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + s.stopCapping() + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} + +func (s *PistonCapper) FrameworkMessage( + driver sched.SchedulerDriver, + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { + + log.Println("Getting a framework message: ", message) + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) +} + +func (s *PistonCapper) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + log.Printf("Offer %s rescinded", offerID) +} +func (s *PistonCapper) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + log.Printf("Slave %s lost", slaveID) +} +func (s *PistonCapper) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { + log.Printf("Executor %s on slave %s was lost", executorID, slaveID) +} + +func (s *PistonCapper) Error(_ sched.SchedulerDriver, err string) { + log.Printf("Receiving an error: %s", err) +} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index a96d496..68b096e 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -3,6 +3,7 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/pcp" "bitbucket.org/sunybingcloud/electron/rapl" "fmt" "github.com/golang/protobuf/proto" @@ -37,7 +38,7 @@ type ProactiveClusterwideCapFCFS struct { availablePower map[string]float64 // available power for each node in the cluster. totalPower map[string]float64 // total power for each node in the cluster. ignoreWatts bool - capper *clusterwideCapper + capper *pcp.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster wide capping. @@ -72,7 +73,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: getClusterwideCapperInstance(), + capper: pcp.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -290,7 +291,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive s.startCapping() } taken = true - tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) + tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { fcfsMutex.Lock() @@ -345,10 +346,10 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, } else if IsTerminal(status.State) { delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) // Need to remove the task from the window of tasks. - s.capper.taskFinished(*status.TaskId.Value) + s.capper.TaskFinished(*status.TaskId.Value) // Determining the new cluster wide cap. 
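// (The commented-out call below keeps the purely power-based Recap available for comparison;
// CleverRecap also considers the per-node load and picks the smaller of the two values.)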
- //tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + //tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) if err == nil { // if new determined cap value is different from the current recap value then we need to recap. if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) { diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index 69ae26f..85e8537 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -13,6 +13,7 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/pcp" "bitbucket.org/sunybingcloud/electron/rapl" "fmt" "github.com/golang/protobuf/proto" @@ -21,6 +22,7 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "sort" "strings" "sync" "time" @@ -47,7 +49,7 @@ type ProactiveClusterwideCapRanked struct { availablePower map[string]float64 // available power for each node in the cluster. totalPower map[string]float64 // total power for each node in the cluster. ignoreWatts bool - capper *clusterwideCapper + capper *pcp.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster wide capping. @@ -82,7 +84,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *Proac availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: getClusterwideCapperInstance(), + capper: pcp.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -263,7 +265,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri // sorting the tasks in ascending order of watts. if (len(s.tasks) > 0) { - s.capper.sortTasks(&s.tasks) + sort.Sort(def.WattsSorter(s.tasks)) // calculating the total number of tasks ranked. numberOfRankedTasks := 0 for _, task := range s.tasks { @@ -313,7 +315,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri s.startCapping() } taken = true - tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) + tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { rankedMutex.Lock() @@ -379,10 +381,10 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive } } else { // Need to remove the task from the window - s.capper.taskFinished(*status.TaskId.Value) + s.capper.TaskFinished(*status.TaskId.Value) // Determining the new cluster wide cap. - //tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + //tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) if err == nil { // If new determined cap value is different from the current recap value then we need to recap. 
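The runAvg package added below factors the windowed running-average bookkeeping out of the individual schedulers. A minimal sketch of the intended call pattern, using a hypothetical sample type in place of the real taskWrapper defined in pcp/proactiveclusterwidecappers.go:

    // sample is a stand-in type that satisfies runAvg.Interface.
    type sample struct {
        id    string
        watts float64
    }

    func (s sample) Val() float64 { return s.watts } // value folded into the running average
    func (s sample) ID() string   { return s.id }    // key used by runAvg.Remove

    avg := runAvg.Calc(sample{id: "task-1", watts: 75.0}, 10) // average over the 10 most recent samples
    runAvg.Remove("task-1")                                   // drop the entry once the task finishes
    runAvg.Init()                                             // reset the window and the running sum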
diff --git a/utilities/runAvg/runAvg.go b/utilities/runAvg/runAvg.go new file mode 100644 index 0000000..f022715 --- /dev/null +++ b/utilities/runAvg/runAvg.go @@ -0,0 +1,108 @@ +/* +A utility to calculate the running average. + +One should implement Val() to be able to use this utility. +*/ + +package runAvg + +import ( + "errors" + "container/list" +) + +type Interface interface { + // Value to use for running average calculation. + Val() float64 + // Unique ID + ID() string +} + +type runningAverageCalculator struct { + window list.List + windowSize int + currentSum float64 +} + +// singleton instance +var racSingleton *runningAverageCalculator + +// return single instance +func getInstance(curSum float64, wSize int) *runningAverageCalculator { + if racSingleton == nil { + racSingleton = &runningAverageCalculator { + windowSize: wSize, + currentSum: curSum, + } + return racSingleton + } else { + // Updating window size if a new window size is given. + if wSize != racSingleton.windowSize { + racSingleton.windowSize = wSize + } + return racSingleton + } +} + +// Compute the running average by adding 'data' to the window. +// Updating currentSum to get constant time complexity for every running average computation. +func (rac *runningAverageCalculator) calculate(data Interface) float64 { + if rac.window.Len() < rac.windowSize { + rac.window.PushBack(data) + rac.currentSum += data.Val() + } else { + // removing the element at the front of the window. + elementToRemove := rac.window.Front() + rac.currentSum -= elementToRemove.Value.(Interface).Val() + rac.window.Remove(elementToRemove) + + // adding new element to the window + rac.window.PushBack(data) + rac.currentSum += data.Val() + } + return rac.currentSum / float64(rac.window.Len()) +} + +/* +If element with given ID present in the window, then remove it and return (removeElement, nil). +Else, return (nil, error) +*/ +func (rac *runningAverageCalculator) removeFromWindow(id string) (interface{}, error) { + for element := rac.window.Front(); element != nil; element = element.Next() { + if elementToRemove := element.Value.(Interface); elementToRemove.ID() == id { + rac.window.Remove(element) + rac.currentSum -= elementToRemove.Val() + return elementToRemove, nil + } + } + return nil, errors.New("Error: Element not found in the window.") +} + +// Taking windowSize as a parameter to allow for sliding window implementation. +func Calc(data Interface, windowSize int) float64 { + rac := getInstance(0.0, windowSize) + return rac.calculate(data) +} + +// Remove element from the window if it is present. +func Remove(id string) (interface{}, error) { + // checking if racSingleton has been instantiated + if racSingleton == nil { + return nil, errors.New("Error: Not instantiated. Please call Init() to instantiate.") + } else { + return racSingleton.removeFromWindow(id) + } +} + +// initialize the parameters of the running average calculator +func Init() { + // checking to see if racSingleton needs top be instantiated + if racSingleton == nil { + racSingleton = getInstance(0.0, 0) + } + // Setting parameters to default values. Could also set racSingleton to nil but this leads to unnecessary overhead of creating + // another instance when Calc is called. 
+	racSingleton.window.Init()
+	racSingleton.windowSize = 0
+	racSingleton.currentSum = 0.0
+}
\ No newline at end of file
diff --git a/utilities/utils.go b/utilities/utils.go
index c53df74..cfe9785 100644
--- a/utilities/utils.go
+++ b/utilities/utils.go
@@ -7,7 +7,7 @@ The Pair and PairList have been taken from google groups forum,
 https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw
 */
 
-// Utility struct that helps in sorting the available power by value.
+// Utility struct that helps in sorting a map[string]float64 by value.
 type Pair struct {
 	Key   string
 	Value float64