diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go new file mode 100644 index 0000000..577a395 --- /dev/null +++ b/pcp/loganddynamiccap.go @@ -0,0 +1,214 @@ +package pcp + +import ( + "bufio" + "container/ring" + "log" + "math" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "syscall" + "time" + "bitbucket.org/bingcloud/electron/rapl" +) + +var RAPLUnits = math.Pow(2, -32) + +func meanPKG(history *ring.Ring) float64 { + + total := 0.0 + count := 0.0 + + history.Do(func(x interface{}) { + if val, ok := x.(float64); ok { //Add it if we can get a float + total += val + count++ + } + }) + + if count == 0.0 { + return 0.0 + } + + count /= 2 + + return (total / count) +} + +func meanCluster(history *ring.Ring) float64 { + + total := 0.0 + count := 0.0 + + history.Do(func(x interface{}) { + if val, ok := x.(float64); ok { //Add it if we can get a float + total += val + count++ + } + }) + + if count == 0.0 { + return 0.0 + } + + return (total / count) +} + +func StartLogAndDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { + const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" + cmd := exec.Command("sh", "-c", pcpCommand) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + startTime := time.Now().Format("20060102150405") + + if hiThreshold < loThreshold { + log.Println("High threshold is lower than low threshold!") + } + + logFile, err := os.Create("./" + prefix + startTime + ".pcplog") + if err != nil { + log.Fatal(err) + } + + defer logFile.Close() + + pipe, err := cmd.StdoutPipe() + if err != nil { + log.Fatal(err) + } + //cmd.Stdout = stdout + + scanner := bufio.NewScanner(pipe) + + go func(logging *bool, hiThreshold, loThreshold float64) { + // Get names of the columns + scanner.Scan() + + // Write to logfile + logFile.WriteString(scanner.Text() + "\n") + + headers := strings.Split(scanner.Text(), ",") + + powerIndexes := make([]int, 0, 0) + powerHistories := make(map[string]*ring.Ring) + indexToHost := make(map[int]string) + + for i, hostMetric := range headers { + split := strings.Split(hostMetric, ":") + //log.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1]) + + if strings.Contains(split[1], "RAPL_ENERGY_PKG") { + //fmt.Println("Index: ", i) + powerIndexes = append(powerIndexes, i) + indexToHost[i] = split[0] + powerHistories[split[0]] = ring.New(10) // Two PKGS per node, 10 = 5 seconds tracking + } + } + + // Throw away first set of results + scanner.Scan() + + cappedHosts := make(map[string]bool) + orderCapped := make([]string, 0, 8) + clusterPowerHist := ring.New(5) + seconds := 0 + + for scanner.Scan() { + + if *logging { + log.Println("Logging PCP...") + split := strings.Split(scanner.Text(), ",") + logFile.WriteString(scanner.Text() + "\n") + + totalPower := 0.0 + for _, powerIndex := range powerIndexes { + power, _ := strconv.ParseFloat(split[powerIndex], 64) + + host := indexToHost[powerIndex] + + powerHistories[host].Value = power + powerHistories[host] = powerHistories[host].Next() + + log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * RAPLUnits)) + + totalPower += power + } + clusterPower := totalPower * RAPLUnits + + clusterPowerHist.Value = clusterPower + clusterPowerHist = clusterPowerHist.Next() + + clusterMean := meanCluster(clusterPowerHist) + + log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) + + if clusterMean > hiThreshold { + log.Printf("Need to cap a node") + // Create statics for all victims and choose one to cap + victims := make([]Victim, 0, 8) + + // TODO: Just keep track of the largest to reduce fron nlogn to n + for name, history := range powerHistories { + + histMean := meanPKG(history) + // Consider doing mean calculations using go routines if we need to speed up + victims = append(victims, Victim{Watts: histMean, Host: name}) + //log.Printf("host: %s, Avg: %f", name, histMean * RAPLUnits) + } + + sort.Sort(VictimSorter(victims)) // Sort by average wattage + + // From best victim to worst, if everyone is already capped NOOP + for _, victim := range victims { + // Only cap if host hasn't been capped yet + if !cappedHosts[victim.Host] { + cappedHosts[victim.Host] = true + orderCapped = append(orderCapped, victim.Host) + log.Printf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*RAPLUnits) + if err := rapl.Cap(victim.Host, "rapl", 50); err != nil { + log.Print("Error capping host") + } + break // Only cap one machine at at time + } + } + + } else if clusterMean < loThreshold { + + if len(orderCapped) > 0 { + host := orderCapped[len(orderCapped)-1] + orderCapped = orderCapped[:len(orderCapped)-1] + cappedHosts[host] = false + // User RAPL package to send uncap + log.Printf("Uncapping host %s", host) + if err := rapl.Cap(host, "rapl", 100); err != nil { + log.Print("Error uncapping host") + } + } + } + } + + seconds++ + } + }(logging, hiThreshold, loThreshold) + + log.Println("PCP logging started") + + if err := cmd.Start(); err != nil { + log.Fatal(err) + } + + pgid, err := syscall.Getpgid(cmd.Process.Pid) + + select { + case <-quit: + log.Println("Stopping PCP logging in 5 seconds") + time.Sleep(5 * time.Second) + + // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly + // kill process and all children processes + syscall.Kill(-pgid, 15) + return + } +} diff --git a/pcp/test/power.go b/pcp/test/power.go deleted file mode 100644 index b061e07..0000000 --- a/pcp/test/power.go +++ /dev/null @@ -1,182 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "log" - "os" - "os/exec" - "strings" - "syscall" - "time" - "strconv" - "math" - "container/ring" - "sort" -) - -type Victim struct { - Watts float64 - Host string -} - -type VictimSorter []Victim - -func (slice VictimSorter) Len() int { - return len(slice) -} - -func (slice VictimSorter) Less(i, j int) bool { - return slice[i].Watts >= slice[j].Watts -} - -func (slice VictimSorter) Swap(i, j int) { - slice[i], slice[j] = slice[j], slice[i] -} - -var RAPLUnits = math.Pow(2, -32) - -func mean(values *ring.Ring) float64 { - - total := 0.0 - count := 0.0 - - values.Do(func(x interface{}){ - if val, ok := x.(float64); ok { //Add it if we can get a float - total += val - count++ - } - }) - - if count == 0.0 { - return 0.0 - } - - - count /= 2 - - return (total/count) -} - -//func median(values *ring.Ring) { - -//} - - -func main() { - - prefix := "test" - logging := new(bool) - *logging = true - const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" - cmd := exec.Command("sh", "-c", pcpCommand) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - startTime := time.Now().Format("20060102150405") - - logFile, err := os.Create("./" + prefix + startTime + ".pcplog") - if err != nil { - log.Fatal(err) - } - - defer logFile.Close() - - pipe, err := cmd.StdoutPipe() - if err != nil { - log.Fatal(err) - } - //cmd.Stdout = stdout - - scanner := bufio.NewScanner(pipe) - - go func(logging *bool) { - // Get names of the columns - scanner.Scan() - - // Write to logfile - logFile.WriteString(scanner.Text() + "\n") - - headers := strings.Split(scanner.Text(), ",") - - powerIndexes := make([]int, 0, 0) - powerAverage := make(map[string]*ring.Ring) - indexToHost := make(map[int]string) - - for i, hostMetric := range headers { - split := strings.Split(hostMetric, ":") - fmt.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1]) - - if strings.Contains(split[1], "RAPL_ENERGY_PKG") { - fmt.Println("Index: ", i) - powerIndexes = append(powerIndexes, i) - indexToHost[i] = split[0] - powerAverage[split[0]] = ring.New(10) // Two PKGS per node, 10 = 5 seconds tracking - } - } - - // Throw away first set of results - scanner.Scan() - - seconds := 0 - for scanner.Scan() { - - if *logging { - log.Println("Logging PCP...") - split := strings.Split(scanner.Text(), ",") - logFile.WriteString(scanner.Text() + "\n") - - - totalPower := 0.0 - for _,powerIndex := range powerIndexes { - power, _ := strconv.ParseFloat(split[powerIndex], 64) - - host := indexToHost[powerIndex] - - powerAverage[host].Value = power - powerAverage[host] = powerAverage[host].Next() - - log.Printf("Host: %s, Index: %d, Power: %f", indexToHost[powerIndex], powerIndex, (power * RAPLUnits)) - - totalPower += power - } - - log.Println("Total power: ", totalPower * RAPLUnits) - - victims := make([]Victim, 8, 8) - - // TODO: Just keep track of the largest to reduce fron nlogn to n - for name,ring := range powerAverage { - victims = append(victims, Victim{mean(ring), name}) - //log.Printf("host: %s, Avg: %f", name, mean(ring) * RAPLUnits) - } - sort.Sort(VictimSorter(victims)) - log.Printf("Current Victim %s Avg. Wattage: %f", victims[0].Host, victims[0].Watts * RAPLUnits) - } - - seconds++ - } - }(logging) - - log.Println("PCP logging started") - - if err := cmd.Start(); err != nil { - log.Fatal(err) - } - - if err := cmd.Wait(); err != nil { - log.Fatal(err) - } - - /* - pgid, err := syscall.Getpgid(cmd.Process.Pid) - - select { - case <-quit: - log.Println("Stopping PCP logging in 5 seconds") - time.Sleep(5 * time.Second) - - // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly - // kill process and all children processes - syscall.Kill(-pgid, 15) - return - }*/ -} diff --git a/pcp/test/victim.go b/pcp/test/victim.go deleted file mode 100644 index 1110061..0000000 --- a/pcp/test/victim.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - - diff --git a/pcp/victim.go b/pcp/victim.go new file mode 100644 index 0000000..47c0ebb --- /dev/null +++ b/pcp/victim.go @@ -0,0 +1,20 @@ +package pcp + +type Victim struct { + Watts float64 + Host string +} + +type VictimSorter []Victim + +func (slice VictimSorter) Len() int { + return len(slice) +} + +func (slice VictimSorter) Less(i, j int) bool { + return slice[i].Watts >= slice[j].Watts +} + +func (slice VictimSorter) Swap(i, j int) { + slice[i], slice[j] = slice[j], slice[i] +} diff --git a/scheduler.go b/scheduler.go index 6b11506..93bbdf4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,6 +19,8 @@ var master = flag.String("master", "xavier:5050", "Location of leading Mesos mas var tasksFile = flag.String("workload", "", "JSON file containing task definitions") var ignoreWatts = flag.Bool("ignoreWatts", false, "Ignore watts in offers") var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog") +var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for when we should start capping") +var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for when we should start uncapping") // Short hand args func init() { @@ -26,6 +28,8 @@ func init() { flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") flag.BoolVar(ignoreWatts, "i", false, "Ignore watts in offers (shorthand)") flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)") + flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)") + flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)") } func main() { @@ -36,6 +40,11 @@ func main() { os.Exit(1) } + if *hiThreshold < *loThreshold { + fmt.Println("High threshold is of a lower value than low threhold.") + os.Exit(1) + } + tasks, err := def.TasksFromJSON(*tasksFile) if err != nil || len(tasks) == 0 { fmt.Println("Invalid tasks specification file provided") @@ -61,7 +70,8 @@ func main() { return } - go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix) + //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix) + go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Attempt to handle signint to not leave pmdumptext running