Merged in pk_dev (pull request #1)

Pk dev
This commit is contained in:
Renan DelValle 2016-12-22 23:08:10 -05:00
commit 620c81466c
16 changed files with 1417 additions and 17 deletions

View file

@ -5,19 +5,21 @@ To Do:
* Create metrics for each task launched [Time to schedule, run time, power used]
* Have calibration phase?
* Add ability to use constraints
* Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
**Requires Performance-Copilot tool pmdumptext to be installed on the
machine on which electron is launched for logging to work**
How to run (Use the --help option to get information about other command-line options):
How to run:
`./electron -workload <workload json>`
`./electron -workload <workload.json> -ignoreWatts <true or false>`
To run electron with ignoreWatts, run the following command,
`./electron -workload <workload json> -ignoreWatts`
Workload schema:
@ -43,4 +45,4 @@ Workload schema:
"inst": 9
}
]
```
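For reference, a complete workload entry might look like the following. This is a hypothetical example: the `cmd`, `inst` and `host` keys appear in the schema above and in `def/task.go`, while the remaining keys and all values are illustrative assumptions based on the `def.Task` fields used elsewhere in this change.

```
[
  {
    "name": "example-task",
    "cpu": 3.0,
    "ram": 4096,
    "watts": 50,
    "image": "ubuntu:latest",
    "cmd": "sleep 60",
    "inst": 9,
    "host": "stratos-001"
  }
]
```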

82
constants/constants.go Normal file
View file

@ -0,0 +1,82 @@
/*
Constants that are used across scripts
1. The available hosts = stratos-00x (x varies from 1 to 8)
2. CapMargin = percentage of the requested power to allocate
3. PowerThreshold = overloading factor
4. WindowSize = number of tasks to consider for computation of the dynamic cap.
Also, exposing functions to update or initialize some of the constants.
*/
package constants
var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton.edu",
"stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu",
"stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu",
"stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"}
// Add a new host to the slice of hosts.
func AddNewHost(newHost string) bool {
// Validation
if newHost == "" {
return false
} else {
Hosts = append(Hosts, newHost)
return true
}
}
/*
Lower bound of the percentage of requested power, that can be allocated to a task.
Note: This constant is not used for the proactive cluster wide capping schemes.
*/
var PowerThreshold = 0.6 // Right now saying that a task will never be given less than 60% of the power it requested.
/*
Margin with respect to the required power for a job.
So, if power required = 10W, the node would be capped to 70% * 10W = 7W.
This value can be changed upon convenience.
*/
var CapMargin = 0.70
// Modify the cap margin.
func UpdateCapMargin(newCapMargin float64) bool {
// Checking whether the new cap margin is less than the starvation factor.
if newCapMargin < StarvationFactor {
return false
} else {
CapMargin = newCapMargin
return true
}
}
/*
The factor, that when multiplied with (task.Watts * CapMargin) results in (task.Watts * PowerThreshold).
This is used to check whether available power, for a host in an offer, is not less than (PowerThreshold * task.Watts),
which is assumed to result in starvation of the task.
Here is an example,
Suppose a task requires 100W of power. Assuming CapMargin = 0.75 and PowerThreshold = 0.6.
So, the assumed allocated watts is 75W.
Now, when we get an offer, we need to check whether the available power, for the host in that offer, is
not less than 60% (the PowerThreshold) of the requested power (100W).
To put it in other words,
availablePower >= 100W * 0.75 * X
where X is the StarvationFactor (0.8 in this case), i.e. availablePower >= 60W.
Note: This constant is not used for the proactive cluster wide capping schemes.
*/
var StarvationFactor = PowerThreshold / CapMargin
// Window size for running average
var WindowSize = 160
// Update the window size.
func UpdateWindowSize(newWindowSize int) bool {
// Validation
if newWindowSize <= 0 {
return false
} else {
WindowSize = newWindowSize
return true
}
}
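To make the relationship between CapMargin, PowerThreshold and StarvationFactor concrete, here is a minimal sketch of the starvation check described in the comment above. It is not part of this change; the helper and its numbers are illustrative assumptions.

```
package main

import (
	"fmt"

	"bitbucket.org/sunybingcloud/electron/constants"
)

// wouldStarve is a hypothetical helper: a task assumed to be allocated
// (requestedWatts * CapMargin) watts is considered starved if the available power
// on a host drops below (requestedWatts * PowerThreshold).
func wouldStarve(availablePower, requestedWatts float64) bool {
	allocated := requestedWatts * constants.CapMargin
	// allocated * StarvationFactor == requestedWatts * PowerThreshold
	return availablePower < allocated*constants.StarvationFactor
}

func main() {
	// With the defaults (CapMargin = 0.70, PowerThreshold = 0.6), a 100W task is
	// assumed to be allocated 70W, and less than 60W of available power starves it.
	fmt.Println(wouldStarve(65.0, 100.0)) // false
	fmt.Println(wouldStarve(55.0, 100.0)) // true
}
```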

View file

@ -1,6 +1,7 @@
package def
import (
"bitbucket.org/sunybingcloud/electron/constants"
"encoding/json"
"github.com/pkg/errors"
"os"
@ -15,6 +16,7 @@ type Task struct {
CMD string `json:"cmd"`
Instances *int `json:"inst"`
Host string `json:"host"`
TaskID string `json:"taskID"`
}
func TasksFromJSON(uri string) ([]Task, error) {
@ -34,6 +36,34 @@ func TasksFromJSON(uri string) ([]Task, error) {
return tasks, nil
}
// Update the host on which the task needs to be scheduled.
func (tsk *Task) UpdateHost(newHost string) bool {
// Validation
isCorrectHost := false
for _, existingHost := range constants.Hosts {
if newHost == existingHost {
isCorrectHost = true
}
}
if !isCorrectHost {
return false
} else {
tsk.Host = newHost
return true
}
}
// Set the taskID of the task.
func (tsk *Task) SetTaskID(taskID string) bool {
// Validation
if taskID == "" {
return false
} else {
tsk.TaskID = taskID
return true
}
}
type WattsSorter []Task
func (slice WattsSorter) Len() int {
@ -47,3 +77,16 @@ func (slice WattsSorter) Less(i, j int) bool {
func (slice WattsSorter) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
// Compare two tasks.
func Compare(task1 *Task, task2 *Task) bool {
// If comparing the same pointers (checking the addresses).
if task1 == task2 {
return true
}
if task1.TaskID != task2.TaskID {
return false
} else {
return true
}
}
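A minimal sketch of how the new def.Task helpers fit together, assuming the exported Name and Watts fields that are used elsewhere in this change; the task values and IDs are illustrative only.

```
package main

import (
	"fmt"
	"sort"

	"bitbucket.org/sunybingcloud/electron/def"
)

func main() {
	one := 1
	tasks := []def.Task{
		{Name: "task-a", Watts: 75.0, Instances: &one},
		{Name: "task-b", Watts: 50.0, Instances: &one},
	}

	// Sort tasks by requested watts using the WattsSorter defined above.
	sort.Sort(def.WattsSorter(tasks))
	fmt.Println(tasks[0].Name, tasks[1].Name)

	// Tag each task with a unique ID, then compare tasks by TaskID.
	tasks[0].SetTaskID("electron-" + tasks[0].Name + "-1")
	tasks[1].SetTaskID("electron-" + tasks[1].Name + "-1")
	fmt.Println(def.Compare(&tasks[0], &tasks[1])) // false
}
```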

View file

@ -1,7 +1,7 @@
package pcp
import (
"bitbucket.org/bingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/rapl"
"bufio"
"container/ring"
"log"

View file

@ -19,6 +19,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
if err != nil {
log.Fatal(err)
}
log.Println("Writing pcp logs to file: " + logFile.Name())
defer logFile.Close()

View file

@ -26,6 +26,7 @@ func Cap(host, username string, percentage int) error {
}
session, err := connection.NewSession()
if err != nil {
return errors.Wrap(err, "Failed to create session")
}
defer session.Close()

View file

@ -1,9 +1,9 @@
package main
import (
"bitbucket.org/bingcloud/electron/def"
"bitbucket.org/bingcloud/electron/pcp"
"bitbucket.org/bingcloud/electron/schedulers"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/schedulers"
"bitbucket.org/sunybingcloud/electron/pcp"
"flag"
"fmt"
"github.com/golang/protobuf/proto"
@ -56,7 +56,7 @@ func main() {
fmt.Println(task)
}
scheduler := schedulers.NewFirstFit(tasks, *ignoreWatts)
scheduler := schedulers.NewProactiveClusterwideCapRanked(tasks, *ignoreWatts)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{
@ -70,8 +70,8 @@ func main() {
return
}
//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix)
go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix, *hiThreshold, *loThreshold)
go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix)
//go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second)
// Attempt to handle signint to not leave pmdumptext running

15
schedulers/README.md Normal file
View file

@ -0,0 +1,15 @@
Electron: Scheduling Algorithms
================================
To Do:
* Design changes -- Possible to have one scheduler with different scheduling schemes?
* Make the running average calculation generic, so that schedulers in the future can use it and not implement their own.
Scheduling Algorithms:
* Bin-packing with sorted watts
* FCFS Proactive Cluster-wide Capping
* Ranked Proactive Cluster-wide Capping
* First Fit
* First Fit with sorted watts
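For context, here is a minimal sketch of how one of these schedulers is wired into the framework, mirroring the driver setup in scheduler.go from this pull request. The workload file name, master address and framework name are assumptions; only constructors that appear in this change are referenced.

```
package main

import (
	"log"

	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/schedulers"
	"github.com/golang/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
	sched "github.com/mesos/mesos-go/scheduler"
)

func main() {
	// Hypothetical workload file; see the top-level README for the real command-line flags.
	tasks, err := def.TasksFromJSON("workload.json")
	if err != nil {
		log.Fatal(err)
	}

	// Swap the constructor to pick a different algorithm from the list above, e.g.
	// schedulers.NewFirstFit(tasks, false) or schedulers.NewProactiveClusterwideCapFCFS(tasks, false).
	scheduler := schedulers.NewProactiveClusterwideCapRanked(tasks, false)

	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
		Master:    "localhost:5050",
		Framework: &mesos.FrameworkInfo{Name: proto.String("Electron"), User: proto.String("")},
		Scheduler: scheduler,
	})
	if err != nil {
		log.Fatal(err)
	}
	driver.Run()
}
```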

View file

@ -1,7 +1,7 @@
package schedulers
import (
"bitbucket.org/bingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/def"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"

View file

@ -1,7 +1,7 @@
package schedulers
import (
"bitbucket.org/bingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/def"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"

View file

@ -1,7 +1,7 @@
package schedulers
import (
"bitbucket.org/bingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/def"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"

View file

@ -1,7 +1,7 @@
package schedulers
import (
"bitbucket.org/bingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/def"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"

View file

@ -0,0 +1,361 @@
/*
Cluster wide dynamic capping
Step 1. Compute the running average of the watts of the tasks in the window.
Step 2. Compute what percentage of the total power of each node this running average amounts to.
Step 3. Compute the median of these percentages; this is the percentage that the cluster needs to be capped at.
1. First fit scheduling -- Perform the above steps for each task that needs to be scheduled.
2. Rank-based scheduling -- Sort the tasks to be scheduled in ascending order of watts, and then determine the cluster wide cap.
This is not a scheduler but a scheduling scheme that schedulers can use.
*/
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"errors"
"github.com/montanaflynn/stats"
"log"
"sort"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
// window of tasks.
windowOfTasks list.List
// The current sum of requested powers of the tasks in the window.
currentSum float64
// The current number of tasks in the window.
numberOfTasksInWindow int
}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance().
func newClusterwideCapper() *clusterwideCapper {
return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0}
}
// Singleton instance of clusterwideCapper
var singletonCapper *clusterwideCapper
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
if singletonCapper == nil {
singletonCapper = newClusterwideCapper()
} else {
// Do nothing
}
return singletonCapper
}
// Clear and initialize all the members of clusterwideCapper.
func (capper *clusterwideCapper) clear() {
capper.windowOfTasks.Init()
capper.currentSum = 0
capper.numberOfTasksInWindow = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
return capper.currentSum / float64(capper.windowOfTasks.Len())
}
/*
Compute the running average.
Using clusterwideCapper#windowOfTasks to store the tasks.
The task at position 0 (the oldest task) is removed when the window is full and a new task arrives.
*/
func (capper *clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 {
var average float64
if capper.numberOfTasksInWindow < constants.WindowSize {
capper.windowOfTasks.PushBack(tsk)
capper.numberOfTasksInWindow++
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
} else {
taskToRemoveElement := capper.windowOfTasks.Front()
if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok {
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
capper.windowOfTasks.Remove(taskToRemoveElement)
}
capper.windowOfTasks.PushBack(tsk)
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
}
average = capper.average()
return average
}
/*
Calculating cap value.
1. Sorting the values of runningAverageToTotalPowerPercentage in ascending order.
2. Computing the median of above sorted values.
3. The median is now the cap.
*/
func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 {
var values []float64
// Validation
if runningAverageToTotalPowerPercentage == nil {
return 100.0
}
for _, apower := range runningAverageToTotalPowerPercentage {
values = append(values, apower)
}
// sorting the values in ascending order.
sort.Float64s(values)
// Calculating the median
if median, err := stats.Median(values); err == nil {
return median
}
// should never reach here. If here, then just setting the cap value to be 100
return 100.0
}
/*
A recapping strategy which decides between 2 different recapping schemes.
1. the regular scheme based on the average power usage across the cluster.
2. A scheme based on the average of the loads on each node in the cluster.
The recap value picked is the lesser of the two.
The cleverRecap scheme works well when the cluster is relatively idle; until then,
the primitive recapping scheme works better.
*/
func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64,
taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
// Validation
if totalPower == nil || taskMonitor == nil {
return 100.0, errors.New("Invalid argument: totalPower, taskMonitor")
}
// determining the recap value by calling the regular recap(...)
toggle := false
recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId)
if err == nil {
toggle = true
}
// watts usage on each node in the cluster.
wattsUsages := make(map[string][]float64)
hostOfFinishedTask := ""
indexOfFinishedTask := -1
for _, host := range constants.Hosts {
wattsUsages[host] = []float64{0.0}
}
for host, tasks := range taskMonitor {
for i, task := range tasks {
if task.TaskID == finishedTaskId {
hostOfFinishedTask = host
indexOfFinishedTask = i
// Not considering this task for the computation of the watts usages.
continue
}
wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin)
}
}
// Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor,
// then this will be ignored. Else (this is only when an error occurred with recap(...)), we remove it here.
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
}
// Need to check whether there are still tasks running on the cluster. If not then we return an error.
clusterIdle := true
for _, tasks := range taskMonitor {
if len(tasks) > 0 {
clusterIdle = false
}
}
if !clusterIdle {
// load on each node in the cluster.
loads := []float64{}
for host, usages := range wattsUsages {
totalUsage := 0.0
for _, usage := range usages {
totalUsage += usage
}
loads = append(loads, totalUsage/totalPower[host])
}
// Now need to compute the average load.
totalLoad := 0.0
for _, load := range loads {
totalLoad += load
}
averageLoad := (totalLoad / float64(len(loads)) * 100.0) // this would be the cap value.
// If toggle is true, then we need to return the least recap value.
if toggle {
if averageLoad <= recapValue {
return averageLoad, nil
} else {
return recapValue, nil
}
} else {
return averageLoad, nil
}
}
return 100.0, errors.New("No task running on the cluster.")
}
/*
Recapping the entire cluster.
1. Remove the task that finished from the list of running tasks.
2. Compute the average allocated power of each of the tasks that are currently running.
3. For each host, determine the ratio of the average to the total power.
4. Determine the median of the ratios and this would be the new cluster wide cap.
This needs to be called whenever a task finishes execution.
*/
func (capper clusterwideCapper) recap(totalPower map[string]float64,
taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
// Validation
if totalPower == nil || taskMonitor == nil {
return 100.0, errors.New("Invalid argument: totalPower, taskMonitor")
}
totalAllocatedPower := 0.0
totalRunningTasks := 0
hostOfFinishedTask := ""
indexOfFinishedTask := -1
for host, tasks := range taskMonitor {
for i, task := range tasks {
if task.TaskID == finishedTaskId {
hostOfFinishedTask = host
indexOfFinishedTask = i
// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks
continue
}
totalAllocatedPower += (float64(task.Watts) * constants.CapMargin)
totalRunningTasks++
}
}
// Updating task monitor
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
taskMonitor[hostOfFinishedTask] = append(taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
}
// For the last task, totalAllocatedPower and totalRunningTasks would be 0
if totalAllocatedPower == 0 && totalRunningTasks == 0 {
return 100, errors.New("No task running on the cluster.")
}
average := totalAllocatedPower / float64(totalRunningTasks)
ratios := []float64{}
for _, tpower := range totalPower {
ratios = append(ratios, (average/tpower)*100)
}
sort.Float64s(ratios)
median, err := stats.Median(ratios)
if err == nil {
return median, nil
} else {
return 100, err
}
}
/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/
func (capper clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) {
i := low
j := high
// calculating the pivot
pivotIndex := low + (high-low)/2
pivot := (*tasksToSort)[pivotIndex]
for i <= j {
for (*tasksToSort)[i].Watts < pivot.Watts {
i++
}
for (*tasksToSort)[j].Watts > pivot.Watts {
j--
}
if i <= j {
temp := (*tasksToSort)[i]
(*tasksToSort)[i] = (*tasksToSort)[j]
(*tasksToSort)[j] = temp
i++
j--
}
}
if low < j {
capper.quickSort(low, j, tasksToSort)
}
if i < high {
capper.quickSort(i, high, tasksToSort)
}
}
// Sorting tasks in ascending order of requested watts.
func (capper clusterwideCapper) sortTasks(tasksToSort *[]def.Task) {
capper.quickSort(0, len(*tasksToSort)-1, tasksToSort)
}
/*
Remove entry for finished task.
This function is called when a task completes.
This completed task needs to be removed from the window of tasks (if it is still present)
so that it doesn't contribute to the computation of the cap value.
*/
func (capper *clusterwideCapper) taskFinished(taskID string) {
// If the window is empty, just return. Technically this condition should not arise.
if capper.windowOfTasks.Len() == 0 {
return
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var taskElementToRemove *list.Element
for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() {
if tsk, ok := taskElement.Value.(*def.Task); ok {
if tsk.TaskID == taskID {
taskElementToRemove = taskElement
}
}
}
// If the task was found in the window, we need to remove it.
if taskElementToRemove == nil {
return
}
if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok {
capper.windowOfTasks.Remove(taskElementToRemove)
capper.numberOfTasksInWindow -= 1
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
}
}
// Determine the cluster wide cap using first come first serve scheduling.
func (capper *clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64,
newTask *def.Task) (float64, error) {
// Validation
if totalPower == nil {
return 100, errors.New("Invalid argument: totalPower")
} else {
// Need to calculate the running average
runningAverage := capper.runningAverageOfWatts(newTask)
// For each node, calculate the percentage of the running average to the total power.
runningAverageToTotalPowerPercentage := make(map[string]float64)
for host, tpower := range totalPower {
if tpower >= runningAverage {
runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100
} else {
// We don't consider this host for the computation of the cluster wide cap.
}
}
// Determine the cluster wide cap value.
capValue := capper.getCap(runningAverageToTotalPowerPercentage)
// Need to cap the cluster to this value.
return capValue, nil
}
}
// Stringer for an instance of clusterwideCapper
func (capper clusterwideCapper) string() string {
return "Cluster Capper -- Proactively cap the entire cluster."
}
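To summarize how a scheduler is expected to drive this scheme, here is a minimal sketch: a hypothetical helper inside the schedulers package, not part of this change, that strings together the steps used by the FCFS and ranked schedulers below — determine a cap for each incoming task, apply it to every host via RAPL, and re-cap once the task finishes.

```
package schedulers

import (
	"log"
	"math"

	"bitbucket.org/sunybingcloud/electron/constants"
	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/rapl"
)

// scheduleWithClusterwideCap is a hypothetical helper showing the intended flow.
func scheduleWithClusterwideCap(totalPower map[string]float64, task *def.Task,
	taskMonitor map[string][]def.Task) {
	capper := getClusterwideCapperInstance()

	// Determine the cluster wide cap for the newly scheduled task (FCFS scheme)
	// and apply it to every host via RAPL.
	if capValue, err := capper.fcfsDetermineCap(totalPower, task); err == nil {
		for _, host := range constants.Hosts {
			if err := rapl.Cap(host, "rapl", int(math.Floor(capValue+0.5))); err != nil {
				log.Println(err)
			}
		}
	}

	// Later, when the task finishes, drop it from the window and re-cap the cluster.
	capper.taskFinished(task.TaskID)
	if recapValue, err := capper.cleverRecap(totalPower, taskMonitor, task.TaskID); err == nil {
		for _, host := range constants.Hosts {
			if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil {
				log.Println(err)
			}
		}
	}
}
```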

View file

@ -0,0 +1,409 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"strings"
"sync"
"time"
)
// Decides whether to take an offer or not
func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
offerCPU, offerMem, offerWatts := OfferAgg(offer)
if offerCPU >= task.CPU && offerMem >= task.RAM && offerWatts >= task.Watts {
return true
}
return false
}
// ProactiveClusterwideCapFCFS implements the mesos-go Scheduler interface.
type ProactiveClusterwideCapFCFS struct {
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
// The first set of PCP values are garbage values; signal the logger to start recording when we're
// about to schedule a new task.
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up.
Done chan struct{}
// Controls when to shutdown pcp logging.
PCPLog chan struct{}
}
// New electron scheduler.
func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS {
s := &ProactiveClusterwideCapFCFS{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
isRecapping: false,
}
return s
}
// mutex
var fcfsMutex sync.Mutex
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *ProactiveClusterwideCapFCFS) Registered(
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}
func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Printf("Framework re-registered with master %s", masterInfo)
}
func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
fcfsMutex.Lock()
s.isCapping = false
fcfsMutex.Unlock()
log.Println("Framework disconnected with master")
}
// goroutine to cap the entire cluster at regular intervals of time.
var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *ProactiveClusterwideCapFCFS) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the fcfsCurrentCapValue.
fcfsMutex.Lock()
if fcfsCurrentCapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding fcfsCurrentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5)))
}
fcfsMutex.Unlock()
}
}
}()
}
// goroutine to re-cap the entire cluster at regular intervals of time.
var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping.
func (s *ProactiveClusterwideCapFCFS) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
fcfsMutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
if s.isRecapping && fcfsRecapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding fcfsRecapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
fcfsMutex.Unlock()
}
}
}()
}
// Stop cluster wide capping
func (s *ProactiveClusterwideCapFCFS) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
fcfsMutex.Lock()
s.isCapping = false
s.isRecapping = true
fcfsMutex.Unlock()
}
}
// Stop cluster wide Recapping
func (s *ProactiveClusterwideCapFCFS) stopRecapping() {
// If not capping, then definitely recapping.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
fcfsMutex.Lock()
s.isRecapping = false
fcfsMutex.Unlock()
}
}
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offerWatts := OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if this is the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offer_watts
}
}
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
/*
Clusterwide Capping strategy
For each task in s.tasks,
1. Need to check whether the offer can be taken or not (based on CPU, RAM and watts requirements).
2. If the task fits the offer, then I need to determine the cluster wide cap.
3. fcfsCurrentCapValue is updated with the determined cluster wide cap.
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Does the task fit.
if s.takeOffer(offer, task) {
// Capping the cluster if we haven't yet started.
if !s.isCapping {
fcfsMutex.Lock()
s.isCapping = true
fcfsMutex.Unlock()
s.startCapping()
}
taken = true
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
fcfsMutex.Lock()
fcfsCurrentCapValue = tempCap
fcfsMutex.Unlock()
} else {
log.Printf("Failed to determine new cluster wide cap: ")
log.Println(err)
}
log.Printf("Starting on [%s]\n", offer.GetHostname())
toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter)
log.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
s.startRecapping() // Load changes after every task finishes and hence we need to change the capping of the cluster.
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move onto the next offer.
}
}
// If no task fits the offer, then decline the offer.
if !taken {
log.Printf("There are not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
}
}
}
func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
fcfsMutex.Lock()
s.tasksRunning++
fcfsMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// if new determined cap value is different from the current recap value then we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) {
fcfsRecapValue = tempCap
fcfsMutex.Lock()
s.isRecapping = true
fcfsMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", fcfsRecapValue)
} else {
fcfsMutex.Lock()
s.isRecapping = false
fcfsMutex.Unlock()
}
} else {
// Not updating fcfsCurrentCapValue
log.Println(err)
}
fcfsMutex.Lock()
s.tasksRunning--
fcfsMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the recapping process.
s.stopRecapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}
func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver,
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
log.Println("Getting a framework message: ", message)
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}
func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID)
}
func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
log.Printf("Slave %s lost", slaveID)
}
func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
}
func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) {
log.Printf("Receiving an error: %s", err)
}

View file

@ -0,0 +1,432 @@
/*
Rank-based cluster wide capping.
Note: The tasks are sorted right at the beginning, in ascending order of watts.
You are hence certain that the tasks that didn't fit are the ones that require more resources,
and so you can find a way to address that issue.
On the other hand, if you use first fit to place the tasks and then sort them to determine the cap,
you are never certain as to which tasks are the ones that don't fit, and hence it becomes much harder
to address this issue.
*/
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"strings"
"sync"
"time"
)
// Decides whether to take an offer or not
func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool {
offerCPU, offerMem, offerWatts := OfferAgg(offer)
if offerCPU >= task.CPU && offerMem >= task.RAM && offerWatts >= task.Watts {
return true
}
return false
}
// ProactiveClusterwideCapRanked implements the mesos-go Scheduler interface
type ProactiveClusterwideCapRanked struct {
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
// The first set of PCP values are garbage values; signal the logger to start recording when we're
// about to schedule a new task.
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up.
Done chan struct{}
// Controls when to shutdown pcp logging.
PCPLog chan struct{}
}
// New electron scheduler.
func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapRanked {
s := &ProactiveClusterwideCapRanked{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
isRecapping: false,
}
return s
}
// mutex
var rankedMutex sync.Mutex
func (s *ProactiveClusterwideCapRanked) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *ProactiveClusterwideCapRanked) Registered(
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}
func (s *ProactiveClusterwideCapRanked) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Printf("Framework re-registered with master %s", masterInfo)
}
func (s *ProactiveClusterwideCapRanked) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
rankedMutex.Lock()
s.isCapping = false
rankedMutex.Unlock()
log.Println("Framework disconnected with master")
}
// goroutine to cap the entire cluster at regular intervals of time.
var rankedCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *ProactiveClusterwideCapRanked) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the rankedCurrentCapValue.
rankedMutex.Lock()
if rankedCurrentCapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding rankedCurrentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue+0.5)))
}
rankedMutex.Unlock()
}
}
}()
}
// goroutine to re-cap the entire cluster at regular intervals of time.
var rankedRecapValue = 0.0 // The cluster wide cap value when recapping.
func (s *ProactiveClusterwideCapRanked) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
rankedMutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
if s.isRecapping && rankedRecapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding rankedRecapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
rankedMutex.Unlock()
}
}
}()
}
// Stop cluster wide capping
func (s *ProactiveClusterwideCapRanked) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
rankedMutex.Lock()
s.isCapping = false
s.isRecapping = true
rankedMutex.Unlock()
}
}
// Stop cluster wide Recapping
func (s *ProactiveClusterwideCapRanked) stopRecapping() {
// If not capping, then definitely recapping.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
rankedMutex.Lock()
s.isRecapping = false
rankedMutex.Unlock()
}
}
func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offerWatts := OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if this is the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offer_watts
}
}
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
// sorting the tasks in ascending order of watts.
if len(s.tasks) > 0 {
s.capper.sortTasks(&s.tasks)
// calculating the total number of tasks ranked.
numberOfRankedTasks := 0
for _, task := range s.tasks {
numberOfRankedTasks += *task.Instances
}
log.Printf("Ranked %d tasks in ascending order of tasks.", numberOfRankedTasks)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
/*
Ranked cluster wide capping strategy
For each task in the sorted tasks,
1. Need to check whether the offer can be taken or not (based on CPU, RAM and WATTS requirements).
2. If the task fits the offer, then we need to determine the cluster wide cap.
3. rankedCurrentCapValue is updated with the determined cluster wide cap.
Once we are done scheduling all the tasks,
we start recalculating the cluster wide cap each time a task finishes.
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Does the task fit.
if s.takeOffer(offer, task) {
// Capping the cluster if we haven't yet started
if !s.isCapping {
rankedMutex.Lock()
s.isCapping = true
rankedMutex.Unlock()
s.startCapping()
}
taken = true
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
rankedMutex.Lock()
rankedCurrentCapValue = tempCap
rankedMutex.Unlock()
} else {
log.Println("Failed to determine the new cluster wide cap: ", err)
}
log.Printf("Starting on [%s]\n", offer.GetHostname())
toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter)
log.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
s.startRecapping()
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move onto the next offer.
}
}
// If no task fits the offer, then decline the offer.
if !taken {
log.Printf("There are not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
}
}
}
func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
rankedMutex.Lock()
s.tasksRunning++
rankedMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
rankedMutex.Lock()
s.tasksRunning--
rankedMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the recapping process.
s.stopRecapping()
close(s.Done)
default:
}
} else {
// Need to remove the task from the window
s.capper.taskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// If new determined cap value is different from the current recap value then we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(rankedRecapValue+0.5)) {
rankedRecapValue = tempCap
rankedMutex.Lock()
s.isRecapping = true
rankedMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", rankedRecapValue)
} else {
rankedMutex.Lock()
s.isRecapping = false
rankedMutex.Unlock()
}
} else {
// Not updating rankedCurrentCapValue
log.Println(err)
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}
func (s *ProactiveClusterwideCapRanked) FrameworkMessage(driver sched.SchedulerDriver,
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
log.Println("Getting a framework message: ", message)
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}
func (s *ProactiveClusterwideCapRanked) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID)
}
func (s *ProactiveClusterwideCapRanked) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
log.Printf("Slave %s lost", slaveID)
}
func (s *ProactiveClusterwideCapRanked) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
}
func (s *ProactiveClusterwideCapRanked) Error(_ sched.SchedulerDriver, err string) {
log.Printf("Receiving an error: %s", err)
}

54
utilities/utils.go Normal file
View file

@ -0,0 +1,54 @@
package utilities
import "errors"
/*
The Pair and PairList have been taken from google groups forum,
https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw
*/
// Utility struct that helps in sorting the available power by value.
type Pair struct {
Key string
Value float64
}
// A slice of pairs that implements the sort.Interface to sort by value.
type PairList []Pair
// Swap pairs in the PairList
func (plist PairList) Swap(i, j int) {
plist[i], plist[j] = plist[j], plist[i]
}
// function to return the length of the pairlist.
func (plist PairList) Len() int {
return len(plist)
}
// function to compare two elements in pairlist.
func (plist PairList) Less(i, j int) bool {
return plist[i].Value < plist[j].Value
}
// Return the keys of the PairList, in list order.
func OrderedKeys(plist PairList) ([]string, error) {
// Validation
if plist == nil {
return nil, errors.New("Invalid argument: plist")
}
orderedKeys := make([]string, 0, len(plist))
for _, pair := range plist {
orderedKeys = append(orderedKeys, pair.Key)
}
return orderedKeys, nil
}
// determine the max value
func Max(a, b float64) float64 {
if a > b {
return a
} else {
return b
}
}
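A minimal sketch of how these utilities can be used to order hosts by available power. The host names and values are illustrative, and the import path is assumed from the repository layout.

```
package main

import (
	"fmt"
	"sort"

	"bitbucket.org/sunybingcloud/electron/utilities"
)

func main() {
	// Hypothetical available power per host.
	availablePower := map[string]float64{
		"stratos-001.cs.binghamton.edu": 95.0,
		"stratos-002.cs.binghamton.edu": 120.0,
		"stratos-003.cs.binghamton.edu": 80.0,
	}

	// Build a PairList from the map and sort it by value (ascending).
	plist := utilities.PairList{}
	for host, power := range availablePower {
		plist = append(plist, utilities.Pair{Key: host, Value: power})
	}
	sort.Sort(plist)

	// Extract the host names in order of increasing available power.
	if hosts, err := utilities.OrderedKeys(plist); err == nil {
		fmt.Println(hosts)
	}
}
```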