formatted the code

Pradyumna Kaushik 2016-11-14 22:53:06 -05:00 committed by Renan DelValle
parent 4cc1dd8e63
commit 7522cf9879
2 changed files with 397 additions and 396 deletions


@@ -11,51 +11,52 @@ This is not a scheduler but a scheduling scheme that schedulers can use.
package schedulers

import (
    "bitbucket.org/sunybingcloud/electron/constants"
    "bitbucket.org/sunybingcloud/electron/def"
    "container/list"
    "errors"
    "github.com/montanaflynn/stats"
    "sort"
)

// Structure containing utility data structures used to compute the cluster-wide dynamic cap.
type clusterwideCapper struct {
    // Window of tasks.
    window_of_tasks list.List
    // The current sum of requested powers of the tasks in the window.
    current_sum float64
    // The current number of tasks in the window.
    number_of_tasks_in_window int
}

// Constructor for clusterwideCapper. Please don't call this directly; use getClusterwideCapperInstance() instead.
func newClusterwideCapper() *clusterwideCapper {
    return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
}

// Singleton instance of clusterwideCapper.
var singleton_capper *clusterwideCapper

// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
    if singleton_capper == nil {
        singleton_capper = newClusterwideCapper()
    }
    return singleton_capper
}
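The lazy initialization above is not goroutine-safe. If a caller needed that, a sync.Once-based variant could be used instead; the following is only an illustrative sketch (it assumes an extra "sync" import and is not part of this change):

// Illustrative alternative only: a goroutine-safe singleton using sync.Once.
var (
    capperOnce     sync.Once
    capperInstance *clusterwideCapper
)

func getClusterwideCapperInstanceSafe() *clusterwideCapper {
    capperOnce.Do(func() {
        capperInstance = newClusterwideCapper()
    })
    return capperInstance
}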
// Clear and initialize all the members of clusterwideCapper.
func (capper clusterwideCapper) clear() {
    capper.window_of_tasks.Init()
    capper.current_sum = 0
    capper.number_of_tasks_in_window = 0
}

// Compute the average of the watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
    return capper.current_sum / float64(capper.window_of_tasks.Len())
}
/*
@@ -65,22 +66,22 @@ Using clusterwideCapper#window_of_tasks to store the tasks.
Task at position 0 (oldest task) is removed when the window is full and a new task arrives.
*/
func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 {
    var average float64
    if capper.number_of_tasks_in_window < constants.Window_size {
        capper.window_of_tasks.PushBack(tsk)
        capper.number_of_tasks_in_window++
        capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
    } else {
        task_to_remove_element := capper.window_of_tasks.Front()
        if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
            capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
            capper.window_of_tasks.Remove(task_to_remove_element)
        }
        capper.window_of_tasks.PushBack(tsk)
        capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
    }
    average = capper.average()
    return average
}
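A small worked example of the window arithmetic, assuming hypothetical values constants.Window_size = 3 and constants.Cap_margin = 1.0 (the real values live in the constants package and may differ):

// Hypothetical walk-through of running_average_of_watts:
//
//   task watts:  50, 60, 70, 80
//   after 50:    window = [50]            sum = 50   average = 50.0
//   after 60:    window = [50 60]         sum = 110  average = 55.0
//   after 70:    window = [50 60 70]      sum = 180  average = 60.0
//   after 80:    window full, evict 50 -> window = [60 70 80]  sum = 210  average = 70.0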
/*
@@ -91,22 +92,22 @@ Calculating cap value.
3. The median is now the cap.
*/
func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 {
    var values []float64
    // Validation
    if running_average_to_total_power_percentage == nil {
        return 100.0
    }
    for _, apower := range running_average_to_total_power_percentage {
        values = append(values, apower)
    }
    // Sort the values in ascending order.
    sort.Float64s(values)
    // Calculate the median.
    if median, err := stats.Median(values); err == nil {
        return median
    }
    // Should never reach here. If we do, just set the cap value to 100.
    return 100.0
}
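For concreteness, a hypothetical example of the median step (the host names and percentages below are made up, not taken from any real cluster):

// Hypothetical input: running-average-to-total-power percentages per host.
percentages := map[string]float64{
    "stratos-001": 30.0,
    "stratos-002": 45.0,
    "stratos-003": 60.0,
}
capValue := getClusterwideCapperInstance().get_cap(percentages)
// The sorted values are [30 45 60], so the median, and hence the cap, is 45.
fmt.Println(capValue) // 45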
/*
@@ -120,72 +121,72 @@ Recapping the entire cluster.
This needs to be called whenever a task finishes execution.
*/
func (capper clusterwideCapper) recap(total_power map[string]float64,
    task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
    // Validation
    if total_power == nil || task_monitor == nil {
        return 100.0, errors.New("Invalid argument: total_power, task_monitor")
    }
    total_allocated_power := 0.0
    total_running_tasks := 0
    for _, tasks := range task_monitor {
        index := 0
        for i, task := range tasks {
            if task.TaskID == finished_taskId {
                index = i
                continue
            }
            total_allocated_power += float64(task.Watts) * constants.Cap_margin
            total_running_tasks++
        }
        tasks = append(tasks[:index], tasks[index+1:]...)
    }
    average := total_allocated_power / float64(total_running_tasks)
    ratios := []float64{}
    for _, tpower := range total_power {
        ratios = append(ratios, (average/tpower)*100)
    }
    sort.Float64s(ratios)
    median, err := stats.Median(ratios)
    if err == nil {
        return median, nil
    } else {
        return 100, err
    }
}
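A hedged, worked sketch of the recap arithmetic; all numbers and host names are illustrative and Cap_margin is assumed to be 1.0:

// Suppose that, after removing the finished task, two tasks remain with
// Watts of 40 and 60:
//
//   total_allocated_power = 40 + 60 = 100
//   total_running_tasks   = 2
//   average               = 100 / 2 = 50
//
// With total_power = {hostA: 200, hostB: 100} the ratios are
// (50/200)*100 = 25 and (50/100)*100 = 50, so the median, 37.5,
// becomes the new cluster-wide cap.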
/* Quick sort algorithm to sort tasks, in place, in ascending order of power. */
func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) {
    i := low
    j := high
    // Calculate the pivot.
    pivot_index := low + (high-low)/2
    pivot := tasks_to_sort[pivot_index]
    for i <= j {
        for tasks_to_sort[i].Watts < pivot.Watts {
            i++
        }
        for tasks_to_sort[j].Watts > pivot.Watts {
            j--
        }
        if i <= j {
            temp := tasks_to_sort[i]
            tasks_to_sort[i] = tasks_to_sort[j]
            tasks_to_sort[j] = temp
            i++
            j--
        }
    }
    if low < j {
        capper.quick_sort(low, j, tasks_to_sort)
    }
    if i < high {
        capper.quick_sort(i, high, tasks_to_sort)
    }
}

// Sort tasks in ascending order of requested watts.
func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) {
    capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort)
}
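The hand-rolled quicksort above could equivalently be expressed with the standard library; this is only a sketch of that alternative, not part of the commit:

// Equivalent ordering using sort.Slice (available since Go 1.8).
func sortTasksByWatts(tasks []*def.Task) {
    sort.Slice(tasks, func(i, j int) bool {
        return tasks[i].Watts < tasks[j].Watts
    })
}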
/*
@@ -195,86 +196,86 @@ This completed task needs to be removed from the window of tasks (if it is still
so that it doesn't contribute to the computation of the cap value.
*/
func (capper clusterwideCapper) taskFinished(taskID string) {
    // If the window is empty then there is nothing to remove, so just return.
    if capper.window_of_tasks.Len() == 0 {
        return
    }
    // Check whether the task with the given taskID is currently present in the window of tasks.
    var task_element_to_remove *list.Element
    for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() {
        if tsk, ok := task_element.Value.(*def.Task); ok {
            if tsk.TaskID == taskID {
                task_element_to_remove = task_element
            }
        }
    }
    // If the task is not in the window, there is nothing to remove (guards against a nil dereference).
    if task_element_to_remove == nil {
        return
    }
    // We need to remove the task from the window.
    if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
        capper.window_of_tasks.Remove(task_element_to_remove)
        capper.number_of_tasks_in_window -= 1
        capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
    }
}
// Rank-based scheduling.
func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]float64,
    tasks_to_schedule []*def.Task) ([]*def.Task, map[int]float64, error) {
    // Validation
    if available_power == nil || len(tasks_to_schedule) == 0 {
        return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule")
    } else {
        // Need to sort the tasks in ascending order of requested power.
        capper.sort_tasks(tasks_to_schedule)
        // Now, for each task in the sorted set of tasks, we need to use the fcfsDetermineCap logic.
        cluster_wide_cap_values := make(map[int]float64)
        index := 0
        for _, tsk := range tasks_to_schedule {
            /*
                Note that even though fcfsDetermineCap is called, the tasks have been sorted a priori and are
                therefore scheduled in that sorted order. Calling fcfsDetermineCap(...) just avoids redundant code.
            */
            if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil {
                cluster_wide_cap_values[index] = cap
            } else {
                return nil, nil, err
            }
            index++
        }
        // Now return the sorted set of tasks and the cluster-wide cap value for each task that is launched.
        return tasks_to_schedule, cluster_wide_cap_values, nil
    }
}
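A brief, hypothetical usage sketch: the returned map is keyed by position in the (now sorted) task slice, so the two results are consumed together. availablePower and tasksToSchedule are placeholders assumed to be built elsewhere.

sortedTasks, capValues, err := getClusterwideCapperInstance().rankedDetermineCap(availablePower, tasksToSchedule)
if err != nil {
    log.Println(err)
    return
}
for i, tsk := range sortedTasks {
    // capValues[i] is the cluster-wide cap to apply when launching sortedTasks[i].
    fmt.Printf("task %s -> cap %f\n", tsk.TaskID, capValues[i])
}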
// First come first serve scheduling.
func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64,
    new_task *def.Task) (float64, error) {
    // Validation
    if total_power == nil {
        return 100, errors.New("Invalid argument: total_power")
    } else {
        // Need to calculate the running average.
        running_average := capper.running_average_of_watts(new_task)
        // For each node, calculate the percentage of the running average to the total power.
        running_average_to_total_power_percentage := make(map[string]float64)
        for host, tpower := range total_power {
            if tpower >= running_average {
                running_average_to_total_power_percentage[host] = (running_average / tpower) * 100
            }
            // Hosts whose total power is below the running average are not considered for the cluster-wide cap.
        }
        // Determine the cluster-wide cap value.
        cap_value := capper.get_cap(running_average_to_total_power_percentage)
        // Need to cap the cluster to this value.
        return cap_value, nil
    }
}
// Stringer for an instance of clusterwideCapper.
func (capper clusterwideCapper) string() string {
    return "Cluster Capper -- Proactively cap the entire cluster."
}
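Putting the pieces together, a minimal sketch of how a scheduler is expected to drive the capper. totalPower (map[string]float64), taskMonitor (map[string][]def.Task) and newTask (*def.Task) are placeholders assumed to exist in the caller; error handling is abbreviated.

capper := getClusterwideCapperInstance()

// On arrival of a task: compute the new cluster-wide cap.
if capValue, err := capper.fcfsDetermineCap(totalPower, newTask); err == nil {
    fmt.Printf("capping the cluster to %f%%\n", capValue)
}

// On completion of a task: drop it from the window and recompute the cap.
capper.taskFinished(newTask.TaskID)
if capValue, err := capper.recap(totalPower, taskMonitor, newTask.TaskID); err == nil {
    fmt.Printf("recapping the cluster to %f%%\n", capValue)
}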


@@ -1,111 +1,111 @@
package schedulers

import (
    "bitbucket.org/sunybingcloud/electron/constants"
    "bitbucket.org/sunybingcloud/electron/def"
    "bitbucket.org/sunybingcloud/electron/rapl"
    "fmt"
    "github.com/golang/protobuf/proto"
    mesos "github.com/mesos/mesos-go/mesosproto"
    "github.com/mesos/mesos-go/mesosutil"
    sched "github.com/mesos/mesos-go/scheduler"
    "log"
    "math"
    "strings"
    "time"
)

// Decide whether or not to take an offer.
func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
    offer_cpu, offer_mem, _ := OfferAgg(offer)

    if offer_cpu >= task.CPU && offer_mem >= task.RAM {
        return true
    }
    return false
}
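A hedged illustration of the fit check with made-up numbers:

// Purely illustrative: an offer aggregating 8 CPUs and 16384 MB of memory can
// host a task asking for 2 CPUs and 4096 MB, regardless of watts, because
// takeOffer only compares CPU and RAM.
//   offer_cpu = 8.0,   task.CPU = 2.0   -> ok
//   offer_mem = 16384, task.RAM = 4096  -> ok
//   takeOffer(offer, task) == true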
// ProactiveClusterwideCapFCFS is an electron scheduler; it implements the Scheduler interface.
type ProactiveClusterwideCapFCFS struct {
    tasksCreated   int
    tasksRunning   int
    tasks          []def.Task
    metrics        map[string]def.Metric
    running        map[string]map[string]bool
    taskMonitor    map[string][]def.Task // Store tasks that are currently running.
    availablePower map[string]float64    // Available power for each node in the cluster.
    totalPower     map[string]float64    // Total power for each node in the cluster.
    ignoreWatts    bool
    capper         *clusterwideCapper
    ticker         *time.Ticker
    isCapping      bool // Indicates whether we are currently performing cluster-wide capping.
    //lock           *sync.Mutex

    // First set of PCP values are garbage values, so signal the logger to start recording when we're
    // about to schedule the new task.
    RecordPCP bool

    // This channel is closed when the program receives an interrupt,
    // signalling that the program should shut down.
    Shutdown chan struct{}

    // This channel is closed after Shutdown is closed, and only when all
    // outstanding tasks have been cleaned up.
    Done chan struct{}

    // Controls when to shut down PCP logging.
    PCPLog chan struct{}
}
// New electron scheduler.
func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS {
    s := &ProactiveClusterwideCapFCFS{
        tasks:          tasks,
        ignoreWatts:    ignoreWatts,
        Shutdown:       make(chan struct{}),
        Done:           make(chan struct{}),
        PCPLog:         make(chan struct{}),
        running:        make(map[string]map[string]bool),
        taskMonitor:    make(map[string][]def.Task),
        availablePower: make(map[string]float64),
        totalPower:     make(map[string]float64),
        RecordPCP:      false,
        capper:         getClusterwideCapperInstance(),
        ticker:         time.NewTicker(5 * time.Second),
        isCapping:      false,
        //lock:         new(sync.Mutex),
    }
    return s
}
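A minimal sketch of constructing the scheduler; the task list would normally come from the def package, and wiring the scheduler into a mesos-go SchedulerDriver is omitted here.

// Hypothetical caller.
tasks := []def.Task{ /* parsed from a workload definition elsewhere */ }
scheduler := NewProactiveClusterwideCapFCFS(tasks, false /* ignoreWatts */)

// The scheduler is then handed to a mesos-go driver (configuration omitted),
// which invokes Registered, ResourceOffers, StatusUpdate, etc. on it.
_ = scheduler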
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
    taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
    s.tasksCreated++

    if !s.RecordPCP {
        // Turn on logging.
        s.RecordPCP = true
        time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
    }

    // If this is our first time running into this Agent.
    if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
        s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
    }

    // Set the task ID on the task. This is done so that we can consider each task to be different,
    // even though they have the same parameters.
    task.SetTaskID(*proto.String(taskName))
    // Add the task to the list of tasks running on the node.
    s.running[offer.GetSlaveId().GoString()][taskName] = true
    s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}

    resources := []*mesos.Resource{
        mesosutil.NewScalarResource("cpus", task.CPU),
        mesosutil.NewScalarResource("mem", task.RAM),
    }

    if !s.ignoreWatts {
        resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
    }
@@ -130,189 +130,189 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
}

func (s *ProactiveClusterwideCapFCFS) Registered(
    _ sched.SchedulerDriver,
    frameworkID *mesos.FrameworkID,
    masterInfo *mesos.MasterInfo) {
    log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}

func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
    log.Printf("Framework re-registered with master %s", masterInfo)
}

func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
    // Need to stop the capping process.
    s.ticker.Stop()
    s.isCapping = false
    log.Println("Framework disconnected from master")
}
// Goroutine to cap the entire cluster at regular intervals of time.
var currentCapValue = 0.0 // Initial value to indicate that we haven't capped the cluster yet.
func (s *ProactiveClusterwideCapFCFS) startCapping() {
    go func() {
        for {
            select {
            case <-s.ticker.C:
                // Need to cap the cluster to the currentCapValue.
                if currentCapValue > 0.0 {
                    //mutex.Lock()
                    //s.lock.Lock()
                    for _, host := range constants.Hosts {
                        // Round currentCapValue to the nearest int.
                        if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
                            fmt.Println(err)
                        } else {
                            fmt.Printf("Successfully capped %s to %f%%\n", host, currentCapValue)
                        }
                    }
                    //mutex.Unlock()
                    //s.lock.Unlock()
                }
            }
        }
    }()
}
// Stop cluster-wide capping.
func (s *ProactiveClusterwideCapFCFS) stopCapping() {
    if s.isCapping {
        log.Println("Stopping the cluster wide capping.")
        s.ticker.Stop()
        s.isCapping = false
    }
}
// TODO: Need to reduce the time complexity: looping over offers twice (possible to do it just once?).
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
    log.Printf("Received %d resource offers", len(offers))

    // Retrieve the available power for all the hosts in the offers.
    for _, offer := range offers {
        _, _, offer_watts := OfferAgg(offer)
        s.availablePower[*offer.Hostname] = offer_watts
        // Set the total power if this is the first time we see this host.
        if _, ok := s.totalPower[*offer.Hostname]; !ok {
            s.totalPower[*offer.Hostname] = offer_watts
        }
    }

    for host, tpower := range s.totalPower {
        fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
    }
    for host, apower := range s.availablePower {
        fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
    }

    for _, offer := range offers {
        select {
        case <-s.Shutdown:
            log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
            driver.DeclineOffer(offer.Id, longFilter)

            log.Println("Number of tasks still running: ", s.tasksRunning)
            continue
        default:
        }

        /*
            Cluster-wide capping strategy

            For each task in s.tasks,
            1. Check whether the offer can be taken or not (based on CPU and RAM requirements).
            2. If the task fits the offer, then determine the cluster-wide cap.
            3. currentCapValue is updated with the determined cluster-wide cap.

            Cluster-wide capping is currently performed at regular intervals of time.
            TODO: We can choose to cap the cluster only if the new cluster-wide cap differs significantly from the current cluster-wide cap.
            Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
        */
        //offer_cpu, offer_ram, _ := OfferAgg(offer)

        taken := false
        //var mutex sync.Mutex

        for i, task := range s.tasks {
            // Don't take the offer if it doesn't match our task's host requirement.
            if !strings.HasPrefix(*offer.Hostname, task.Host) {
                continue
            }

            // Does the task fit the offer?
            if s.takeOffer(offer, task) {
                // Start capping the cluster if we haven't yet started.
                if !s.isCapping {
                    s.startCapping()
                    s.isCapping = true
                }
                taken = true
                //mutex.Lock()
                //s.lock.Lock()
                //tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
                tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)

                if err == nil {
                    currentCapValue = tempCap
                } else {
                    fmt.Printf("Failed to determine new cluster wide cap: ")
                    fmt.Println(err)
                }
                //mutex.Unlock()
                //s.lock.Unlock()
                fmt.Printf("Starting on [%s]\n", offer.GetHostname())
                to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
                driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
                fmt.Printf("Inst: %d", *task.Instances)
                *task.Instances--
                if *task.Instances <= 0 {
                    // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
                    s.tasks[i] = s.tasks[len(s.tasks)-1]
                    s.tasks = s.tasks[:len(s.tasks)-1]

                    if len(s.tasks) <= 0 {
                        log.Println("Done scheduling all tasks")
                        // Need to stop the cluster-wide capping as there aren't any more tasks to schedule.
                        s.stopCapping()
                        close(s.Shutdown)
                    }
                }
                break // Offer taken, move on.
            }
            // Task doesn't fit the offer. Move on to the next task.
        }

        // If no task fits the offer, decline the offer.
        if !taken {
            fmt.Printf("There are not enough resources to launch a task on Host: %s\n", offer.GetHostname())
            cpus, mem, watts := OfferAgg(offer)

            log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
            driver.DeclineOffer(offer.Id, defaultFilter)
        }
    }
}
func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
    log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)

    if *status.State == mesos.TaskState_TASK_RUNNING {
        s.tasksRunning++
    } else if IsTerminal(status.State) {
        delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
        // Need to remove the task from the window of tasks.
        s.capper.taskFinished(*status.TaskId.Value)
        //currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
        // Determine the new cluster-wide cap.
        currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
        log.Printf("Recapping the cluster to %f\n", currentCapValue)

        s.tasksRunning--
        if s.tasksRunning == 0 {
            select {
            case <-s.Shutdown:
                // Need to stop the capping process.
                s.stopCapping()
                close(s.Done)
            default:
            }
@@ -322,20 +322,20 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
}

func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver,
    executorID *mesos.ExecutorID,
    slaveID *mesos.SlaveID,
    message string) {

    log.Println("Getting a framework message: ", message)
    log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}

func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
    log.Printf("Offer %s rescinded", offerID)
}

func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
    log.Printf("Slave %s lost", slaveID)
}

func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {