formatted the code

Pradyumna Kaushik 2016-11-14 22:53:06 -05:00 committed by Renan DelValle
parent 4d13c432c4
commit b7394b8762
2 changed files with 397 additions and 396 deletions


@@ -11,51 +11,52 @@ This is not a scheduler but a scheduling scheme that schedulers can use.
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"errors"
"github.com/montanaflynn/stats"
"sort"
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"container/list"
"errors"
"github.com/montanaflynn/stats"
"sort"
)
// Structure containing utility data structures used to compute cluster-wide dynamic cap.
type clusterwideCapper struct {
// window of tasks.
window_of_tasks list.List
// The current sum of requested powers of the tasks in the window.
current_sum float64
// The current number of tasks in the window.
number_of_tasks_in_window int
// window of tasks.
window_of_tasks list.List
// The current sum of requested powers of the tasks in the window.
current_sum float64
// The current number of tasks in the window.
number_of_tasks_in_window int
}
// Defining constructor for clusterwideCapper. Please don't call this directly and instead use getClusterwideCapperInstance().
func newClusterwideCapper() *clusterwideCapper {
return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
return &clusterwideCapper{current_sum: 0.0, number_of_tasks_in_window: 0}
}
// Singleton instance of clusterwideCapper
var singleton_capper *clusterwideCapper
// Retrieve the singleton instance of clusterwideCapper.
func getClusterwideCapperInstance() *clusterwideCapper {
if singleton_capper == nil {
singleton_capper = newClusterwideCapper()
} else {
// Do nothing
}
return singleton_capper
if singleton_capper == nil {
singleton_capper = newClusterwideCapper()
} else {
// Do nothing
}
return singleton_capper
}
// Clear and initialize all the members of clusterwideCapper.
func (capper clusterwideCapper) clear() {
capper.window_of_tasks.Init()
capper.current_sum = 0
capper.number_of_tasks_in_window = 0
capper.window_of_tasks.Init()
capper.current_sum = 0
capper.number_of_tasks_in_window = 0
}
// Compute the average of watts of all the tasks in the window.
func (capper clusterwideCapper) average() float64 {
return capper.current_sum / float64(capper.window_of_tasks.Len())
return capper.current_sum / float64(capper.window_of_tasks.Len())
}
/*
@@ -65,22 +66,22 @@ Using clusterwideCapper#window_of_tasks to store the tasks.
Task at position 0 (oldest task) is removed when the window is full and new task arrives.
*/
func (capper clusterwideCapper) running_average_of_watts(tsk *def.Task) float64 {
var average float64
if capper.number_of_tasks_in_window < constants.Window_size {
capper.window_of_tasks.PushBack(tsk)
capper.number_of_tasks_in_window++
capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
} else {
task_to_remove_element := capper.window_of_tasks.Front()
if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
capper.window_of_tasks.Remove(task_to_remove_element)
}
capper.window_of_tasks.PushBack(tsk)
capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
}
average = capper.average()
return average
var average float64
if capper.number_of_tasks_in_window < constants.Window_size {
capper.window_of_tasks.PushBack(tsk)
capper.number_of_tasks_in_window++
capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
} else {
task_to_remove_element := capper.window_of_tasks.Front()
if task_to_remove, ok := task_to_remove_element.Value.(*def.Task); ok {
capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
capper.window_of_tasks.Remove(task_to_remove_element)
}
capper.window_of_tasks.PushBack(tsk)
capper.current_sum += float64(tsk.Watts) * constants.Cap_margin
}
average = capper.average()
return average
}
/*
@@ -91,22 +92,22 @@ Calculating cap value.
3. The median is now the cap.
*/
func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentage map[string]float64) float64 {
var values []float64
// Validation
if running_average_to_total_power_percentage == nil {
return 100.0
}
for _, apower := range running_average_to_total_power_percentage {
values = append(values, apower)
}
// sorting the values in ascending order.
sort.Float64s(values)
// Calculating the median
if median, err := stats.Median(values); err == nil {
return median
}
// Should never reach here. If we do, just set the cap value to 100.
return 100.0
var values []float64
// Validation
if running_average_to_total_power_percentage == nil {
return 100.0
}
for _, apower := range running_average_to_total_power_percentage {
values = append(values, apower)
}
// sorting the values in ascending order.
sort.Float64s(values)
// Calculating the median
if median, err := stats.Median(values); err == nil {
return median
}
// Should never reach here. If we do, just set the cap value to 100.
return 100.0
}
/*
@@ -120,72 +121,72 @@ Recapping the entire cluster.
This needs to be called whenever a task finishes execution.
*/
func (capper clusterwideCapper) recap(total_power map[string]float64,
task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
// Validation
if total_power == nil || task_monitor == nil {
return 100.0, errors.New("Invalid argument: total_power, task_monitor")
}
total_allocated_power := 0.0
total_running_tasks := 0
for _, tasks := range task_monitor {
index := 0
for i, task := range tasks {
if task.TaskID == finished_taskId {
index = i
continue
}
total_allocated_power += float64(task.Watts) * constants.Cap_margin
total_running_tasks++
}
tasks = append(tasks[:index], tasks[index+1:]...)
}
average := total_allocated_power / float64(total_running_tasks)
ratios := []float64{}
for _, tpower := range total_power {
ratios = append(ratios, (average/tpower) * 100)
}
sort.Float64s(ratios)
median, err := stats.Median(ratios)
if err == nil {
return median, nil
} else {
return 100, err
}
task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
// Validation
if total_power == nil || task_monitor == nil {
return 100.0, errors.New("Invalid argument: total_power, task_monitor")
}
total_allocated_power := 0.0
total_running_tasks := 0
for _, tasks := range task_monitor {
index := 0
for i, task := range tasks {
if task.TaskID == finished_taskId {
index = i
continue
}
total_allocated_power += float64(task.Watts) * constants.Cap_margin
total_running_tasks++
}
tasks = append(tasks[:index], tasks[index+1:]...)
}
average := total_allocated_power / float64(total_running_tasks)
ratios := []float64{}
for _, tpower := range total_power {
ratios = append(ratios, (average/tpower)*100)
}
sort.Float64s(ratios)
median, err := stats.Median(ratios)
if err == nil {
return median, nil
} else {
return 100, err
}
}
/* Quick sort algorithm to sort tasks, in place, in ascending order of power.*/
func (capper clusterwideCapper) quick_sort(low int, high int, tasks_to_sort []*def.Task) {
i := low
j := high
// calculating the pivot
pivot_index := low + (high - low)/2
pivot := tasks_to_sort[pivot_index]
for i <= j {
for tasks_to_sort[i].Watts < pivot.Watts {
i++
}
for tasks_to_sort[j].Watts > pivot.Watts {
j--
}
if i <= j {
temp := tasks_to_sort[i]
tasks_to_sort[i] = tasks_to_sort[j]
tasks_to_sort[j] = temp
i++
j--
}
}
if low < j {
capper.quick_sort(low, j, tasks_to_sort)
}
if i < high {
capper.quick_sort(i, high, tasks_to_sort)
}
i := low
j := high
// calculating the pivot
pivot_index := low + (high-low)/2
pivot := tasks_to_sort[pivot_index]
for i <= j {
for tasks_to_sort[i].Watts < pivot.Watts {
i++
}
for tasks_to_sort[j].Watts > pivot.Watts {
j--
}
if i <= j {
temp := tasks_to_sort[i]
tasks_to_sort[i] = tasks_to_sort[j]
tasks_to_sort[j] = temp
i++
j--
}
}
if low < j {
capper.quick_sort(low, j, tasks_to_sort)
}
if i < high {
capper.quick_sort(i, high, tasks_to_sort)
}
}
// Sorting tasks in ascending order of requested watts.
func (capper clusterwideCapper) sort_tasks(tasks_to_sort []*def.Task) {
capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort)
capper.quick_sort(0, len(tasks_to_sort)-1, tasks_to_sort)
}
/*
@@ -195,86 +196,86 @@ This completed task needs to be removed from the window of tasks (if it is still
so that it doesn't contribute to the computation of the cap value.
*/
func (capper clusterwideCapper) taskFinished(taskID string) {
// If the window is empty then just return. This condition should technically return false.
if capper.window_of_tasks.Len() == 0 {
return
}
// If the window is empty then just return. This condition should technically return false.
if capper.window_of_tasks.Len() == 0 {
return
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var task_element_to_remove *list.Element
for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() {
if tsk, ok := task_element.Value.(*def.Task); ok {
if tsk.TaskID == taskID {
task_element_to_remove = task_element
}
}
}
// Checking whether the task with the given taskID is currently present in the window of tasks.
var task_element_to_remove *list.Element
for task_element := capper.window_of_tasks.Front(); task_element != nil; task_element = task_element.Next() {
if tsk, ok := task_element.Value.(*def.Task); ok {
if tsk.TaskID == taskID {
task_element_to_remove = task_element
}
}
}
// We need to remove the task from the window.
if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
capper.window_of_tasks.Remove(task_element_to_remove)
capper.number_of_tasks_in_window -= 1
capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
}
// We need to remove the task from the window.
if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
capper.window_of_tasks.Remove(task_element_to_remove)
capper.number_of_tasks_in_window -= 1
capper.current_sum -= float64(task_to_remove.Watts) * constants.Cap_margin
}
}
// Rank-based scheduling.
func (capper clusterwideCapper) rankedDetermineCap(available_power map[string]float64,
tasks_to_schedule []*def.Task) ([]*def.Task, map[int]float64, error) {
// Validation
if available_power == nil || len(tasks_to_schedule) == 0 {
return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule")
} else {
// Need to sort the tasks in ascending order of requested power.
capper.sort_tasks(tasks_to_schedule)
tasks_to_schedule []*def.Task) ([]*def.Task, map[int]float64, error) {
// Validation
if available_power == nil || len(tasks_to_schedule) == 0 {
return nil, nil, errors.New("Invalid argument: available_power, tasks_to_schedule")
} else {
// Need to sort the tasks in ascending order of requested power.
capper.sort_tasks(tasks_to_schedule)
// Now, for each task in the sorted set of tasks, we need to use the Fcfs_determine_cap logic.
cluster_wide_cap_values := make(map[int]float64)
index := 0
for _, tsk := range tasks_to_schedule {
/*
Note that even though fcfsDetermineCap is called, the tasks have been sorted beforehand and are therefore scheduled in sorted order.
Calling fcfsDetermineCap(...) just avoids redundant code.
*/
if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil {
cluster_wide_cap_values[index] = cap
} else {
return nil, nil, err
}
index++
}
// Now returning the sorted set of tasks and the cluster wide cap values for each task that is launched.
return tasks_to_schedule, cluster_wide_cap_values, nil
}
// Now, for each task in the sorted set of tasks, we need to use the Fcfs_determine_cap logic.
cluster_wide_cap_values := make(map[int]float64)
index := 0
for _, tsk := range tasks_to_schedule {
/*
Note that even though fcfsDetermineCap is called, the tasks have been sorted beforehand and are therefore scheduled in sorted order.
Calling fcfsDetermineCap(...) just avoids redundant code.
*/
if cap, err := capper.fcfsDetermineCap(available_power, tsk); err == nil {
cluster_wide_cap_values[index] = cap
} else {
return nil, nil, err
}
index++
}
// Now returning the sorted set of tasks and the cluster wide cap values for each task that is launched.
return tasks_to_schedule, cluster_wide_cap_values, nil
}
}
// First come first serve scheduling.
func (capper clusterwideCapper) fcfsDetermineCap(total_power map[string]float64,
new_task *def.Task) (float64, error) {
// Validation
if total_power == nil {
return 100, errors.New("Invalid argument: total_power")
} else {
// Need to calculate the running average
running_average := capper.running_average_of_watts(new_task)
// For each node, calculate the percentage of the running average to the total power.
running_average_to_total_power_percentage := make(map[string]float64)
for host, tpower := range total_power {
if tpower >= running_average {
running_average_to_total_power_percentage[host] = (running_average/tpower) * 100
} else {
// We don't consider this host for the computation of the cluster wide cap.
}
}
new_task *def.Task) (float64, error) {
// Validation
if total_power == nil {
return 100, errors.New("Invalid argument: total_power")
} else {
// Need to calculate the running average
running_average := capper.running_average_of_watts(new_task)
// For each node, calculate the percentage of the running average to the total power.
running_average_to_total_power_percentage := make(map[string]float64)
for host, tpower := range total_power {
if tpower >= running_average {
running_average_to_total_power_percentage[host] = (running_average / tpower) * 100
} else {
// We don't consider this host for the computation of the cluster wide cap.
}
}
// Determine the cluster wide cap value.
cap_value := capper.get_cap(running_average_to_total_power_percentage)
// Need to cap the cluster to this value.
return cap_value, nil
}
// Determine the cluster wide cap value.
cap_value := capper.get_cap(running_average_to_total_power_percentage)
// Need to cap the cluster to this value.
return cap_value, nil
}
}
// Stringer for an instance of clusterwideCapper
func (capper clusterwideCapper) string() string {
return "Cluster Capper -- Proactively cap the entire cluster."
return "Cluster Capper -- Proactively cap the entire cluster."
}
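
For readers skimming this diff, here is a minimal, hypothetical usage sketch of the capper defined above (not part of this commit; the exampleCapperUsage helper and its arguments are invented for illustration). It shows the call sequence the FCFS scheduler in the next file follows: fetch the singleton, determine a cap when a task is scheduled, and recap when a task finishes.

package schedulers

import "bitbucket.org/sunybingcloud/electron/def"

// exampleCapperUsage is an illustrative helper, not part of the commit.
func exampleCapperUsage(totalPower map[string]float64,
	taskMonitor map[string][]def.Task, newTask *def.Task, finishedTaskId string) {
	// Retrieve the singleton capper.
	capper := getClusterwideCapperInstance()

	// On task arrival: compute a cluster wide cap from the running average of watts.
	if capValue, err := capper.fcfsDetermineCap(totalPower, newTask); err == nil {
		// capValue is a percentage; the scheduler below applies it via rapl.Cap on a ticker.
		_ = capValue
	}

	// On task completion: drop the task from the window and recompute the cap.
	capper.taskFinished(finishedTaskId)
	if newCap, err := capper.recap(totalPower, taskMonitor, finishedTaskId); err == nil {
		_ = newCap
	}
}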


@@ -1,111 +1,111 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"strings"
"time"
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"strings"
"time"
)
// Decides whether or not to take an offer
func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, _ := OfferAgg(offer)
offer_cpu, offer_mem, _ := OfferAgg(offer)
if offer_cpu >= task.CPU && offer_mem >= task.RAM {
return true
}
return false
if offer_cpu >= task.CPU && offer_mem >= task.RAM {
return true
}
return false
}
// electronScheduler implements the Scheduler interface.
type ProactiveClusterwideCapFCFS struct {
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
//lock *sync.Mutex
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
//lock *sync.Mutex
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule the new task.
RecordPCP bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule the new task.
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up.
Done chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up.
Done chan struct{}
// Controls when to shutdown pcp logging.
PCPLog chan struct{}
// Controls when to shutdown pcp logging.
PCPLog chan struct{}
}
// New electron scheduler.
func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS {
s := &ProactiveClusterwideCapFCFS {
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
ticker: time.NewTicker(5 * time.Second),
isCapping: false,
//lock: new(sync.Mutex),
}
return s
s := &ProactiveClusterwideCapFCFS{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
ticker: time.NewTicker(5 * time.Second),
isCapping: false,
//lock: new(sync.Mutex),
}
return s
}
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Assigning a task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String(taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
// Assigning a task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String(taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
@@ -130,189 +130,189 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
}
func (s *ProactiveClusterwideCapFCFS) Registered(
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}
func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Printf("Framework re-registered with master %s", masterInfo)
log.Printf("Framework re-registered with master %s", masterInfo)
}
func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.isCapping = false
log.Println("Framework disconnected with master")
// Need to stop the capping process.
s.ticker.Stop()
s.isCapping = false
log.Println("Framework disconnected with master")
}
// goroutine to cap the entire cluster at regular intervals of time.
var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *ProactiveClusterwideCapFCFS) startCapping() {
go func() {
for {
select {
case <- s.ticker.C:
// Need to cap the cluster to the currentCapValue.
if currentCapValue > 0.0 {
//mutex.Lock()
//s.lock.Lock()
for _, host := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil {
fmt.Println(err)
} else {
fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue)
}
}
//mutex.Unlock()
//s.lock.Unlock()
}
}
}
}()
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the currentCapValue.
if currentCapValue > 0.0 {
//mutex.Lock()
//s.lock.Lock()
for _, host := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
fmt.Println(err)
} else {
fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue)
}
}
//mutex.Unlock()
//s.lock.Unlock()
}
}
}
}()
}
// Stop cluster wide capping
func (s *ProactiveClusterwideCapFCFS) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
s.isCapping = false
}
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
s.isCapping = false
}
}
// TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?).
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offer_watts := OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if this is the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offer_watts
}
}
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offer_watts := OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if this is the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offer_watts
}
}
for host, tpower := range s.totalPower {
fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
}
for host, apower := range s.availablePower {
fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
}
for host, tpower := range s.totalPower {
fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
}
for host, apower := range s.availablePower {
fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
/*
Clusterwide Capping strategy
/*
Clusterwide Capping strategy
For each task in s.tasks,
1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
2. If the task fits the offer, then we need to determine the cluster wide cap.
3. currentCapValue is updated with the determined cluster wide cap.
For each task in s.tasks,
1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
2. If the task fits the offer, then we need to determine the cluster wide cap.
3. currentCapValue is updated with the determined cluster wide cap.
Cluster wide capping is currently performed at regular intervals of time.
TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
*/
//offer_cpu, offer_ram, _ := OfferAgg(offer)
Cluster wide capping is currently performed at regular intervals of time.
TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
*/
//offer_cpu, offer_ram, _ := OfferAgg(offer)
taken := false
//var mutex sync.Mutex
taken := false
//var mutex sync.Mutex
for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Does the task fit.
if s.takeOffer(offer, task) {
// Capping the cluster if we haven't yet started.
if !s.isCapping {
s.startCapping()
s.isCapping = true
}
taken = true
//mutex.Lock()
//s.lock.Lock()
//tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
// Does the task fit.
if s.takeOffer(offer, task) {
// Capping the cluster if we haven't yet started.
if !s.isCapping {
s.startCapping()
s.isCapping = true
}
taken = true
//mutex.Lock()
//s.lock.Lock()
//tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
currentCapValue = tempCap
} else {
fmt.Printf("Failed to determine new cluster wide cap: ")
fmt.Println(err)
}
//mutex.Unlock()
//s.lock.Unlock()
fmt.Printf("Starting on [%s]\n", offer.GetHostname())
to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
fmt.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
if err == nil {
currentCapValue = tempCap
} else {
fmt.Printf("Failed to determine new cluster wide cap: ")
fmt.Println(err)
}
//mutex.Unlock()
//s.lock.Unlock()
fmt.Printf("Starting on [%s]\n", offer.GetHostname())
to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
fmt.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move onto the next offer.
}
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move onto the next offer.
}
}
// If no task fit the offer, then declining the offer.
if !taken {
fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
// If no task fit the offer, then declining the offer.
if !taken {
fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
}
}
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
}
}
}
func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
//currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
// Determining the new cluster wide cap.
currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
log.Printf("Recapping the cluster to %f\n", currentCapValue)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
//currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
// Determining the new cluster wide cap.
currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
log.Printf("Recapping the cluster to %f\n", currentCapValue)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the capping process.
s.stopCapping()
// Need to stop the capping process.
s.stopCapping()
close(s.Done)
default:
}
@@ -322,20 +322,20 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
}
func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver,
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
log.Println("Getting a framework message: ", message)
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}
func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID)
log.Printf("Offer %s rescinded", offerID)
}
func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
log.Printf("Slave %s lost", slaveID)
log.Printf("Slave %s lost", slaveID)
}
func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {