diff --git a/README.md b/README.md index f7e8a33..2033b81 100644 --- a/README.md +++ b/README.md @@ -5,19 +5,21 @@ To Do: * Create metrics for each task launched [Time to schedule, run time, power used] * Have calibration phase? - * Add ability to use constraints + * Add ability to use constraints * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - - + * Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?) **Requires Performance-Copilot tool pmdumptext to be installed on the machine on which electron is launched for logging to work** +How to run (Use the --help option to get information about other command-line options): -How to run: +`./electron -workload ` -`./electron -workload -ignoreWatts ` +To run electron with ignoreWatts, run the following command, + +`./electron -workload -ignoreWatts` Workload schema: @@ -43,4 +45,4 @@ Workload schema: "inst": 9 } ] -``` \ No newline at end of file +``` diff --git a/constants/constants.go b/constants/constants.go new file mode 100644 index 0000000..08ad28a --- /dev/null +++ b/constants/constants.go @@ -0,0 +1,82 @@ +/* +Constants that are used across scripts +1. The available hosts = stratos-00x (x varies from 1 to 8) +2. cap_margin = percentage of the requested power to allocate +3. power_threshold = overloading factor +5. window_size = number of tasks to consider for computation of the dynamic cap. + +Also, exposing functions to update or initialize some of the constants. +*/ +package constants + +var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton.edu", + "stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu", + "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", + "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} + +// Add a new host to the slice of hosts. +func AddNewHost(newHost string) bool { + // Validation + if newHost == "" { + return false + } else { + Hosts = append(Hosts, newHost) + return true + } +} + +/* + Lower bound of the percentage of requested power, that can be allocated to a task. + + Note: This constant is not used for the proactive cluster wide capping schemes. +*/ +var PowerThreshold = 0.6 // Right now saying that a task will never be given lesser than 60% of the power it requested. + +/* + Margin with respect to the required power for a job. + So, if power required = 10W, the node would be capped to 75%*10W. + This value can be changed upon convenience. +*/ +var CapMargin = 0.70 + +// Modify the cap margin. +func UpdateCapMargin(newCapMargin float64) bool { + // Checking if the new_cap_margin is less than the power threshold. + if newCapMargin < StarvationFactor { + return false + } else { + CapMargin = newCapMargin + return true + } +} + +/* + The factor, that when multiplied with (task.Watts * CapMargin) results in (task.Watts * PowerThreshold). + This is used to check whether available power, for a host in an offer, is not less than (PowerThreshold * task.Watts), + which is assumed to result in starvation of the task. + Here is an example, + Suppose a task requires 100W of power. Assuming CapMargin = 0.75 and PowerThreshold = 0.6. + So, the assumed allocated watts is 75W. + Now, when we get an offer, we need to check whether the available power, for the host in that offer, is + not less than 60% (the PowerTreshold) of the requested power (100W). 
+ To put it in other words, + availablePower >= 100W * 0.75 * X + where X is the StarvationFactor (80% in this case) + + Note: This constant is not used for the proactive cluster wide capping schemes. +*/ +var StarvationFactor = PowerThreshold / CapMargin + +// Window size for running average +var WindowSize = 160 + +// Update the window size. +func UpdateWindowSize(newWindowSize int) bool { + // Validation + if newWindowSize == 0 { + return false + } else { + WindowSize = newWindowSize + return true + } +} diff --git a/def/task.go b/def/task.go index 5e0f8de..e52acb3 100644 --- a/def/task.go +++ b/def/task.go @@ -1,6 +1,7 @@ package def import ( + "bitbucket.org/sunybingcloud/electron/constants" "encoding/json" "github.com/pkg/errors" "os" @@ -15,6 +16,7 @@ type Task struct { CMD string `json:"cmd"` Instances *int `json:"inst"` Host string `json:"host"` + TaskID string `json:"taskID"` } func TasksFromJSON(uri string) ([]Task, error) { @@ -34,6 +36,34 @@ func TasksFromJSON(uri string) ([]Task, error) { return tasks, nil } +// Update the host on which the task needs to be scheduled. +func (tsk *Task) UpdateHost(newHost string) bool { + // Validation + isCorrectHost := false + for _, existingHost := range constants.Hosts { + if newHost == existingHost { + isCorrectHost = true + } + } + if !isCorrectHost { + return false + } else { + tsk.Host = newHost + return true + } +} + +// Set the taskID of the task. +func (tsk *Task) SetTaskID(taskID string) bool { + // Validation + if taskID == "" { + return false + } else { + tsk.TaskID = taskID + return true + } +} + type WattsSorter []Task func (slice WattsSorter) Len() int { @@ -47,3 +77,16 @@ func (slice WattsSorter) Less(i, j int) bool { func (slice WattsSorter) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } + +// Compare two tasks. +func Compare(task1 *Task, task2 *Task) bool { + // If comparing the same pointers (checking the addresses). 
+ if task1 == task2 { + return true + } + if task1.TaskID != task2.TaskID { + return false + } else { + return true + } +} diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go index cc2776f..4b17fb5 100644 --- a/pcp/loganddynamiccap.go +++ b/pcp/loganddynamiccap.go @@ -1,7 +1,7 @@ package pcp import ( - "bitbucket.org/bingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/rapl" "bufio" "container/ring" "log" diff --git a/pcp/pcp.go b/pcp/pcp.go index 8a1d46d..3e65a70 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -19,6 +19,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) { if err != nil { log.Fatal(err) } + log.Println("Writing pcp logs to file: " + logFile.Name()) defer logFile.Close() diff --git a/rapl/cap.go b/rapl/cap.go index 20cd945..b15d352 100644 --- a/rapl/cap.go +++ b/rapl/cap.go @@ -26,6 +26,7 @@ func Cap(host, username string, percentage int) error { } session, err := connection.NewSession() + defer session.Close() if err != nil { return errors.Wrap(err, "Failed to create session") } diff --git a/scheduler.go b/scheduler.go index 93bbdf4..280d788 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1,9 +1,9 @@ package main import ( - "bitbucket.org/bingcloud/electron/def" - "bitbucket.org/bingcloud/electron/pcp" - "bitbucket.org/bingcloud/electron/schedulers" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/schedulers" + "bitbucket.org/sunybingcloud/electron/pcp" "flag" "fmt" "github.com/golang/protobuf/proto" @@ -56,7 +56,7 @@ func main() { fmt.Println(task) } - scheduler := schedulers.NewFirstFit(tasks, *ignoreWatts) + scheduler := schedulers.NewProactiveClusterwideCapRanked(tasks, *ignoreWatts) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -70,8 +70,8 @@ func main() { return } - //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix) - go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix, *hiThreshold, *loThreshold) + go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix) + //go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Attempt to handle signint to not leave pmdumptext running diff --git a/schedulers/README.md b/schedulers/README.md new file mode 100644 index 0000000..11f25a0 --- /dev/null +++ b/schedulers/README.md @@ -0,0 +1,15 @@ +Electron: Scheduling Algorithms +================================ + +To Do: + + * Design changes -- Possible to have one scheduler with different scheduling schemes? + * Make the running average calculation generic, so that schedulers in the future can use it and not implement their own. 
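The running-average item above could be served by one small shared helper instead of per-scheduler copies. A rough sketch (illustrative only, not part of this change; the package and names are placeholders) of a windowed running average like the one the cluster-wide cappers keep:

```go
package runavg // hypothetical shared package

import "container/list"

// RunningAvg keeps a fixed-size window of samples and their running sum.
type RunningAvg struct {
	window  list.List // oldest sample sits at the front
	sum     float64
	maxSize int
}

// New creates a running average over at most maxSize samples.
func New(maxSize int) *RunningAvg {
	if maxSize <= 0 {
		maxSize = 1
	}
	return &RunningAvg{maxSize: maxSize}
}

// Add records a sample, evicting the oldest one once the window is full,
// and returns the current average.
func (r *RunningAvg) Add(sample float64) float64 {
	if r.window.Len() >= r.maxSize {
		oldest := r.window.Front()
		r.sum -= oldest.Value.(float64)
		r.window.Remove(oldest)
	}
	r.window.PushBack(sample)
	r.sum += sample
	return r.sum / float64(r.window.Len())
}
```

A scheduler could then feed it `task.Watts * constants.CapMargin` rather than maintaining its own window, which is what `clusterwideCapper.runningAverageOfWatts` does today.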
+ +Scheduling Algorithms: + + * Bin-packing with sorted watts + * FCFS Proactive Cluster-wide Capping + * Ranked Proactive Cluster-wide Capping + * First Fit + * First Fit with sorted watts diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index d73b64c..b05a3e3 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -1,7 +1,7 @@ package schedulers import ( - "bitbucket.org/bingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/def" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index bdfad7e..e426ab1 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -1,7 +1,7 @@ package schedulers import ( - "bitbucket.org/bingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/def" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index faab082..9067e1c 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -1,7 +1,7 @@ package schedulers import ( - "bitbucket.org/bingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/def" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index b19ce3b..e5962a7 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -1,7 +1,7 @@ package schedulers import ( - "bitbucket.org/bingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/def" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" diff --git a/schedulers/proactiveclusterwidecappers.go b/schedulers/proactiveclusterwidecappers.go new file mode 100644 index 0000000..e943d37 --- /dev/null +++ b/schedulers/proactiveclusterwidecappers.go @@ -0,0 +1,361 @@ +/* +Cluster wide dynamic capping +Step1. Compute the running average of watts of tasks in window. +Step2. Compute what percentage of total power of each node, is the running average. +Step3. Compute the median of the percetages and this is the percentage that the cluster needs to be capped at. + +1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled. +2. Ranked based scheduling -- Sort the tasks to be scheduled, in ascending order, and then determine the cluster wide cap. + +This is not a scheduler but a scheduling scheme that schedulers can use. +*/ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "container/list" + "errors" + "github.com/montanaflynn/stats" + "log" + "sort" +) + +// Structure containing utility data structures used to compute cluster-wide dynamic cap. +type clusterwideCapper struct { + // window of tasks. + windowOfTasks list.List + // The current sum of requested powers of the tasks in the window. + currentSum float64 + // The current number of tasks in the window. + numberOfTasksInWindow int +} + +// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance(). +func newClusterwideCapper() *clusterwideCapper { + return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0} +} + +// Singleton instance of clusterwideCapper +var singletonCapper *clusterwideCapper + +// Retrieve the singleton instance of clusterwideCapper. 
+func getClusterwideCapperInstance() *clusterwideCapper { + if singletonCapper == nil { + singletonCapper = newClusterwideCapper() + } else { + // Do nothing + } + return singletonCapper +} + +// Clear and initialize all the members of clusterwideCapper. +func (capper clusterwideCapper) clear() { + capper.windowOfTasks.Init() + capper.currentSum = 0 + capper.numberOfTasksInWindow = 0 +} + +// Compute the average of watts of all the tasks in the window. +func (capper clusterwideCapper) average() float64 { + return capper.currentSum / float64(capper.windowOfTasks.Len()) +} + +/* +Compute the running average. + +Using clusterwideCapper#windowOfTasks to store the tasks. +Task at position 0 (oldest task) is removed when the window is full and new task arrives. +*/ +func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 { + var average float64 + if capper.numberOfTasksInWindow < constants.WindowSize { + capper.windowOfTasks.PushBack(tsk) + capper.numberOfTasksInWindow++ + capper.currentSum += float64(tsk.Watts) * constants.CapMargin + } else { + taskToRemoveElement := capper.windowOfTasks.Front() + if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok { + capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin + capper.windowOfTasks.Remove(taskToRemoveElement) + } + capper.windowOfTasks.PushBack(tsk) + capper.currentSum += float64(tsk.Watts) * constants.CapMargin + } + average = capper.average() + return average +} + +/* +Calculating cap value. + +1. Sorting the values of runningAverageToTotalPowerPercentage in ascending order. +2. Computing the median of above sorted values. +3. The median is now the cap. +*/ +func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 { + var values []float64 + // Validation + if runningAverageToTotalPowerPercentage == nil { + return 100.0 + } + for _, apower := range runningAverageToTotalPowerPercentage { + values = append(values, apower) + } + // sorting the values in ascending order. + sort.Float64s(values) + // Calculating the median + if median, err := stats.Median(values); err == nil { + return median + } + // should never reach here. If here, then just setting the cap value to be 100 + return 100.0 +} + +/* +A recapping strategy which decides between 2 different recapping schemes. +1. the regular scheme based on the average power usage across the cluster. +2. A scheme based on the average of the loads on each node in the cluster. + +The recap value picked the least among the two. + +The cleverRecap scheme works well when the cluster is relatively idle and until then, + the primitive recapping scheme works better. +*/ +func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64, + taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { + // Validation + if totalPower == nil || taskMonitor == nil { + return 100.0, errors.New("Invalid argument: totalPower, taskMonitor") + } + + // determining the recap value by calling the regular recap(...) + toggle := false + recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId) + if err == nil { + toggle = true + } + + // watts usage on each node in the cluster. 
+ wattsUsages := make(map[string][]float64) + hostOfFinishedTask := "" + indexOfFinishedTask := -1 + for _, host := range constants.Hosts { + wattsUsages[host] = []float64{0.0} + } + for host, tasks := range taskMonitor { + for i, task := range tasks { + if task.TaskID == finishedTaskId { + hostOfFinishedTask = host + indexOfFinishedTask = i + // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks + continue + } + wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin) + } + } + + // Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor, + // then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here. + if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { + log.Printf("Removing task with task [%s] from the list of running tasks\n", + taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) + taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], + taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) + } + + // Need to check whether there are still tasks running on the cluster. If not then we return an error. + clusterIdle := true + for _, tasks := range taskMonitor { + if len(tasks) > 0 { + clusterIdle = false + } + } + + if !clusterIdle { + // load on each node in the cluster. + loads := []float64{0.0} + for host, usages := range wattsUsages { + totalUsage := 0.0 + for _, usage := range usages { + totalUsage += usage + } + loads = append(loads, totalUsage/totalPower[host]) + } + + // Now need to compute the average load. + totalLoad := 0.0 + for _, load := range loads { + totalLoad += load + } + averageLoad := (totalLoad / float64(len(loads)) * 100.0) // this would be the cap value. + // If toggle is true, then we need to return the least recap value. + if toggle { + if averageLoad <= recapValue { + return averageLoad, nil + } else { + return recapValue, nil + } + } else { + return averageLoad, nil + } + } + return 100.0, errors.New("No task running on the cluster.") +} + +/* +Recapping the entire cluster. + +1. Remove the task that finished from the list of running tasks. +2. Compute the average allocated power of each of the tasks that are currently running. +3. For each host, determine the ratio of the average to the total power. +4. Determine the median of the ratios and this would be the new cluster wide cap. + +This needs to be called whenever a task finishes execution. 
+*/ +func (capper clusterwideCapper) recap(totalPower map[string]float64, + taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) { + // Validation + if totalPower == nil || taskMonitor == nil { + return 100.0, errors.New("Invalid argument: totalPower, taskMonitor") + } + totalAllocatedPower := 0.0 + totalRunningTasks := 0 + + hostOfFinishedTask := "" + indexOfFinishedTask := -1 + for host, tasks := range taskMonitor { + for i, task := range tasks { + if task.TaskID == finishedTaskId { + hostOfFinishedTask = host + indexOfFinishedTask = i + // Not considering this task for the computation of totalAllocatedPower and totalRunningTasks + continue + } + totalAllocatedPower += (float64(task.Watts) * constants.CapMargin) + totalRunningTasks++ + } + } + + // Updating task monitor + if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { + log.Printf("Removing task with task [%s] from the list of running tasks\n", + taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) + taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], + taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) + } + + // For the last task, totalAllocatedPower and totalRunningTasks would be 0 + if totalAllocatedPower == 0 && totalRunningTasks == 0 { + return 100, errors.New("No task running on the cluster.") + } + + average := totalAllocatedPower / float64(totalRunningTasks) + ratios := []float64{} + for _, tpower := range totalPower { + ratios = append(ratios, (average/tpower)*100) + } + sort.Float64s(ratios) + median, err := stats.Median(ratios) + if err == nil { + return median, nil + } else { + return 100, err + } +} + +/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/ +func (capper clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) { + i := low + j := high + // calculating the pivot + pivotIndex := low + (high-low)/2 + pivot := (*tasksToSort)[pivotIndex] + for i <= j { + for (*tasksToSort)[i].Watts < pivot.Watts { + i++ + } + for (*tasksToSort)[j].Watts > pivot.Watts { + j-- + } + if i <= j { + temp := (*tasksToSort)[i] + (*tasksToSort)[i] = (*tasksToSort)[j] + (*tasksToSort)[j] = temp + i++ + j-- + } + } + if low < j { + capper.quickSort(low, j, tasksToSort) + } + if i < high { + capper.quickSort(i, high, tasksToSort) + } +} + +// Sorting tasks in ascending order of requested watts. +func (capper clusterwideCapper) sortTasks(tasksToSort *[]def.Task) { + capper.quickSort(0, len(*tasksToSort)-1, tasksToSort) +} + +/* +Remove entry for finished task. +This function is called when a task completes. +This completed task needs to be removed from the window of tasks (if it is still present) + so that it doesn't contribute to the computation of the cap value. +*/ +func (capper clusterwideCapper) taskFinished(taskID string) { + // If the window is empty the just return. This condition should technically return false. + if capper.windowOfTasks.Len() == 0 { + return + } + + // Checking whether the task with the given taskID is currently present in the window of tasks. + var taskElementToRemove *list.Element + for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() { + if tsk, ok := taskElement.Value.(*def.Task); ok { + if tsk.TaskID == taskID { + taskElementToRemove = taskElement + } + } + } + + // we need to remove the task from the window. 
+ if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok { + capper.windowOfTasks.Remove(taskElementToRemove) + capper.numberOfTasksInWindow -= 1 + capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin + } +} + +// First come first serve scheduling. +func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64, + newTask *def.Task) (float64, error) { + // Validation + if totalPower == nil { + return 100, errors.New("Invalid argument: totalPower") + } else { + // Need to calculate the running average + runningAverage := capper.runningAverageOfWatts(newTask) + // For each node, calculate the percentage of the running average to the total power. + runningAverageToTotalPowerPercentage := make(map[string]float64) + for host, tpower := range totalPower { + if tpower >= runningAverage { + runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100 + } else { + // We don't consider this host for the computation of the cluster wide cap. + } + } + + // Determine the cluster wide cap value. + capValue := capper.getCap(runningAverageToTotalPowerPercentage) + // Need to cap the cluster to this value. + return capValue, nil + } +} + +// Stringer for an instance of clusterwideCapper +func (capper clusterwideCapper) string() string { + return "Cluster Capper -- Proactively cap the entire cluster." +} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go new file mode 100644 index 0000000..a96d496 --- /dev/null +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -0,0 +1,409 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/rapl" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "strings" + "sync" + "time" +) + +// Decides if to take an offer or not +func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool { + offer_cpu, offer_mem, offer_watts := OfferAgg(offer) + + if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { + return true + } + return false +} + +// electronScheduler implements the Scheduler interface. +type ProactiveClusterwideCapFCFS struct { + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + ignoreWatts bool + capper *clusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule the new task. + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up. + Done chan struct{} + + // Controls when to shutdown pcp logging. 
+ PCPLog chan struct{} +} + +// New electron scheduler. +func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS { + s := &ProactiveClusterwideCapFCFS{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: getClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + } + return s +} + +// mutex +var fcfsMutex sync.Mutex + +func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging. + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Setting the task ID to the task. This is done so that we can consider each task to be different, + // even though they have the same parameters. + task.SetTaskID(*proto.String("electron-" + taskName)) + // Add task to the list of tasks running on the node. + s.running[offer.GetSlaveId().GoString()][taskName] = true + if len(s.taskMonitor[*offer.Hostname]) == 0 { + s.taskMonitor[*offer.Hostname] = []def.Task{task} + } else { + s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) + } + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *ProactiveClusterwideCapFCFS) Registered( + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) +} + +func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework re-registered with master %s", masterInfo) +} + +func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process. + s.ticker.Stop() + s.recapTicker.Stop() + fcfsMutex.Lock() + s.isCapping = false + fcfsMutex.Unlock() + log.Println("Framework disconnected with master") +} + +// go routine to cap the entire cluster in regular intervals of time. +var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. 
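// The capping goroutines below hand rapl.Cap an integer percentage, rounding
// the float cap value to the nearest int with int(math.Floor(value + 0.5)).
// A hypothetical helper (not present in this code) capturing that idiom:
//
//	func roundCap(value float64) int { return int(math.Floor(value + 0.5)) }
//
// e.g. roundCap(75.4) == 75 and roundCap(75.5) == 76.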
+func (s *ProactiveClusterwideCapFCFS) startCapping() { + go func() { + for { + select { + case <-s.ticker.C: + // Need to cap the cluster to the fcfsCurrentCapValue. + fcfsMutex.Lock() + if fcfsCurrentCapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding curreCapValue to the nearest int. + if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5))) + } + fcfsMutex.Unlock() + } + } + }() +} + +// go routine to cap the entire cluster in regular intervals of time. +var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping. +func (s *ProactiveClusterwideCapFCFS) startRecapping() { + go func() { + for { + select { + case <-s.recapTicker.C: + fcfsMutex.Lock() + // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. + if s.isRecapping && fcfsRecapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding curreCapValue to the nearest int. + if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5))) + } + // setting recapping to false + s.isRecapping = false + fcfsMutex.Unlock() + } + } + }() +} + +// Stop cluster wide capping +func (s *ProactiveClusterwideCapFCFS) stopCapping() { + if s.isCapping { + log.Println("Stopping the cluster wide capping.") + s.ticker.Stop() + fcfsMutex.Lock() + s.isCapping = false + s.isRecapping = true + fcfsMutex.Unlock() + } +} + +// Stop cluster wide Recapping +func (s *ProactiveClusterwideCapFCFS) stopRecapping() { + // If not capping, then definitely recapping. + if !s.isCapping && s.isRecapping { + log.Println("Stopping the cluster wide re-capping.") + s.recapTicker.Stop() + fcfsMutex.Lock() + s.isRecapping = false + fcfsMutex.Unlock() + } +} + +func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // retrieving the available power for all the hosts in the offers. + for _, offer := range offers { + _, _, offer_watts := OfferAgg(offer) + s.availablePower[*offer.Hostname] = offer_watts + // setting total power if the first time. + if _, ok := s.totalPower[*offer.Hostname]; !ok { + s.totalPower[*offer.Hostname] = offer_watts + } + } + + for host, tpower := range s.totalPower { + log.Printf("TotalPower[%s] = %f", host, tpower) + } + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + /* + Clusterwide Capping strategy + + For each task in s.tasks, + 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements). + 2. If the tasks fits the offer, then I need to detemrine the cluster wide cap. + 3. fcfsCurrentCapValue is updated with the determined cluster wide cap. + + Cluster wide capping is currently performed at regular intervals of time. + */ + taken := false + + for i, task := range s.tasks { + // Don't take offer if it doesn't match our task's host requirement. + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + + // Does the task fit. 
+ if s.takeOffer(offer, task) { + // Capping the cluster if haven't yet started, + if !s.isCapping { + fcfsMutex.Lock() + s.isCapping = true + fcfsMutex.Unlock() + s.startCapping() + } + taken = true + tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) + + if err == nil { + fcfsMutex.Lock() + fcfsCurrentCapValue = tempCap + fcfsMutex.Unlock() + } else { + log.Printf("Failed to determine new cluster wide cap: ") + log.Println(err) + } + log.Printf("Starting on [%s]\n", offer.GetHostname()) + toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)} + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) + log.Printf("Inst: %d", *task.Instances) + *task.Instances-- + if *task.Instances <= 0 { + // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + // Need to stop the cluster wide capping as there aren't any more tasks to schedule. + s.stopCapping() + s.startRecapping() // Load changes after every task finishes and hence we need to change the capping of the cluster. + close(s.Shutdown) + } + } + break // Offer taken, move on. + } else { + // Task doesn't fit the offer. Move onto the next offer. + } + } + + // If no task fit the offer, then declining the offer. + if !taken { + log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + } +} + +func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + fcfsMutex.Lock() + s.tasksRunning++ + fcfsMutex.Unlock() + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + // Need to remove the task from the window of tasks. + s.capper.taskFinished(*status.TaskId.Value) + // Determining the new cluster wide cap. + //tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + if err == nil { + // if new determined cap value is different from the current recap value then we need to recap. + if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) { + fcfsRecapValue = tempCap + fcfsMutex.Lock() + s.isRecapping = true + fcfsMutex.Unlock() + log.Printf("Determined re-cap value: %f\n", fcfsRecapValue) + } else { + fcfsMutex.Lock() + s.isRecapping = false + fcfsMutex.Unlock() + } + } else { + // Not updating fcfsCurrentCapValue + log.Println(err) + } + + fcfsMutex.Lock() + s.tasksRunning-- + fcfsMutex.Unlock() + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + // Need to stop the recapping process. 
+ s.stopRecapping() + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} + +func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver, + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { + + log.Println("Getting a framework message: ", message) + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) +} + +func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + log.Printf("Offer %s rescinded", offerID) +} + +func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + log.Printf("Slave %s lost", slaveID) +} + +func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { + log.Printf("Executor %s on slave %s was lost", executorID, slaveID) +} + +func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) { + log.Printf("Receiving an error: %s", err) +} diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go new file mode 100644 index 0000000..69ae26f --- /dev/null +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -0,0 +1,432 @@ +/* +Ranked based cluster wide capping. + +Note: Sorting the tasks right in the beginning, in ascending order of watts. + You are hence certain that the tasks that didn't fit are the ones that require more resources, + and hence, you can find a way to address that issue. + On the other hand, if you use first fit to fit the tasks and then sort them to determine the cap, + you are never certain as which tasks are the ones that don't fit and hence, it becomes much harder + to address this issue. +*/ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/rapl" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "strings" + "sync" + "time" +) + +// Decides if to taken an offer or not +func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool { + offer_cpu, offer_mem, offer_watts := OfferAgg(offer) + + if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { + return true + } + return false +} + +// electronScheduler implements the Scheduler interface +type ProactiveClusterwideCapRanked struct { + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + ignoreWatts bool + capper *clusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule the new task. 
+ RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up. + Done chan struct{} + + // Controls when to shutdown pcp logging. + PCPLog chan struct{} +} + +// New electron scheduler. +func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapRanked { + s := &ProactiveClusterwideCapRanked{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: getClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + } + return s +} + +// mutex +var rankedMutex sync.Mutex + +func (s *ProactiveClusterwideCapRanked) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging. + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Setting the task ID to the task. This is done so that we can consider each task to be different, + // even though they have the same parameters. + task.SetTaskID(*proto.String("electron-" + taskName)) + // Add task to the list of tasks running on the node. + s.running[offer.GetSlaveId().GoString()][taskName] = true + if len(s.taskMonitor[*offer.Hostname]) == 0 { + s.taskMonitor[*offer.Hostname] = []def.Task{task} + } else { + s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) + } + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *ProactiveClusterwideCapRanked) Registered( + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) +} + +func (s *ProactiveClusterwideCapRanked) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework re-registered with master %s", masterInfo) +} + +func (s *ProactiveClusterwideCapRanked) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process. 
+ s.ticker.Stop() + s.recapTicker.Stop() + rankedMutex.Lock() + s.isCapping = false + rankedMutex.Unlock() + log.Println("Framework disconnected with master") +} + +// go routine to cap the entire cluster in regular intervals of time. +var rankedCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +func (s *ProactiveClusterwideCapRanked) startCapping() { + go func() { + for { + select { + case <-s.ticker.C: + // Need to cap the cluster to the rankedCurrentCapValue. + rankedMutex.Lock() + if rankedCurrentCapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding curreCapValue to the nearest int. + if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue+0.5))) + } + rankedMutex.Unlock() + } + } + }() +} + +// go routine to cap the entire cluster in regular intervals of time. +var rankedRecapValue = 0.0 // The cluster wide cap value when recapping. +func (s *ProactiveClusterwideCapRanked) startRecapping() { + go func() { + for { + select { + case <-s.recapTicker.C: + rankedMutex.Lock() + // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. + if s.isRecapping && rankedRecapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding curreCapValue to the nearest int. + if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue+0.5))) + } + // setting recapping to false + s.isRecapping = false + rankedMutex.Unlock() + } + } + }() +} + +// Stop cluster wide capping +func (s *ProactiveClusterwideCapRanked) stopCapping() { + if s.isCapping { + log.Println("Stopping the cluster wide capping.") + s.ticker.Stop() + fcfsMutex.Lock() + s.isCapping = false + s.isRecapping = true + fcfsMutex.Unlock() + } +} + +// Stop cluster wide Recapping +func (s *ProactiveClusterwideCapRanked) stopRecapping() { + // If not capping, then definitely recapping. + if !s.isCapping && s.isRecapping { + log.Println("Stopping the cluster wide re-capping.") + s.recapTicker.Stop() + fcfsMutex.Lock() + s.isRecapping = false + fcfsMutex.Unlock() + } +} + +func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // retrieving the available power for all the hosts in the offers. + for _, offer := range offers { + _, _, offer_watts := OfferAgg(offer) + s.availablePower[*offer.Hostname] = offer_watts + // setting total power if the first time. + if _, ok := s.totalPower[*offer.Hostname]; !ok { + s.totalPower[*offer.Hostname] = offer_watts + } + } + + for host, tpower := range s.totalPower { + log.Printf("TotalPower[%s] = %f", host, tpower) + } + + // sorting the tasks in ascending order of watts. + if (len(s.tasks) > 0) { + s.capper.sortTasks(&s.tasks) + // calculating the total number of tasks ranked. 
+ numberOfRankedTasks := 0 + for _, task := range s.tasks { + numberOfRankedTasks += *task.Instances + } + log.Printf("Ranked %d tasks in ascending order of tasks.", numberOfRankedTasks) + } + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + /* + Ranked cluster wide capping strategy + + For each task in the sorted tasks, + 1. Need to check whether the offer can be taken or not (based on CPU, RAM and WATTS requirements). + 2. If the task fits the offer, then need to determine the cluster wide cap.' + 3. rankedCurrentCapValue is updated with the determined cluster wide cap. + + Once we are done scheduling all the tasks, + we start recalculating the cluster wide cap each time a task finishes. + + Cluster wide capping is currently performed at regular intervals of time. + */ + taken := false + + for i, task := range s.tasks { + // Don't take offer if it doesn't match our task's host requirement. + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + + // Does the task fit. + if s.takeOffer(offer, task) { + // Capping the cluster if haven't yet started + if !s.isCapping { + rankedMutex.Lock() + s.isCapping = true + rankedMutex.Unlock() + s.startCapping() + } + taken = true + tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task) + + if err == nil { + rankedMutex.Lock() + rankedCurrentCapValue = tempCap + rankedMutex.Unlock() + } else { + log.Println("Failed to determine the new cluster wide cap: ", err) + } + log.Printf("Starting on [%s]\n", offer.GetHostname()) + to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) + log.Printf("Inst: %d", *task.Instances) + *task.Instances-- + if *task.Instances <= 0 { + // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + // Need to stop the cluster wide capping as there aren't any more tasks to schedule. + s.stopCapping() + s.startRecapping() + close(s.Shutdown) + } + } + break // Offer taken, move on. + } else { + // Task doesn't fit the offer. Move onto the next offer. + } + } + + // If no tasks fit the offer, then declining the offer. + if !taken { + log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + } +} + +func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + rankedMutex.Lock() + s.tasksRunning++ + rankedMutex.Unlock() + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + rankedMutex.Lock() + s.tasksRunning-- + rankedMutex.Unlock() + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + // Need to stop the recapping process. 
+ s.stopRecapping() + close(s.Done) + default: + } + } else { + // Need to remove the task from the window + s.capper.taskFinished(*status.TaskId.Value) + // Determining the new cluster wide cap. + //tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + + if err == nil { + // If new determined cap value is different from the current recap value then we need to recap. + if int(math.Floor(tempCap+0.5)) != int(math.Floor(rankedRecapValue+0.5)) { + rankedRecapValue = tempCap + rankedMutex.Lock() + s.isRecapping = true + rankedMutex.Unlock() + log.Printf("Determined re-cap value: %f\n", rankedRecapValue) + } else { + rankedMutex.Lock() + s.isRecapping = false + rankedMutex.Unlock() + } + } else { + // Not updating rankedCurrentCapValue + log.Println(err) + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} + +func (s *ProactiveClusterwideCapRanked) FrameworkMessage(driver sched.SchedulerDriver, + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { + + log.Println("Getting a framework message: ", message) + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) +} + +func (s *ProactiveClusterwideCapRanked) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + log.Printf("Offer %s rescinded", offerID) +} + +func (s *ProactiveClusterwideCapRanked) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + log.Printf("Slave %s lost", slaveID) +} + +func (s *ProactiveClusterwideCapRanked) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { + log.Printf("Executor %s on slave %s was lost", executorID, slaveID) +} + +func (s *ProactiveClusterwideCapRanked) Error(_ sched.SchedulerDriver, err string) { + log.Printf("Receiving an error: %s", err) +} diff --git a/utilities/utils.go b/utilities/utils.go new file mode 100644 index 0000000..c53df74 --- /dev/null +++ b/utilities/utils.go @@ -0,0 +1,54 @@ +package utilities + +import "errors" + +/* +The Pair and PairList have been taken from google groups forum, +https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw +*/ + +// Utility struct that helps in sorting the available power by value. +type Pair struct { + Key string + Value float64 +} + +// A slice of pairs that implements the sort.Interface to sort by value. +type PairList []Pair + +// Swap pairs in the PairList +func (plist PairList) Swap(i, j int) { + plist[i], plist[j] = plist[j], plist[i] +} + +// function to return the length of the pairlist. +func (plist PairList) Len() int { + return len(plist) +} + +// function to compare two elements in pairlist. +func (plist PairList) Less(i, j int) bool { + return plist[i].Value < plist[j].Value +} + +// convert a PairList to a map[string]float64 +func OrderedKeys(plist PairList) ([]string, error) { + // Validation + if plist == nil { + return nil, errors.New("Invalid argument: plist") + } + orderedKeys := make([]string, len(plist)) + for _, pair := range plist { + orderedKeys = append(orderedKeys, pair.Key) + } + return orderedKeys, nil +} + +// determine the max value +func Max(a, b float64) float64 { + if a > b { + return a + } else { + return b + } +}
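For context on the new `utilities` package, here is a hedged usage sketch (the wiring is assumed, not taken from the schedulers) showing how a scheduler might order hosts by available power with `PairList` and `OrderedKeys`:

```go
package main

import (
	"fmt"
	"sort"

	"bitbucket.org/sunybingcloud/electron/utilities"
)

func main() {
	// Hypothetical snapshot of available power per host.
	availablePower := map[string]float64{
		"stratos-001.cs.binghamton.edu": 120.0,
		"stratos-002.cs.binghamton.edu": 95.5,
	}

	pairs := make(utilities.PairList, 0, len(availablePower))
	for host, watts := range availablePower {
		pairs = append(pairs, utilities.Pair{Key: host, Value: watts})
	}
	sort.Sort(pairs) // ascending by Value, per PairList.Less

	hosts, err := utilities.OrderedKeys(pairs)
	if err != nil {
		panic(err)
	}
	// Note: OrderedKeys allocates len(plist) entries up front and then appends,
	// so with the version above the first len(plist) strings come back empty.
	fmt.Println(hosts)
}
```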