Synchronized operations that change the value of the cluster-wide cap. Added cleverRecap(...), which determines the recap value of the cluster at a much finer granularity by taking into account the average load on each node in the cluster. Bug fix in cap.go -- closed the session once capping had been done, which prevents the scheduler from running out of file descriptors.

Pradyumna Kaushik 2016-11-17 21:51:02 -05:00 committed by Renan DelValle
parent cd644bbf69
commit c1eaa453a2
4 changed files with 194 additions and 68 deletions
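For orientation, a minimal, self-contained sketch of the load-averaging idea behind cleverRecap(...): per host, sum the capped watts of the tasks still running, divide by that host's total power to get its load, and average the loads across hosts. The types and host data are simplified stand-ins for def.Task, constants.Cap_margin and the scheduler's task monitor, not the committed implementation.

package main

import "fmt"

// task is a simplified stand-in for def.Task; only the fields cleverRecap needs.
type task struct {
	TaskID string
	Watts  float64
}

const capMargin = 0.70 // plays the role of constants.Cap_margin

// averageLoadCap mirrors the three steps in the cleverRecap doc comment:
// capped watts per host -> load per host -> average load across hosts.
func averageLoadCap(totalPower map[string]float64, taskMonitor map[string][]task, finishedID string) float64 {
	loads := []float64{}
	for host, tasks := range taskMonitor {
		usage := 0.0
		for _, t := range tasks {
			if t.TaskID == finishedID {
				continue // the finished task no longer contributes load
			}
			usage += t.Watts * capMargin
		}
		loads = append(loads, usage/totalPower[host])
	}
	total := 0.0
	for _, load := range loads {
		total += load
	}
	return total / float64(len(loads)) // the average load becomes the recap value
}

func main() {
	totalPower := map[string]float64{"hostA": 100.0, "hostB": 120.0}
	taskMonitor := map[string][]task{
		"hostA": {{TaskID: "electron-t1", Watts: 40}, {TaskID: "electron-t2", Watts: 20}},
		"hostB": {{TaskID: "electron-t3", Watts: 60}},
	}
	fmt.Printf("recap value: %f\n", averageLoadCap(totalPower, taskMonitor, "electron-t2"))
}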

View file

@@ -34,7 +34,7 @@ var Power_threshold = 0.6 // Right now saying that a task will never be given le
So, if power required = 10W, the node would be capped to 75%*10W.
This value can be changed upon convenience.
*/
var Cap_margin = 0.75
var Cap_margin = 0.70
// Modify the cap margin.
func UpdateCapMargin(new_cap_margin float64) bool {
@@ -84,20 +84,4 @@ func UpdateWindowSize(new_window_size int) bool {
Window_size = new_window_size
return true
}
}
// // Time duration between successive cluster wide capping.
// var Clusterwide_cap_interval = 10 // Right now capping the cluster at 10 second intervals.
//
// // Modify the cluster wide capping interval. We can update the interval depending on the workload.
// // TODO: If the workload is heavy then we can set a longer interval, while on the other hand,
// // if the workload is light then a smaller interval is sufficient.
// func UpdateClusterwideCapInterval(new_interval int) bool {
// // Validation
// if new_interval == 0.0 {
// return false
// } else {
// Clusterwide_cap_interval = new_interval
// return true
// }
// }
}

View file

@@ -26,6 +26,7 @@ func Cap(host, username string, percentage int) error {
}
session, err := connection.NewSession()
defer session.Close()
if err != nil {
return errors.Wrap(err, "Failed to create session")
}
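The hunk above defers session.Close() before err is checked. Assuming the SSH client follows the usual Go convention of returning a nil session together with a non-nil error, a safer ordering would be the following sketch (not the committed code), which still closes the session, and frees its file descriptor, once capping is done:

session, err := connection.NewSession()
if err != nil {
	return errors.Wrap(err, "Failed to create session")
}
// Close the session, and release its file descriptor, when Cap returns.
defer session.Close()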

View file

@@ -16,6 +16,7 @@ import (
"container/list"
"errors"
"github.com/montanaflynn/stats"
"log"
"sort"
)
@@ -110,6 +111,68 @@ func (capper clusterwideCapper) get_cap(running_average_to_total_power_percentag
return 100.0
}
/*
Recapping the entire cluster. Also, removing the finished task from the list of running tasks.
We would, at this point, have better knowledge of the state of the cluster.
1. Calculate the total allocated watts per node in the cluster.
2. Compute the ratio of the total watts usage per node to the total power for that node.
This would give us the load on that node.
3. Now, compute the average load across all the nodes in the cluster.
This would be the cap value.
*/
func (capper clusterwideCapper) cleverRecap(total_power map[string]float64,
task_monitor map[string][]def.Task, finished_taskId string) (float64, error) {
// Validation
if total_power == nil || task_monitor == nil {
return 100.0, errors.New("Invalid argument: total_power, task_monitor")
}
// watts usage on each node in the cluster.
watts_usages := make(map[string][]float64)
host_of_finished_task := ""
index_of_finished_task := -1
for _, host := range constants.Hosts {
watts_usages[host] = []float64{0.0}
}
for host, tasks := range task_monitor {
for i, task := range tasks {
if task.TaskID == finished_taskId {
host_of_finished_task = host
index_of_finished_task = i
// Not considering this task
continue
}
watts_usages[host] = append(watts_usages[host], float64(task.Watts) * constants.Cap_margin)
}
}
// Updating task monitor
if host_of_finished_task != "" && index_of_finished_task != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
}
// load on each node in the cluster.
loads := []float64{}
for host, usages := range watts_usages {
total_usage := 0.0
for _, usage := range usages {
total_usage += usage
}
loads = append(loads, total_usage / total_power[host])
}
// Now need to compute the average load.
total_load := 0.0
for _, load := range loads {
total_load += load
}
average_load := total_load / float64(len(loads)) // this would be the cap value.
return average_load, nil
}
/*
Recapping the entire cluster.
@@ -128,18 +191,35 @@ func (capper clusterwideCapper) recap(total_power map[string]float64,
}
total_allocated_power := 0.0
total_running_tasks := 0
for _, tasks := range task_monitor {
index := 0
for i, task := range tasks {
if task.TaskID == finished_taskId {
index = i
continue
}
total_allocated_power += float64(task.Watts) * constants.Cap_margin
total_running_tasks++
}
tasks = append(tasks[:index], tasks[index+1:]...)
}
host_of_finished_task := ""
index_of_finished_task := -1
for host, tasks := range task_monitor {
for i, task := range tasks {
if task.TaskID == finished_taskId {
host_of_finished_task = host
index_of_finished_task = i
// Not considering this task for the computation of total_allocated_power and total_running_tasks
continue
}
total_allocated_power += (float64(task.Watts) * constants.Cap_margin)
total_running_tasks++
}
}
// Updating task monitor
if host_of_finished_task != "" && index_of_finished_task != -1 {
log.Printf("Removing task with task [%s] from the list of running tasks\n",
task_monitor[host_of_finished_task][index_of_finished_task].TaskID)
task_monitor[host_of_finished_task] = append(task_monitor[host_of_finished_task][:index_of_finished_task],
task_monitor[host_of_finished_task][index_of_finished_task+1:]...)
}
// For the last task, total_allocated_power and total_running_tasks would be 0
if total_allocated_power == 0 && total_running_tasks == 0 {
return 100, errors.New("No task running on the cluster.")
}
average := total_allocated_power / float64(total_running_tasks)
ratios := []float64{}
for _, tpower := range total_power {
@@ -211,7 +291,7 @@ func (capper clusterwideCapper) taskFinished(taskID string) {
}
}
// Ee need to remove the task from the window.
// we need to remove the task from the window.
if task_to_remove, ok := task_element_to_remove.Value.(*def.Task); ok {
capper.window_of_tasks.Remove(task_element_to_remove)
capper.number_of_tasks_in_window -= 1

View file
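The scheduler changes below guard currentCapValue, recapValue and the isCapping/isRecapping flags with a package-level sync.Mutex, which is the "synchronized operations" part of this commit. A minimal, self-contained sketch of that pattern, using hypothetical names and a single shared cap value instead of the scheduler's full state:

package main

import (
	"log"
	"sync"
	"time"
)

// Shared state guarded by one mutex, mirroring currentCapValue / isCapping below.
var (
	mutex     sync.Mutex
	capValue  float64
	isCapping bool
)

// startCapping periodically reads the shared cap value under the lock,
// standing in for the RAPL-capping ticker goroutine.
func startCapping(ticker *time.Ticker, done chan struct{}) {
	go func() {
		for {
			select {
			case <-ticker.C:
				mutex.Lock()
				if isCapping && capValue > 0.0 {
					log.Printf("capping cluster to %d", int(capValue+0.5))
				}
				mutex.Unlock()
			case <-done:
				return
			}
		}
	}()
}

// updateCap changes the shared cap value under the same lock,
// standing in for the updates made from ResourceOffers and StatusUpdate.
func updateCap(v float64) {
	mutex.Lock()
	capValue = v
	isCapping = true
	mutex.Unlock()
}

func main() {
	ticker := time.NewTicker(100 * time.Millisecond)
	done := make(chan struct{})
	startCapping(ticker, done)
	for i := 1; i <= 3; i++ {
		updateCap(float64(40 + 10*i)) // new cap values as tasks get scheduled
		time.Sleep(150 * time.Millisecond)
	}
	close(done)
	ticker.Stop()
}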

@@ -12,14 +12,15 @@ import (
"log"
"math"
"strings"
"sync"
"time"
)
// Decides if to take an offer or not
func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, _ := OfferAgg(offer)
offer_cpu, offer_mem, offer_watts := OfferAgg(offer)
if offer_cpu >= task.CPU && offer_mem >= task.RAM {
if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts {
return true
}
return false
@@ -38,8 +39,9 @@ type ProactiveClusterwideCapFCFS struct {
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
//lock *sync.Mutex
isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule the new task.
@@ -71,13 +73,17 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
ticker: time.NewTicker(5 * time.Second),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
//lock: new(sync.Mutex),
isRecapping: false,
}
return s
}
// mutex
var mutex sync.Mutex
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@@ -95,10 +101,14 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String(taskName))
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
if len(s.taskMonitor[offer.GetSlaveId().GoString()]) == 0 {
s.taskMonitor[offer.GetSlaveId().GoString()] = []def.Task{task}
} else {
s.taskMonitor[offer.GetSlaveId().GoString()] = append(s.taskMonitor[offer.GetSlaveId().GoString()], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
@@ -143,7 +153,10 @@ func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, mast
func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
mutex.Lock()
s.isCapping = false
mutex.Unlock()
log.Println("Framework disconnected with master")
}
@@ -155,20 +168,44 @@ func (s *ProactiveClusterwideCapFCFS) startCapping() {
select {
case <-s.ticker.C:
// Need to cap the cluster to the currentCapValue.
mutex.Lock()
if currentCapValue > 0.0 {
//mutex.Lock()
//s.lock.Lock()
for _, host := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
fmt.Println(err)
} else {
fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue)
log.Println(err)
}
}
//mutex.Unlock()
//s.lock.Unlock()
log.Printf("Capped the cluster to %d", int(math.Floor(currentCapValue+0.5)))
}
mutex.Unlock()
}
}
}()
}
// go routine to re-cap the entire cluster at regular intervals of time.
var recapValue = 0.0 // The cluster wide cap value when recapping.
func (s *ProactiveClusterwideCapFCFS) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
mutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
//if !s.isCapping && s.isRecapping && recapValue > 0.0 {
if s.isRecapping && recapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding recapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(recapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
mutex.Unlock()
}
}
}()
@@ -179,7 +216,22 @@ func (s *ProactiveClusterwideCapFCFS) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
mutex.Lock()
s.isCapping = false
s.isRecapping = true
mutex.Unlock()
}
}
// Stop cluster wide Recapping
func (s *ProactiveClusterwideCapFCFS) stopRecapping() {
// If not capping, then definitely recapping.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
mutex.Lock()
s.isRecapping = false
mutex.Unlock()
}
}
@@ -198,10 +250,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
}
for host, tpower := range s.totalPower {
fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
}
for host, apower := range s.availablePower {
fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
log.Printf("TotalPower[%s] = %f", host, tpower)
}
for _, offer := range offers {
@@ -227,10 +276,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
*/
//offer_cpu, offer_ram, _ := OfferAgg(offer)
taken := false
//var mutex sync.Mutex
for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
@@ -242,27 +288,26 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
if s.takeOffer(offer, task) {
// Start cluster wide capping if it hasn't already been started.
if !s.isCapping {
s.startCapping()
mutex.Lock()
s.isCapping = true
mutex.Unlock()
s.startCapping()
}
taken = true
//mutex.Lock()
//s.lock.Lock()
//tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
mutex.Lock()
currentCapValue = tempCap
mutex.Unlock()
} else {
fmt.Printf("Failed to determine new cluster wide cap: ")
fmt.Println(err)
log.Printf("Failed to determine new cluster wide cap: ")
log.Println(err)
}
//mutex.Unlock()
//s.lock.Unlock()
fmt.Printf("Starting on [%s]\n", offer.GetHostname())
log.Printf("Starting on [%s]\n", offer.GetHostname())
to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
fmt.Printf("Inst: %d", *task.Instances)
log.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
@@ -273,6 +318,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
s.startRecapping() // Load changes after every task finishes and hence we need to change the capping of the cluster.
close(s.Shutdown)
}
}
@@ -284,7 +330,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
// If no task fit the offer, then declining the offer.
if !taken {
fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
@@ -294,7 +340,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
}
func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
@@ -302,17 +348,32 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
//currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
// Determining the new cluster wide cap.
currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
log.Printf("Recapping the cluster to %f\n", currentCapValue)
tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// if new determined cap value is different from the current recap value then we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(recapValue+0.5)) {
recapValue = tempCap
mutex.Lock()
s.isRecapping = true
mutex.Unlock()
log.Printf("Determined re-cap value: %f\n", recapValue)
} else {
mutex.Lock()
s.isRecapping = false
mutex.Unlock()
}
} else {
// Not updating currentCapValue
log.Println(err)
}
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the capping process.
s.stopCapping()
s.stopRecapping()
close(s.Done)
default:
}