formatted the code

This commit is contained in:
Pradyumna Kaushik 2016-12-16 15:49:30 -05:00 committed by Renan DelValle
parent 16e25cea0f
commit bfcb254f23

View file

@ -4,16 +4,17 @@ import (
"bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl" "bitbucket.org/sunybingcloud/electron/rapl"
"fmt"
"errors" "errors"
"fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"math" "math"
"sync" "sort"
"strings" "strings"
"sync"
"time" "time"
) )
@ -26,15 +27,14 @@ import (
type PistonCapper struct { type PistonCapper struct {
tasksCreated int tasksCreated int
tasksRunning int tasksRunning int
tasks []def.Task tasks []def.Task
metrics map[string]def.Metric metrics map[string]def.Metric
running map[string]map[string]bool running map[string]map[string]bool
taskMonitor map[string][]def.Task taskMonitor map[string][]def.Task
clusterLoad map[string]float64 totalPower map[string]float64
totalPower map[string]float64 ignoreWatts bool
ignoreWatts bool ticker *time.Ticker
ticker *time.Ticker isCapping bool
isCapping bool
// First set of PCP values are garbage values, signal to logger to start recording when we're // First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule the new task. // about to schedule the new task.
@ -55,18 +55,17 @@ type PistonCapper struct {
// New electron scheduler. // New electron scheduler.
func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper {
s := &PistonCapper{ s := &PistonCapper{
tasks: tasks, tasks: tasks,
ignoreWatts: ignoreWatts, ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}), Shutdown: make(chan struct{}),
Done: make(chan struct{}), Done: make(chan struct{}),
PCPLog: make(chan struct{}), PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool), running: make(map[string]map[string]bool),
taskMonitor: make(map[string][]def.Task), taskMonitor: make(map[string][]def.Task),
clusterLoad: make(map[string]float64), totalPower: make(map[string]float64),
totalPower: make(map[string]float64), RecordPCP: false,
RecordPCP: false, ticker: time.NewTicker(5 * time.Second),
ticker: time.NewTicker(10 * time.Second), isCapping: false,
isCapping: false,
} }
return s return s
} }
@ -130,8 +129,6 @@ func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInf
} }
} }
func (s *PistonCapper) Registered( func (s *PistonCapper) Registered(
_ sched.SchedulerDriver, _ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID, frameworkID *mesos.FrameworkID,
@ -149,8 +146,10 @@ func (s *PistonCapper) Disconnected(sched.SchedulerDriver) {
// go routine to cap the each node in the cluster at regular intervals of time. // go routine to cap the each node in the cluster at regular intervals of time.
var capValues = make(map[string]float64) var capValues = make(map[string]float64)
// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead) // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
var previousRoundedCapValues = make(map[string]int) var previousRoundedCapValues = make(map[string]int)
func (s *PistonCapper) startCapping() { func (s *PistonCapper) startCapping() {
go func() { go func() {
for { for {
@ -166,7 +165,7 @@ func (s *PistonCapper) startCapping() {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err) log.Println(err)
} else { } else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue + 0.5))) log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
} }
previousRoundedCapValues[host] = roundedCapValue previousRoundedCapValues[host] = roundedCapValue
} }
@ -174,7 +173,7 @@ func (s *PistonCapper) startCapping() {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err) log.Println(err)
} else { } else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue + 0.5))) log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
} }
previousRoundedCapValues[host] = roundedCapValue previousRoundedCapValues[host] = roundedCapValue
} }
@ -213,11 +212,11 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me
} }
/* /*
Piston capping strategy Piston capping strategy
Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested. Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested.
For each set of tasks that are scheduled, compute the new cap values for each host in the cluster. For each set of tasks that are scheduled, compute the new cap values for each host in the cluster.
At regular intervals of time, cap each node in the cluster. At regular intervals of time, cap each node in the cluster.
*/ */
for _, offer := range offers { for _, offer := range offers {
select { select {
@ -251,7 +250,7 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me
for *task.Instances > 0 { for *task.Instances > 0 {
// Does the task fit // Does the task fit
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) && (offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) { (offerRAM >= (totalRAM + task.RAM)) {
// Start piston capping if haven't started yet // Start piston capping if haven't started yet
@ -356,7 +355,7 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.
mutex.Lock() mutex.Lock()
capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host. // Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(capValues[hostOfFinishedTask] + 0.5)) == 0 { if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 {
capValues[hostOfFinishedTask] = 100 capValues[hostOfFinishedTask] = 100
} }
s.tasksRunning-- s.tasksRunning--