Merged in pkDev (pull request #2)

New scheduler -- PistonCapper. Generic running average calculator.
Renan DelValle 2016-12-22 15:00:31 -05:00
commit 99dfb20efb
9 changed files with 596 additions and 159 deletions

View file

@ -8,6 +8,10 @@ To Do:
* Add ability to use constraints
* Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average (see the sketch below)
* Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
* Write test code for each scheduler (This should be after the design change)
* Some of the constants in constants/constants.go can vary based on the environment.
Possible to set up the constants at runtime based on the environment?
**Requires Performance-Copilot tool pmdumptext to be installed on the
machine on which electron is launched for logging to work**
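The exponential moving average linked in the To Do above is not implemented in this commit (the runAvg utility added here keeps a simple windowed mean). A minimal sketch of the EMA formula, with an illustrative type name and an arbitrarily chosen smoothing factor, might look like this:

```go
// Minimal EMA sketch (not part of this commit); names and alpha are illustrative only.
package main

import "fmt"

// ema keeps a running exponential moving average with smoothing factor alpha (0 < alpha <= 1).
type ema struct {
	alpha   float64
	value   float64
	started bool
}

// Add folds a new sample into the average: S = alpha*x + (1-alpha)*S.
func (e *ema) Add(x float64) float64 {
	if !e.started {
		e.value, e.started = x, true
	} else {
		e.value = e.alpha*x + (1-e.alpha)*e.value
	}
	return e.value
}

func main() {
	avg := ema{alpha: 0.5}
	for _, watts := range []float64{100, 120, 80} {
		fmt.Printf("EMA after %.0f W: %.1f\n", watts, avg.Add(watts)) // 100.0, 110.0, 95.0
	}
}
```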

View file

@ -1,46 +1,45 @@
/*
Cluster wide dynamic capping
Step 1. Compute the running average of the watts of the tasks in the window.
Step 2. Compute what percentage of each node's total power the running average amounts to.
Step 3. Compute the median of these percentages; this is the percentage at which the cluster needs to be capped.
1. First-fit scheduling -- Perform the above steps for each task that needs to be scheduled.
2. Rank-based scheduling -- Sort the tasks to be scheduled in ascending order and then determine the cluster-wide cap.
This is not a scheduler but a scheduling scheme that schedulers can use.
this is not a scheduler but a scheduling scheme that schedulers can use.
*/
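// Worked example with hypothetical numbers: if the running average of task watts is 60 W and the
// total powers of the nodes are 100 W, 200 W and 400 W, the per-node ratios are 60%, 30% and 15%;
// their median, 30%, becomes the percentage at which the cluster is capped.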
package schedulers
package pcp
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"bitbucket.org/sunybingcloud/electron/utilities/runAvg"
"errors"
"github.com/montanaflynn/stats"
"log"
"sort"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
// window of tasks.
windowOfTasks list.List
// The current sum of requested powers of the tasks in the window.
currentSum float64
// The current number of tasks in the window.
numberOfTasksInWindow int
// wrapper around def.Task that implements runAvg.Interface
type taskWrapper struct {
task def.Task
}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance().
func newClusterwideCapper() *clusterwideCapper {
return &clusterwideCapper{currentSum: 0.0, numberOfTasksInWindow: 0}
func (tw taskWrapper) Val() float64 {
return tw.task.Watts * constants.CapMargin
}
func (tw taskWrapper) ID() string {
return tw.task.TaskID
}
// Cluster wide capper
type ClusterwideCapper struct {}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use GetClusterwideCapperInstance()
func newClusterwideCapper() *ClusterwideCapper {
return &ClusterwideCapper{}
}
// Singleton instance of clusterwideCapper
var singletonCapper *clusterwideCapper
var singletonCapper *ClusterwideCapper
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
func GetClusterwideCapperInstance() *ClusterwideCapper {
if singletonCapper == nil {
singletonCapper = newClusterwideCapper()
} else {
@ -49,57 +48,25 @@ func getClusterwideCapperInstance() *clusterwideCapper {
return singletonCapper
}
// Clear and initialize all the members of clusterwideCapper.
func (capper clusterwideCapper) clear() {
capper.windowOfTasks.Init()
capper.currentSum = 0
capper.numberOfTasksInWindow = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
return capper.currentSum / float64(capper.windowOfTasks.Len())
}
/*
Compute the running average.
Using clusterwideCapper#windowOfTasks to store the tasks.
Task at position 0 (oldest task) is removed when the window is full and new task arrives.
*/
func (capper clusterwideCapper) runningAverageOfWatts(tsk *def.Task) float64 {
var average float64
if capper.numberOfTasksInWindow < constants.WindowSize {
capper.windowOfTasks.PushBack(tsk)
capper.numberOfTasksInWindow++
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
} else {
taskToRemoveElement := capper.windowOfTasks.Front()
if taskToRemove, ok := taskToRemoveElement.Value.(*def.Task); ok {
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
capper.windowOfTasks.Remove(taskToRemoveElement)
}
capper.windowOfTasks.PushBack(tsk)
capper.currentSum += float64(tsk.Watts) * constants.CapMargin
}
average = capper.average()
return average
// Clear and initialize the runAvg calculator
func (capper ClusterwideCapper) clear() {
runAvg.Init()
}
/*
Calculating cap value.
1. Sorting the values of runningAverageToTotalPowerPercentage in ascending order.
1. Sorting the values of ratios ((running average/totalPower) per node) in ascending order.
2. Computing the median of the above sorted values.
3. The median is now the cap.
*/
func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[string]float64) float64 {
func (capper ClusterwideCapper) getCap(ratios map[string]float64) float64 {
var values []float64
// Validation
if runningAverageToTotalPowerPercentage == nil {
if ratios == nil {
return 100.0
}
for _, apower := range runningAverageToTotalPowerPercentage {
for _, apower := range ratios {
values = append(values, apower)
}
// sorting the values in ascending order.
@ -113,25 +80,25 @@ func (capper clusterwideCapper) getCap(runningAverageToTotalPowerPercentage map[
}
/*
A recapping strategy which decides between 2 different recapping schemes.
A Recapping strategy which decides between 2 different Recapping schemes.
1. The regular scheme based on the average power usage across the cluster.
2. A scheme based on the average of the loads on each node in the cluster.
The recap value picked the least among the two.
The Recap value picked is the lesser of the two.
The cleverRecap scheme works well when the cluster is relatively idle and until then,
the primitive recapping scheme works better.
The CleverRecap scheme works well once the cluster is relatively idle; until then,
the primitive Recapping scheme works better.
*/
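// For instance (hypothetical values): if the regular Recap(...) yields 80.0 and the average
// node load yields 65.0, CleverRecap returns 65.0, the lesser of the two.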
func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64,
func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64,
taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
// Validation
if totalPower == nil || taskMonitor == nil {
return 100.0, errors.New("Invalid argument: totalPower, taskMonitor")
}
// determining the recap value by calling the regular recap(...)
// determining the Recap value by calling the regular Recap(...)
toggle := false
recapValue, err := capper.recap(totalPower, taskMonitor, finishedTaskId)
RecapValue, err := capper.Recap(totalPower, taskMonitor, finishedTaskId)
if err == nil {
toggle = true
}
@ -155,8 +122,8 @@ func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64,
}
}
// Updating task monitor. If recap(...) has deleted the finished task from the taskMonitor,
// then this will be ignored. Else (this is only when an error occured with recap(...)), we remove it here.
// Updating task monitor. If Recap(...) has deleted the finished task from the taskMonitor,
// then this will be ignored. Else (this is only when an error occurred with Recap(...)), we remove it here.
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
@ -189,12 +156,12 @@ func (capper clusterwideCapper) cleverRecap(totalPower map[string]float64,
totalLoad += load
}
averageLoad := (totalLoad / float64(len(loads)) * 100.0) // this would be the cap value.
// If toggle is true, then we need to return the least recap value.
// If toggle is true, then we need to return the least Recap value.
if toggle {
if averageLoad <= recapValue {
if averageLoad <= RecapValue {
return averageLoad, nil
} else {
return recapValue, nil
return RecapValue, nil
}
} else {
return averageLoad, nil
@ -213,7 +180,7 @@ Recapping the entire cluster.
This needs to be called whenever a task finishes execution.
*/
func (capper clusterwideCapper) recap(totalPower map[string]float64,
func (capper ClusterwideCapper) Recap(totalPower map[string]float64,
taskMonitor map[string][]def.Task, finishedTaskId string) (float64, error) {
// Validation
if totalPower == nil || taskMonitor == nil {
@ -264,98 +231,44 @@ func (capper clusterwideCapper) recap(totalPower map[string]float64,
}
}
/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/
func (capper clusterwideCapper) quickSort(low int, high int, tasksToSort *[]def.Task) {
i := low
j := high
// calculating the pivot
pivotIndex := low + (high-low)/2
pivot := (*tasksToSort)[pivotIndex]
for i <= j {
for (*tasksToSort)[i].Watts < pivot.Watts {
i++
}
for (*tasksToSort)[j].Watts > pivot.Watts {
j--
}
if i <= j {
temp := (*tasksToSort)[i]
(*tasksToSort)[i] = (*tasksToSort)[j]
(*tasksToSort)[j] = temp
i++
j--
}
}
if low < j {
capper.quickSort(low, j, tasksToSort)
}
if i < high {
capper.quickSort(i, high, tasksToSort)
}
}
// Sorting tasks in ascending order of requested watts.
func (capper clusterwideCapper) sortTasks(tasksToSort *[]def.Task) {
capper.quickSort(0, len(*tasksToSort)-1, tasksToSort)
}
/*
Remove entry for finished task.
This function is called when a task completes.
Remove entry for finished task from the window
This function is called when a task completes.
This completed task needs to be removed from the window of tasks (if it is still present)
so that it doesn't contribute to the computation of the cap value.
so that it doesn't contribute to the computation of the next cap value.
*/
func (capper clusterwideCapper) taskFinished(taskID string) {
// If the window is empty the just return. This condition should technically return false.
if capper.windowOfTasks.Len() == 0 {
return
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var taskElementToRemove *list.Element
for taskElement := capper.windowOfTasks.Front(); taskElement != nil; taskElement = taskElement.Next() {
if tsk, ok := taskElement.Value.(*def.Task); ok {
if tsk.TaskID == taskID {
taskElementToRemove = taskElement
}
}
}
// we need to remove the task from the window.
if taskToRemove, ok := taskElementToRemove.Value.(*def.Task); ok {
capper.windowOfTasks.Remove(taskElementToRemove)
capper.numberOfTasksInWindow -= 1
capper.currentSum -= float64(taskToRemove.Watts) * constants.CapMargin
}
func (capper ClusterwideCapper) TaskFinished(taskID string) {
runAvg.Remove(taskID)
}
// First-come, first-served determination of the cluster-wide cap for a newly scheduled task.
func (capper clusterwideCapper) fcfsDetermineCap(totalPower map[string]float64,
func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64,
newTask *def.Task) (float64, error) {
// Validation
if totalPower == nil {
return 100, errors.New("Invalid argument: totalPower")
} else {
// Need to calculate the running average
runningAverage := capper.runningAverageOfWatts(newTask)
runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize)
// For each node, calculate the percentage of the running average to the total power.
runningAverageToTotalPowerPercentage := make(map[string]float64)
ratios := make(map[string]float64)
for host, tpower := range totalPower {
if tpower >= runningAverage {
runningAverageToTotalPowerPercentage[host] = (runningAverage / tpower) * 100
ratios[host] = (runningAverage / tpower) * 100
} else {
// We don't consider this host for the computation of the cluster wide cap.
}
}
// Determine the cluster wide cap value.
capValue := capper.getCap(runningAverageToTotalPowerPercentage)
capValue := capper.getCap(ratios)
// Need to cap the cluster to this value.
return capValue, nil
}
}
// Stringer for an instance of clusterwideCapper
func (capper clusterwideCapper) string() string {
func (capper ClusterwideCapper) String() string {
return "Cluster Capper -- Proactively cap the entire cluster."
}
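For reference, a rough sketch of how a scheduler might drive the now-exported capper API; the task values, host powers, and task ID below are placeholders, not taken from this commit:

```go
package schedulers

import (
	"log"

	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/pcp"
)

// exampleCapperUsage sketches the call sequence a scheduler would follow (illustrative only).
func exampleCapperUsage() {
	capper := pcp.GetClusterwideCapperInstance()

	totalPower := map[string]float64{"node1": 200.0, "node2": 300.0} // total power per host (placeholder values)
	taskMonitor := map[string][]def.Task{}                           // tasks currently running on each host

	// Before launching a new task, determine the cluster-wide cap from the running average.
	newTask := def.Task{Watts: 100.0} // placeholder task
	if capValue, err := capper.FCFSDeterminedCap(totalPower, &newTask); err == nil {
		log.Printf("Capping the cluster at %f percent", capValue)
	}

	// When a task finishes, drop it from the running-average window and recompute the cap.
	finishedTaskID := "electron-someTask-0" // placeholder ID
	capper.TaskFinished(finishedTaskID)
	if recapValue, err := capper.CleverRecap(totalPower, taskMonitor, finishedTaskID); err == nil {
		log.Printf("Recapping the cluster at %f percent", recapValue)
	}
}
```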

View file

@ -56,7 +56,7 @@ func main() {
fmt.Println(task)
}
scheduler := schedulers.NewProactiveClusterwideCapRanked(tasks, *ignoreWatts)
scheduler := schedulers.NewPistonCapper(tasks, *ignoreWatts)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{

View file

@ -4,12 +4,14 @@ Electron: Scheduling Algorithms
To Do:
* Design changes -- Possible to have one scheduler with different scheduling schemes?
* Make the running average calculation generic, so that schedulers in the future can use it and not implement their own.
* Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go (see the sketch below)
* Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler.
Scheduling Algorithms:
* First Fit
* First Fit with sorted watts
* Bin-packing with sorted watts
* FCFS Proactive Cluster-wide Capping
* Ranked Proactive Cluster-wide Capping
* First Fit
* First Fit with sorted watts
* Piston Capping -- Works when scheduler is run with WAR
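On the 'tasksRunning' race noted in the To Do above: the pistoncapper.go added in this commit serializes updates to its counter with a package-level sync.Mutex, and the same pattern is one possible fix for the other two schedulers. The sketch below is illustrative only; the type and method names are made up:

```go
// Sketch of a mutex-based fix for the 'tasksRunning' race (illustrative, not from this commit).
package main

import (
	"fmt"
	"sync"
)

type scheduler struct {
	mu           sync.Mutex
	tasksRunning int
}

// taskStarted and taskFinished serialize every read-modify-write of the counter.
func (s *scheduler) taskStarted()  { s.mu.Lock(); s.tasksRunning++; s.mu.Unlock() }
func (s *scheduler) taskFinished() { s.mu.Lock(); s.tasksRunning--; s.mu.Unlock() }

func main() {
	s := &scheduler{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); s.taskStarted() }()
		go func() { defer wg.Done(); s.taskFinished() }()
	}
	wg.Wait()
	fmt.Println("tasksRunning:", s.tasksRunning) // deterministically 0
}
```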

schedulers/pistoncapper.go (new file, 407 lines)
View file

@ -0,0 +1,407 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"strings"
"sync"
"time"
)
/*
Piston Capper implements the Scheduler interface
This basically extends the BinPacking algorithm to also cap each node at a different value,
corresponding to the load on that node.
*/
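// For example (hypothetical figures): if the tasks packed onto node A account for 40% of its total
// power and those on node B for 70%, A is capped near 40 and B near 70, with each cap adjusted as
// tasks start and finish on that node.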
type PistonCapper struct {
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
taskMonitor map[string][]def.Task
totalPower map[string]float64
ignoreWatts bool
ticker *time.Ticker
isCapping bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule the new task.
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up.
Done chan struct{}
// Controls when to shutdown pcp logging.
PCPLog chan struct{}
}
// New electron scheduler.
func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper {
s := &PistonCapper{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task),
totalPower: make(map[string]float64),
RecordPCP: false,
ticker: time.NewTicker(5 * time.Second),
isCapping: false,
}
return s
}
// check whether task fits the offer or not.
func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64,
totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool {
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
} else {
return false
}
}
// mutex
var mutex sync.Mutex
func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
// Adding the task to the taskMonitor
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *PistonCapper) Registered(
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}
func (s *PistonCapper) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Printf("Framework re-registered with master %s", masterInfo)
}
func (s *PistonCapper) Disconnected(sched.SchedulerDriver) {
log.Println("Framework disconnected with master")
}
// goroutine to cap each node in the cluster at regular intervals of time.
var capValues = make(map[string]float64)
// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
var previousRoundedCapValues = make(map[string]int)
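// For instance, successive cap values of 67.2 and 67.4 both round to 67, so the second tick skips
// the rapl.Cap call for that host.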
func (s *PistonCapper) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap each node
mutex.Lock()
for host, capValue := range capValues {
roundedCapValue := int(math.Floor(capValue + 0.5))
// has the cap value changed
if prevRoundedCap, ok := previousRoundedCapValues[host]; ok {
if prevRoundedCap != roundedCapValue {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
}
previousRoundedCapValues[host] = roundedCapValue
}
} else {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
}
previousRoundedCapValues[host] = roundedCapValue
}
}
mutex.Unlock()
}
}
}()
}
// Stop the capping
func (s *PistonCapper) stopCapping() {
if s.isCapping {
log.Println("Stopping the capping.")
s.ticker.Stop()
mutex.Lock()
s.isCapping = false
mutex.Unlock()
}
}
func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the total power for each host in the offers
for _, offer := range offers {
if _, ok := s.totalPower[*offer.Hostname]; !ok {
_, _, offer_watts := OfferAgg(offer)
s.totalPower[*offer.Hostname] = offer_watts
}
}
// Displaying the totalPower
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
/*
Piston capping strategy
Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested.
For each set of tasks that are scheduled, compute the new cap values for each host in the cluster.
At regular intervals of time, cap each node in the cluster.
*/
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
fitTasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
taken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Store the partialLoad for host corresponding to this offer.
// Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks.
partialLoad := 0.0
for i, task := range s.tasks {
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
}
for *task.Instances > 0 {
// Does the task fit
if s.takeOffer(offerWatts, offerCPU, offerRAM, totalWatts, totalCPU, totalRAM, task) {
// Start piston capping if haven't started yet
if !s.isCapping {
s.isCapping = true
s.startCapping()
}
taken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
fitTasks = append(fitTasks, s.newTask(offer, task))
log.Println("Inst: ", *task.Instances)
*task.Instances--
// updating the cap value for offer.Hostname
partialLoad += ((task.Watts * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100
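// e.g., with a hypothetical CapMargin of 0.70, a 100 W task on a 350 W host contributes
// (100 * 0.70 / 350) * 100 = 20 percentage points to this host's cap.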
if *task.Instances <= 0 {
// All instances of task have been scheduled. Remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
} else {
break // Continue on to next task
}
}
}
if taken {
// Updating the cap value for offer.Hostname
mutex.Lock()
capValues[*offer.Hostname] += partialLoad
mutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter)
} else {
// If there was no match for task
log.Println("There is not enough resources to launch task: ")
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
}
}
}
// Remove finished task from the taskMonitor
func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) {
hostOfFinishedTask := ""
indexOfFinishedTask := -1
found := false
var finishedTask def.Task
for host, tasks := range s.taskMonitor {
for i, task := range tasks {
if task.TaskID == finishedTaskID {
hostOfFinishedTask = host
indexOfFinishedTask = i
found = true
}
}
if found {
break
}
}
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask]
log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
} else {
return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor")
}
return finishedTask, hostOfFinishedTask, nil
}
func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
mutex.Lock()
s.tasksRunning++
mutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Deleting the task from the taskMonitor
finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value)
if err != nil {
log.Println(err)
}
// Need to update the cap values for host of the finishedTask
mutex.Lock()
capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 {
capValues[hostOfFinishedTask] = 100
}
s.tasksRunning--
mutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
s.stopCapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}
func (s *PistonCapper) FrameworkMessage(
driver sched.SchedulerDriver,
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
log.Println("Getting a framework message: ", message)
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}
func (s *PistonCapper) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID)
}
func (s *PistonCapper) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
log.Printf("Slave %s lost", slaveID)
}
func (s *PistonCapper) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
}
func (s *PistonCapper) Error(_ sched.SchedulerDriver, err string) {
log.Printf("Receiving an error: %s", err)
}

View file

@ -3,6 +3,7 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
@ -37,7 +38,7 @@ type ProactiveClusterwideCapFCFS struct {
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
capper *pcp.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
@ -72,7 +73,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
capper: pcp.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -290,7 +291,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
s.startCapping()
}
taken = true
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
fcfsMutex.Lock()
@ -345,10 +346,10 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
s.capper.TaskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
//tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// if new determined cap value is different from the current recap value then we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) {

View file

@ -13,6 +13,7 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
@ -21,6 +22,7 @@ import (
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"sort"
"strings"
"sync"
"time"
@ -47,7 +49,7 @@ type ProactiveClusterwideCapRanked struct {
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
capper *pcp.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
@ -82,7 +84,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *Proac
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
capper: pcp.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -263,7 +265,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
// sorting the tasks in ascending order of watts.
if (len(s.tasks) > 0) {
s.capper.sortTasks(&s.tasks)
sort.Sort(def.WattsSorter(s.tasks))
// calculating the total number of tasks ranked.
numberOfRankedTasks := 0
for _, task := range s.tasks {
@ -313,7 +315,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
s.startCapping()
}
taken = true
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
rankedMutex.Lock()
@ -379,10 +381,10 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive
}
} else {
// Need to remove the task from the window
s.capper.taskFinished(*status.TaskId.Value)
s.capper.TaskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
//tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// If new determined cap value is different from the current recap value then we need to recap.

utilities/runAvg/runAvg.go (new file, 108 lines)
View file

@ -0,0 +1,108 @@
/*
A utility to calculate the running average.
One should implement Val() and ID() (see Interface below) to be able to use this utility.
*/
package runAvg
import (
"errors"
"container/list"
)
type Interface interface {
// Value to use for running average calculation.
Val() float64
// Unique ID
ID() string
}
type runningAverageCalculator struct {
window list.List
windowSize int
currentSum float64
}
// singleton instance
var racSingleton *runningAverageCalculator
// return single instance
func getInstance(curSum float64, wSize int) *runningAverageCalculator {
if racSingleton == nil {
racSingleton = &runningAverageCalculator {
windowSize: wSize,
currentSum: curSum,
}
return racSingleton
} else {
// Updating window size if a new window size is given.
if wSize != racSingleton.windowSize {
racSingleton.windowSize = wSize
}
return racSingleton
}
}
// Compute the running average by adding 'data' to the window.
// Updating currentSum to get constant time complexity for every running average computation.
func (rac *runningAverageCalculator) calculate(data Interface) float64 {
if rac.window.Len() < rac.windowSize {
rac.window.PushBack(data)
rac.currentSum += data.Val()
} else {
// removing the element at the front of the window.
elementToRemove := rac.window.Front()
rac.currentSum -= elementToRemove.Value.(Interface).Val()
rac.window.Remove(elementToRemove)
// adding new element to the window
rac.window.PushBack(data)
rac.currentSum += data.Val()
}
return rac.currentSum / float64(rac.window.Len())
}
/*
If an element with the given ID is present in the window, remove it and return (removedElement, nil).
Else, return (nil, error).
*/
func (rac *runningAverageCalculator) removeFromWindow(id string) (interface{}, error) {
for element := rac.window.Front(); element != nil; element = element.Next() {
if elementToRemove := element.Value.(Interface); elementToRemove.ID() == id {
rac.window.Remove(element)
rac.currentSum -= elementToRemove.Val()
return elementToRemove, nil
}
}
return nil, errors.New("Error: Element not found in the window.")
}
// Taking windowSize as a parameter to allow for sliding window implementation.
func Calc(data Interface, windowSize int) float64 {
rac := getInstance(0.0, windowSize)
return rac.calculate(data)
}
// Remove element from the window if it is present.
func Remove(id string) (interface{}, error) {
// checking if racSingleton has been instantiated
if racSingleton == nil {
return nil, errors.New("Error: Not instantiated. Please call Init() to instantiate.")
} else {
return racSingleton.removeFromWindow(id)
}
}
// initialize the parameters of the running average calculator
func Init() {
// checking to see if racSingleton needs to be instantiated
if racSingleton == nil {
racSingleton = getInstance(0.0, 0)
}
// Setting parameters to default values. Could also set racSingleton to nil but this leads to unnecessary overhead of creating
// another instance when Calc is called.
racSingleton.window.Init()
racSingleton.windowSize = 0
racSingleton.currentSum = 0.0
}
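To illustrate the intended use of this utility, here is a small, self-contained sketch; the sample type and the numbers are made up and not part of this commit:

```go
// Illustrative use of the runAvg package; the sample type and values are made up.
package main

import (
	"fmt"

	"bitbucket.org/sunybingcloud/electron/utilities/runAvg"
)

// sample satisfies runAvg.Interface by exposing a value and a unique ID.
type sample struct {
	id    string
	watts float64
}

func (s sample) Val() float64 { return s.watts }
func (s sample) ID() string   { return s.id }

func main() {
	runAvg.Init() // reset the calculator

	// Window of size 2: once the third sample arrives, the first falls out of the window.
	fmt.Println(runAvg.Calc(sample{"t1", 100}, 2)) // 100
	fmt.Println(runAvg.Calc(sample{"t2", 50}, 2))  // 75
	fmt.Println(runAvg.Calc(sample{"t3", 150}, 2)) // 100

	// Drop a finished entry from the window by ID.
	if _, err := runAvg.Remove("t3"); err != nil {
		fmt.Println(err)
	}
}
```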

View file

@ -7,7 +7,7 @@ The Pair and PairList have been taken from google groups forum,
https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw
*/
// Utility struct that helps in sorting the available power by value.
// Utility struct that helps in sorting a map[string]float64 by value.
type Pair struct {
Key string
Value float64