Kept track of totalPower per node: the watts resource in a node's first offer is taken as that node's total power. Removed tasks that had all their instances scheduled from the list of tasks to schedule. Also, now calling recap(...) every time a task completes, to determine the new cluster-wide cap.
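In outline, the completion path described above behaves like the following sketch. This is an illustrative reconstruction, not code from this commit: the Task struct is trimmed down, and recalcCap is a hypothetical stand-in for clusterwideCapper.recap(...), assumed here to size the cap as the watts demand of still-running tasks relative to total cluster power.

package main

import "fmt"

type Task struct {
	TaskID string
	Watts  float64
}

// recalcCap mimics the recap flow: drop the finished task from the per-node
// monitor, then recompute the cluster-wide cap as the watts demand of the
// tasks still running, expressed as a percentage of total cluster power.
func recalcCap(totalPower map[string]float64, taskMonitor map[string][]Task, finishedID string) float64 {
	total, demand := 0.0, 0.0
	for _, p := range totalPower {
		total += p
	}
	for host, tasks := range taskMonitor {
		kept := tasks[:0]
		for _, t := range tasks {
			if t.TaskID == finishedID {
				continue // task completed; it no longer contributes to the cap.
			}
			demand += t.Watts
			kept = append(kept, t)
		}
		taskMonitor[host] = kept
	}
	if total == 0 {
		return 0
	}
	return 100 * demand / total
}

func main() {
	totalPower := map[string]float64{"node1": 200, "node2": 200}
	taskMonitor := map[string][]Task{
		"node1": {{TaskID: "t1", Watts: 50}, {TaskID: "t2", Watts: 30}},
	}
	fmt.Printf("New cluster-wide cap: %.1f%%\n", recalcCap(totalPower, taskMonitor, "t1")) // 7.5%
}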

Pradyumna Kaushik 2016-11-14 22:46:38 -05:00 committed by Renan DelValle
parent 7a69aff8d7
commit 4cc1dd8e63

@@ -10,12 +10,21 @@ import (
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"strings"
"sync"
"time"
"math"
"strings"
"time"
)
+ // Decides whether to take an offer or not.
+ func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
+ offer_cpu, offer_mem, _ := OfferAgg(offer)
+ if offer_cpu >= task.CPU && offer_mem >= task.RAM {
+ return true
+ }
+ return false
+ }
// electronScheduler implements the Scheduler interface.
type ProactiveClusterwideCapFCFS struct {
tasksCreated int
@@ -23,10 +32,14 @@ type ProactiveClusterwideCapFCFS struct {
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
+ taskMonitor map[string][]def.Task // store tasks that are currently running.
+ availablePower map[string]float64 // available power for each node in the cluster.
+ totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *clusterwideCapper
ticker *time.Ticker
- isCapping bool
+ isCapping bool // indicate whether we are currently performing cluster wide capping.
//lock *sync.Mutex
// First set of PCP values are garbage values; signal the logger to start recording when we're
// about to schedule the new task.
@@ -53,10 +66,14 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
+ taskMonitor: make(map[string][]def.Task),
+ availablePower: make(map[string]float64),
+ totalPower: make(map[string]float64),
RecordPCP: false,
capper: getClusterwideCapperInstance(),
- ticker: time.NewTicker(10 * time.Second),
+ ticker: time.NewTicker(5 * time.Second),
+ isCapping: false,
//lock: new(sync.Mutex),
}
return s
}
@@ -81,6 +98,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
task.SetTaskID(*proto.String(taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
+ s.taskMonitor[offer.GetSlaveId().GoString()] = append(s.taskMonitor[offer.GetSlaveId().GoString()], task) // append rather than overwrite, so earlier tasks on the node aren't lost.
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
@@ -123,51 +141,76 @@ func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, mast
}
func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
+ // Need to stop the capping process.
+ s.ticker.Stop()
+ s.isCapping = false
log.Println("Framework disconnected with master")
}
// Goroutine to cap the entire cluster at regular intervals of time.
var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
- func (s *ProactiveClusterwideCapFCFS) startCapping(mutex sync.Mutex) {
+ func (s *ProactiveClusterwideCapFCFS) startCapping() {
go func() {
for {
select {
case <- s.ticker.C:
// Need to cap the cluster to the currentCapValue.
if currentCapValue > 0.0 {
- mutex.Lock()
+ //mutex.Lock()
+ //s.lock.Lock()
for _, host := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil {
fmt.Println(err)
} else {
fmt.Printf("Successfully capped %s to %d\\% at %\n", host, currentCapValue)
fmt.Printf("Successfully capped %s to %f%\n", host, currentCapValue)
}
}
- mutex.Unlock()
+ //mutex.Unlock()
+ //s.lock.Unlock()
}
}
}
}()
}
+ // Stop cluster wide capping.
+ func (s *ProactiveClusterwideCapFCFS) stopCapping() {
+ if s.isCapping {
+ log.Println("Stopping the cluster wide capping.")
+ s.ticker.Stop()
+ s.isCapping = false
+ }
+ }
// TODO: Reduce the time complexity: we currently loop over the offers twice (possible to do it in a single pass?).
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
- available_power := make(map[string]float64)
for _, offer := range offers {
_, _, offer_watts := OfferAgg(offer)
- available_power[*offer.Hostname] = offer_watts
+ s.availablePower[*offer.Hostname] = offer_watts
+ // Set the total power for the node the first time we see it in an offer.
+ if _, ok := s.totalPower[*offer.Hostname]; !ok {
+ s.totalPower[*offer.Hostname] = offer_watts
+ }
}
+ for host, tpower := range s.totalPower {
+ fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
+ }
+ for host, apower := range s.availablePower {
+ fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
+ }
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offerf on [", offer.GetHostname(), "]")
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number og tasks still running: ", s.tasksRunning)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
@@ -176,46 +219,64 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
Clusterwide Capping strategy
For each task in s.tasks,
- 1. I need to check whether the mesos offer can be taken or not (based on CPU and RAM).
- 2. If the tasks fits the offer then I need to detemrine the cluster wide cap.
- 3. First need to cap the cluster to the determine cap value and then launch the task on the host corresponding to the offer.
+ 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
+ 2. If the task fits the offer, then I need to determine the cluster wide cap.
+ 3. currentCapValue is updated with the determined cluster wide cap.
- Capping the cluster for every task would create a lot of overhead. Hence, clusterwide capping is performed at regular intervals.
+ Cluster wide capping is currently performed at regular intervals of time.
TODO: We can choose to cap the cluster only if the newly determined cluster-wide cap differs from the current one by more than some threshold (see the sketch at the end of this commit).
Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
*/
- offer_cpu, offer_ram, _ := OfferAgg(offer)
+ //offer_cpu, offer_ram, _ := OfferAgg(offer)
taken := false
- var mutex sync.Mutex
+ //var mutex sync.Mutex
- // If haven't started cluster wide capping then doing so,
- if !s.isCapping {
- s.startCapping(mutex)
- s.isCapping = true
- }
- for _, task := range s.tasks {
+ for i, task := range s.tasks {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
- // Does the task fit.
- if (s.ignoreWatts || offer_cpu >= task.CPU || offer_ram >= task.RAM) {
+ if s.takeOffer(offer, task) {
+ // Capping the cluster if we haven't yet started doing so.
+ if !s.isCapping {
+ s.startCapping()
+ s.isCapping = true
+ }
taken = true
- mutex.Lock()
- tempCap, err := s.capper.fcfsDetermineCap(available_power, &task)
+ //mutex.Lock()
+ //s.lock.Lock()
+ //tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
+ tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
currentCapValue = tempCap
} else {
fmt.Printf("Failed to determine cluster wide cap: ")
fmt.Printf("Failed to determine new cluster wide cap: ")
fmt.Println(err)
}
- mutex.Unlock()
+ //mutex.Unlock()
+ //s.lock.Unlock()
fmt.Printf("Starting on [%s]\n", offer.GetHostname())
to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
fmt.Printf("Inst: %d", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move on to the next task.
}
@@ -223,7 +284,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
// If no task fit the offer, then declining the offer.
if !taken {
fmt.Println("There is not enough resources to launch a task:")
fmt.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
@@ -241,10 +302,17 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.taskFinished(*status.TaskId.Value)
+ //currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
+ // Determining the new cluster wide cap.
+ currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+ log.Printf("Recapping the cluster to %f\n", currentCapValue)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
+ // Need to stop the capping process.
+ s.stopCapping()
close(s.Done)
default:
}
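For the threshold idea floated in the strategy comment's TODO, a hypothetical sketch of what that could look like follows. None of this is in the commit: capThreshold, lastAppliedCap, maybeRecap, and applyCap are illustrative names, and the threshold value is an assumed tuning knob.

package main

import (
	"fmt"
	"math"
)

const capThreshold = 2.0 // percentage points; assumed tuning knob.

var lastAppliedCap = 0.0

// maybeRecap applies newCap only when it has moved more than capThreshold
// away from the last value we pushed out, avoiding redundant capping calls.
func maybeRecap(newCap float64, applyCap func(float64) error) {
	if math.Abs(newCap-lastAppliedCap) <= capThreshold {
		return // change too small; skip the expensive cluster-wide capping.
	}
	if err := applyCap(newCap); err != nil {
		fmt.Println(err)
		return
	}
	lastAppliedCap = newCap
}

func main() {
	apply := func(c float64) error {
		fmt.Printf("capping cluster to %.1f%%\n", c)
		return nil
	}
	maybeRecap(50.0, apply) // applied: far from the initial 0.
	maybeRecap(51.0, apply) // skipped: within the threshold.
	maybeRecap(55.0, apply) // applied again.
}

This would skip redundant rapl.Cap calls when consecutive tasks have similar watts requirements, at the cost of the cluster briefly running at a slightly stale cap, which matches the caveat in the TODO about neighbouring tasks needing similar resource requirements.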