From bfcb254f23f9084e2db0a83383e484e6c54d7a97 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Fri, 16 Dec 2016 15:49:30 -0500 Subject: [PATCH] formatted the code --- schedulers/pistoncapper.go | 65 +++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/schedulers/pistoncapper.go b/schedulers/pistoncapper.go index 002a5f2..488a1cc 100644 --- a/schedulers/pistoncapper.go +++ b/schedulers/pistoncapper.go @@ -4,16 +4,17 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" - "fmt" "errors" + "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" "math" - "sync" + "sort" "strings" + "sync" "time" ) @@ -26,15 +27,14 @@ import ( type PistonCapper struct { tasksCreated int tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - clusterLoad map[string]float64 - totalPower map[string]float64 - ignoreWatts bool - ticker *time.Ticker - isCapping bool + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + totalPower map[string]float64 + ignoreWatts bool + ticker *time.Ticker + isCapping bool // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule the new task. @@ -55,18 +55,17 @@ type PistonCapper struct { // New electron scheduler. func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { s := &PistonCapper{ - tasks: tasks, - ignoreWatts: ignoreWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - clusterLoad: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - ticker: time.NewTicker(10 * time.Second), - isCapping: false, + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + RecordPCP: false, + ticker: time.NewTicker(5 * time.Second), + isCapping: false, } return s } @@ -130,8 +129,6 @@ func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInf } } - - func (s *PistonCapper) Registered( _ sched.SchedulerDriver, frameworkID *mesos.FrameworkID, @@ -149,8 +146,10 @@ func (s *PistonCapper) Disconnected(sched.SchedulerDriver) { // go routine to cap the each node in the cluster at regular intervals of time. var capValues = make(map[string]float64) + // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead) var previousRoundedCapValues = make(map[string]int) + func (s *PistonCapper) startCapping() { go func() { for { @@ -166,7 +165,7 @@ func (s *PistonCapper) startCapping() { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue + 0.5))) + log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } previousRoundedCapValues[host] = roundedCapValue } @@ -174,7 +173,7 @@ func (s *PistonCapper) startCapping() { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue + 0.5))) + log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } previousRoundedCapValues[host] = roundedCapValue } @@ -213,11 +212,11 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me } /* - Piston capping strategy + Piston capping strategy - Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested. - For each set of tasks that are scheduled, compute the new cap values for each host in the cluster. - At regular intervals of time, cap each node in the cluster. + Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested. + For each set of tasks that are scheduled, compute the new cap values for each host in the cluster. + At regular intervals of time, cap each node in the cluster. */ for _, offer := range offers { select { @@ -251,7 +250,7 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me for *task.Instances > 0 { // Does the task fit if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && - (offerCPU >= (totalCPU + task.CPU)) && + (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) { // Start piston capping if haven't started yet @@ -356,7 +355,7 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos. mutex.Lock() capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 // Checking to see if the cap value has become 0, in which case we uncap the host. - if int(math.Floor(capValues[hostOfFinishedTask] + 0.5)) == 0 { + if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 { capValues[hostOfFinishedTask] = 100 } s.tasksRunning--