Added a check to see whether cluster wide capping has started; if not, the goroutine that performs cluster wide capping at regular intervals is started.

Pradyumna Kaushik 2016-11-10 21:18:05 -05:00 committed by Renan DelValle
parent 8a6ad00e21
commit a9f7ca5c91


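In outline, the change makes ResourceOffers check an isCapping flag and, on the first offer cycle, start the ticker-driven capping goroutine. Below is a minimal, self-contained sketch of that check-and-start pattern; the type and function names here are illustrative stand-ins, not the repo's.

package main

import (
	"fmt"
	"time"
)

// scheduler stands in for ProactiveClusterwideCapFCFS.
type scheduler struct {
	isCapping bool
	ticker    *time.Ticker
}

// startCapping launches the periodic capping goroutine.
func (s *scheduler) startCapping() {
	go func() {
		for range s.ticker.C {
			fmt.Println("capping the cluster") // stand-in for the RAPL calls
		}
	}()
}

// resourceOffers mimics the guard this commit adds.
func (s *scheduler) resourceOffers() {
	if !s.isCapping { // start the capping goroutine at most once
		s.startCapping()
		s.isCapping = true
	}
}

func main() {
	s := &scheduler{ticker: time.NewTicker(300 * time.Millisecond)}
	s.resourceOffers()
	s.resourceOffers() // no-op: capping already started
	time.Sleep(time.Second)
}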
@@ -4,17 +4,16 @@ import (
 	"bitbucket.org/sunybingcloud/electron/def"
 	"bitbucket.org/sunybingcloud/electron/constants"
 	"bitbucket.org/sunybingcloud/electron/rapl"
-	"errors"
 	"fmt"
 	"github.com/golang/protobuf/proto"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	"github.com/mesos/mesos-go/mesosutil"
 	sched "github.com/mesos/mesos-go/scheduler"
 	"log"
-	"sort"
 	"strings"
 	"sync"
 	"time"
+	"math"
 )
 
 // electronScheduler implements the Scheduler interface.
@@ -53,11 +52,11 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
 		Shutdown:  make(chan struct{}),
 		Done:      make(chan struct{}),
 		PCPLog:    make(chan struct{}),
-		running:   make(mapp[string]map[string]bool),
+		running:   make(map[string]map[string]bool),
 		RecordPCP: false,
 		capper:    getClusterwideCapperInstance(),
-		ticker:    time.NewTicker(constants.Clusterwide_cap_interval * time.Second),
-		isCapping: false
+		ticker:    time.NewTicker(10 * time.Second),
+		isCapping: false,
 	}
 	return s
 }
@@ -79,7 +78,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 	// Setting the task ID to the task. This is done so that we can consider each task to be different,
 	// even though they have the same parameters.
-	task.SetTaskID(proto.String(taskName))
+	task.SetTaskID(*proto.String(taskName))
 
 	// Add task to the list of tasks running on the node.
 	s.running[offer.GetSlaveId().GoString()][taskName] = true
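
An aside on the `*proto.String(taskName)` change: `proto.String` (from github.com/golang/protobuf/proto) only boxes a string into a `*string`, so dereferencing it yields the original value; the added `*` presumably makes the argument's type match what `SetTaskID` expects. A tiny illustration:

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
)

func main() {
	p := proto.String("task-0") // *string holding a copy of the value
	fmt.Println(*p)             // "task-0": *proto.String(x) is just x
}
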
@@ -114,7 +113,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 func (s *ProactiveClusterwideCapFCFS) Registered(
 	_ sched.SchedulerDriver,
-	framewordID *mesos.FrameworkID,
+	frameworkID *mesos.FrameworkID,
 	masterInfo *mesos.MasterInfo) {
 	log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
 }
@@ -128,23 +127,27 @@ func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
 }
 
 // go routine to cap the entire cluster in regular intervals of time.
-func (s *ProactiveClusterwideCapFCFS) startCapping(currentCapValue float64, mutex sync.Mutex) {
-	go func() {
-		for tick := range s.ticker.C {
-			// Need to cap the cluster to the currentCapValue.
-			if currentCapValue > 0.0 {
-				mutex.Lock()
-				for _, host := range constants.Hosts {
-					if err := rapl.Cap(host, int(math.Floor(currentCapValue + 0.5))); err != nil {
-						fmt.Println(err)
-					} else {
-						fmt.Println("Successfully capped %s to %d\\%", host, currentCapValue)
-					}
-				}
-				mutex.Unlock()
-			}
-		}
-	}
-}
+var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+func (s *ProactiveClusterwideCapFCFS) startCapping(mutex sync.Mutex) {
+	go func() {
+		for {
+			select {
+			case <- s.ticker.C:
+				// Need to cap the cluster to the currentCapValue.
+				if currentCapValue > 0.0 {
+					mutex.Lock()
+					for _, host := range constants.Hosts {
+						if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil {
+							fmt.Println(err)
+						} else {
+							fmt.Printf("Successfully capped %s to %d\\% at %\n", host, currentCapValue)
+						}
+					}
+					mutex.Unlock()
+				}
+			}
+		}
+	}()
+}
 
 // TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?).
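
One caveat worth flagging in the new startCapping: it takes sync.Mutex by value, so the goroutine locks a copy rather than the caller's mutex (go vet reports this as "passes lock by value"). Below is a sketch of the same ticker loop with a shared *sync.Mutex; the host names and cap value are placeholders, and fmt.Printf stands in for rapl.Cap:

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// startCapping mirrors the loop above, but shares one lock via a pointer.
func startCapping(ticker *time.Ticker, mu *sync.Mutex, hosts []string, capValue *float64) {
	go func() {
		for range ticker.C { // equivalent to select { case <-ticker.C: ... }
			mu.Lock()
			if *capValue > 0.0 {
				for _, host := range hosts {
					// stand-in for rapl.Cap(host, "rapl", percentage)
					fmt.Printf("capped %s to %d%%\n", host, int(math.Floor(*capValue+0.5)))
				}
			}
			mu.Unlock()
		}
	}()
}

func main() {
	var mu sync.Mutex
	capValue := 75.0
	startCapping(time.NewTicker(500*time.Millisecond), &mu, []string{"host1", "host2"}, &capValue)
	time.Sleep(2 * time.Second)
}
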
@@ -155,12 +158,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 	available_power := make(map[string]float64)
 	for _, offer := range offers {
 		_, _, offer_watts := OfferAgg(offer)
-		available_power[offer.Hostname] = offer_watts
+		available_power[*offer.Hostname] = offer_watts
 	}
 
 	for _, offer := range offers {
 		select {
-		case <-s.Shutdown;
+		case <-s.Shutdown:
 			log.Println("Done scheduling tasks: declining offerf on [", offer.GetHostname(), "]")
 			driver.DeclineOffer(offer.Id, longFilter)
@@ -184,9 +187,14 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 		offer_cpu, offer_ram, _ := OfferAgg(offer)
 
 		taken := false
-		currentCapValue := 0.0 // initial value to indicate that we haven't capped the cluster yet.
 		var mutex sync.Mutex
 
+		// If haven't started cluster wide capping then doing so,
+		if !s.isCapping {
+			s.startCapping(mutex)
+			s.isCapping = true
+		}
+
 		for _, task := range s.tasks {
 			// Don't take offer if it doesn't match our task's host requirement.
 			if !strings.HasPrefix(*offer.Hostname, task.Host) {
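
The new isCapping guard makes the start idempotent across offer cycles. If the flag could ever be touched from more than one goroutine, sync.Once would give the same one-time behavior with a race-safety guarantee; a sketch, not part of this commit:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	start := func() { fmt.Println("capping goroutine started") }
	for i := 0; i < 3; i++ {
		once.Do(start) // runs start exactly once, even across goroutines
	}
}
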
@@ -194,18 +202,20 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 			}
 
 			// Does the task fit.
-			if (s.ignoreWatts || offer_cpu >= task.CPU ||| offer_ram >= task.RAM) {
+			if (s.ignoreWatts || offer_cpu >= task.CPU || offer_ram >= task.RAM) {
 				taken = true
 				mutex.Lock()
-				tempCap, err = s.capper.fcfsDetermineCap(available_power, task)
+				tempCap, err := s.capper.fcfsDetermineCap(available_power, &task)
 				if err == nil {
 					currentCapValue = tempCap
 				} else {
-					fmt.Println("Failed to determine cluster wide cap: " + err.String())
+					fmt.Printf("Failed to determine cluster wide cap: ")
+					fmt.Println(err)
 				}
 				mutex.Unlock()
 				fmt.Printf("Starting on [%s]\n", offer.GetHostname())
-				driver.LaunchTasks([]*mesos.OfferID{offer.Id}, [s.newTask(offer, task)], defaultFilter)
+				to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
+				driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
 			} else {
 				// Task doesn't fit the offer. Move onto the next offer.
 			}
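
The LaunchTasks fix is a syntax one: Go has no bare [expr] list literal, so the single task has to be wrapped in a composite literal ([]*mesos.TaskInfo{...}), which the new to_schedule variable does. A generic illustration:

package main

import "fmt"

func main() {
	// tasks := [t]          // invalid Go: no bare-bracket list syntax
	tasks := []string{"t1"} // composite literal: a one-element slice
	fmt.Println(len(tasks), tasks[0])
}
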
@@ -230,7 +240,7 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
 		} else if IsTerminal(status.State) {
 			delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
 			// Need to remove the task from the window of tasks.
-			s.capper.taskFinished(status.TaskId.Value)
+			s.capper.taskFinished(*status.TaskId.Value)
 			s.tasksRunning--
 			if s.tasksRunning == 0 {
 				select {