From 46be28ef8dbddca70baf5302123c226d4b7a1c18 Mon Sep 17 00:00:00 2001 From: Akash Kothawale Date: Fri, 2 Feb 2018 19:24:51 -0500 Subject: [PATCH] pcp: CPU/MEM utilization & task share variance per node --- pcp/pcp.go | 34 ++++++++++++++++++++++++++++-- pcp/utils.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++ scheduler.go | 2 +- schedulers/base.go | 47 ++++++++++++++++++++++++----------------- 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/pcp/pcp.go b/pcp/pcp.go index 5924d88..b5d57f4 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -2,14 +2,18 @@ package pcp import ( elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" + "bitbucket.org/sunybingcloud/elektron/schedulers" "bufio" + "fmt" + "github.com/mesos/mesos-go/api/v0/scheduler" "log" "os/exec" "syscall" "time" ) -func Start(quit chan struct{}, logging *bool, logMType chan elecLogDef.LogMessageType, logMsg chan string) { +func Start(quit chan struct{}, logging *bool, logMType chan elecLogDef.LogMessageType, logMsg chan string, s scheduler.Scheduler) { + baseSchedRef := s.(*schedulers.BaseScheduler) const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -34,14 +38,40 @@ func Start(quit chan struct{}, logging *bool, logMType chan elecLogDef.LogMessag scanner.Scan() seconds := 0 + for scanner.Scan() { + text := scanner.Text() if *logging { logMType <- elecLogDef.PCP - logMsg <- scanner.Text() + logMsg <- text } seconds++ + + memUtils := memUtilPerNode(text) + memTaskShares := make([]float64, len(memUtils)) + + cpuUtils := cpuUtilPerNode(text) + cpuTaskShares := make([]float64, len(cpuUtils)) + + for i := 0; i < 8; i++ { + host := fmt.Sprintf("stratos-00%d.cs.binghamton.edu", i+1) + slaveID := baseSchedRef.HostNameToSlaveID[host] + tasksRunning := len(baseSchedRef.Running[slaveID]) + if tasksRunning > 0 { + cpuTaskShares[i] = cpuUtils[i] / float64(tasksRunning) + memTaskShares[i] = memUtils[i] / float64(tasksRunning) + } + } + + cpuVariance := calcVariance(cpuUtils) + cpuTaskSharesVariance := calcVariance(cpuTaskShares) + memVariance := calcVariance(memUtils) + memTaskSharesVariance := calcVariance(memTaskShares) + + logMType <- elecLogDef.DEG_COL + logMsg <- fmt.Sprintf("%f, %f, %f, %f", cpuVariance, cpuTaskSharesVariance, memVariance, memTaskSharesVariance) } }(logging) diff --git a/pcp/utils.go b/pcp/utils.go index ff2ef55..ec902a5 100644 --- a/pcp/utils.go +++ b/pcp/utils.go @@ -3,6 +3,8 @@ package pcp import ( "container/ring" "math" + "strconv" + "strings" ) var RAPLUnits = math.Pow(2, -32) @@ -47,3 +49,53 @@ func AverageClusterPowerHistory(history *ring.Ring) float64 { return (total / count) } + +func sumAndNormalize(tokenSlice []string, normalizer float64) float64 { + sum := 0.0 + for _, value := range tokenSlice { + i, _ := strconv.ParseFloat(value, 64) + sum += i + } + return sum / normalizer +} + +func calcMean(a []float64) float64 { + total := 0.0 + for _, v := range a { + total += v + } + return total / float64(len(a)) +} + +func calcVariance(a []float64) float64 { + mean := calcMean(a) + total := 0.0 + for _, v := range a { + total += math.Pow(mean-v, 2) + } + return total / float64(len(a)) +} + +func utilization(used string, free string) float64 { + u, _ := strconv.ParseFloat(used, 64) + f, _ := strconv.ParseFloat(free, 64) + return u / (u + f) +} + +func cpuUtilPerNode(text string) []float64 { + tokenSlice := strings.Split(text, ",") + cpuUtil := make([]float64, 8) + for i := 
0; i < 8; i++ { + cpuUtil[i] = utilization(tokenSlice[8+i], tokenSlice[24+i]) + } + return cpuUtil +} + +func memUtilPerNode(text string) []float64 { + tokenSlice := strings.Split(text, ",") + memUtil := make([]float64, 8) + for i := 0; i < 8; i++ { + memUtil[i] = utilization(tokenSlice[40+i], tokenSlice[32+i]) + } + return memUtil +} diff --git a/scheduler.go b/scheduler.go index 2b7ffbb..3951494 100644 --- a/scheduler.go +++ b/scheduler.go @@ -138,7 +138,7 @@ func main() { return } - go pcp.Start(pcpLog, &recordPCP, logMType, logMsg) + go pcp.Start(pcpLog, &recordPCP, logMType, logMsg, scheduler) //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg) //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing diff --git a/schedulers/base.go b/schedulers/base.go index 222a071..10b36bb 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -3,7 +3,6 @@ package schedulers import ( "bitbucket.org/sunybingcloud/elektron/def" elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" - "bitbucket.org/sunybingcloud/elektron/utilities" "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "bytes" "fmt" @@ -22,14 +21,15 @@ type BaseScheduler struct { // Current scheduling policy used for resource offer consumption. curSchedPolicy SchedPolicyState - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - totalResourceAvailabilityRecorded bool + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + Running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool + TasksRunningMutex sync.Mutex + HostNameToSlaveID map[string]string // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task @@ -76,7 +76,8 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { log.Fatal(err) } } - s.running = make(map[string]map[string]bool) + s.Running = make(map[string]map[string]bool) + s.HostNameToSlaveID = make(map[string]string) s.mutex = sync.Mutex{} s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"] } @@ -96,12 +97,14 @@ func (s *BaseScheduler) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn } // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + s.TasksRunningMutex.Lock() + if _, ok := s.Running[offer.GetSlaveId().GoString()]; !ok { + s.Running[offer.GetSlaveId().GoString()] = make(map[string]bool) } + s.TasksRunningMutex.Unlock() // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true + s.Running[offer.GetSlaveId().GoString()][taskName] = true resources := []*mesos.Resource{ mesosutil.NewScalarResource("cpus", task.CPU), @@ -177,6 +180,11 @@ func (s *BaseScheduler) Disconnected(sched.SchedulerDriver) { } func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + for _, offer := range offers { + if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok { + s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString() + } + } s.curSchedPolicy.ConsumeOffers(s, driver, offers) } @@ -185,10 +193,9 @@ func (s 
*BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos if *status.State == mesos.TaskState_TASK_RUNNING { s.tasksRunning++ } else if IsTerminal(status.State) { - // Update resource availability. - utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE", - *status.TaskId, *status.SlaveId) - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.TasksRunningMutex.Lock() + delete(s.Running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.TasksRunningMutex.Unlock() s.tasksRunning-- if s.tasksRunning == 0 { select { @@ -246,7 +253,7 @@ func (s *BaseScheduler) LogNoPendingTasksDeclineOffers(offer *mesos.Offer) { func (s *BaseScheduler) LogNumberOfRunningTasks() { lmt := elecLogDef.GENERAL msgColor := elecLogDef.LogMessageColors[lmt] - msg := msgColor.Sprintf("Number of tasks still running = %d", s.tasksRunning) + msg := msgColor.Sprintf("Number of tasks still Running = %d", s.tasksRunning) s.Log(lmt, msg) } @@ -255,9 +262,11 @@ func (s *BaseScheduler) LogCoLocatedTasks(slaveID string) { msgColor := elecLogDef.LogMessageColors[lmt] buffer := bytes.Buffer{} buffer.WriteString(fmt.Sprintln("Colocated with:")) - for taskName := range s.running[slaveID] { + s.TasksRunningMutex.Lock() + for taskName := range s.Running[slaveID] { buffer.WriteString(fmt.Sprintln(taskName)) } + s.TasksRunningMutex.Unlock() msg := msgColor.Sprintf(buffer.String()) s.Log(lmt, msg) }
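
Note: the sketch below is illustrative only and is not part of the patch. It is written as a hypothetical pcp/utils_test.go in package pcp so it can exercise the unexported helpers added in pcp/utils.go above. The node count, utilization figures, task counts, and the synthetic pmdumptext row are assumed values; the actual column layout is determined by the pmdumptext config file, which is not included in this patch, so only the offsets hard-coded in cpuUtilPerNode() and memUtilPerNode() are taken from the diff.

// pcp/utils_test.go (hypothetical file, illustrative only; not part of this patch)
package pcp

import (
	"math"
	"strings"
	"testing"
)

// Two nodes at 50% and 70% CPU utilization, each running two tasks.
// calcVariance() computes a population variance, so the expected results
// are 0.01 for the raw utilizations and 0.0025 for the per-task shares.
func TestTaskShareVarianceSketch(t *testing.T) {
	cpuUtils := []float64{0.50, 0.70}
	tasksRunning := []int{2, 2}

	// Mirror the per-node share computation performed in pcp.Start().
	cpuTaskShares := make([]float64, len(cpuUtils))
	for i, util := range cpuUtils {
		if tasksRunning[i] > 0 {
			cpuTaskShares[i] = util / float64(tasksRunning[i])
		}
	}

	if v := calcVariance(cpuUtils); math.Abs(v-0.01) > 1e-9 {
		t.Errorf("CPU utilization variance = %f, want 0.01", v)
	}
	if v := calcVariance(cpuTaskShares); math.Abs(v-0.0025) > 1e-9 {
		t.Errorf("CPU task share variance = %f, want 0.0025", v)
	}
}

// A synthetic 48-column pmdumptext row exercising the offsets assumed by the
// helpers in pcp/utils.go: CPU "used" values at columns 8-15, CPU "free" at
// 24-31, memory "free" at 32-39, and memory "used" at 40-47.
func TestPerNodeUtilizationSketch(t *testing.T) {
	tokens := make([]string, 48)
	for i := range tokens {
		tokens[i] = "0"
	}
	tokens[8] = "2.0"  // node 1 CPU used
	tokens[24] = "6.0" // node 1 CPU free   -> 2 / (2 + 6) = 0.25
	tokens[40] = "1.0" // node 1 memory used
	tokens[32] = "3.0" // node 1 memory free -> 1 / (1 + 3) = 0.25
	text := strings.Join(tokens, ",")

	if u := cpuUtilPerNode(text)[0]; math.Abs(u-0.25) > 1e-9 {
		t.Errorf("CPU utilization of node 1 = %f, want 0.25", u)
	}
	if u := memUtilPerNode(text)[0]; math.Abs(u-0.25) > 1e-9 {
		t.Errorf("memory utilization of node 1 = %f, want 0.25", u)
	}
}

The four figures emitted once per second on the DEG_COL channel in pcp.Start() (CPU utilization variance, CPU task-share variance, memory utilization variance, memory task-share variance) correspond to the computation shown in the first test above, applied across all eight nodes.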