From d42b7a3a3b90aa2a83a046ad7ed8437d2126baf7 Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik
Date: Wed, 15 Feb 2017 19:22:56 -0500
Subject: [PATCH] changed the type of percentage in rapl.Cap(...) from int to
 float64.

Retrofitted the power-capping strategies to cap using a float64 value instead
of an int. Moved the functions common to loganddynamiccap.go and
logAndProgressiveExtrema.go into pcp/utils.go. Added a new power-capping
strategy that builds on top of extrema: it caps the victims at progressively
lower values until they can't be capped any further, at which point it starts
uncapping them in the reverse order of capping.
---
 pcp/logAndProgressiveExtrema.go          | 176 +++++++++++++++++++++++
 pcp/loganddynamiccap.go                  |  44 ------
 pcp/utils.go                             |  49 +++++++
 rapl/cap.go                              |   4 +-
 schedulers/binpackedpistoncapping.go     |   4 +-
 schedulers/bpswMaxMinPistonCapping.go    |   4 +-
 schedulers/bpswMaxMinProacCC.go          |   4 +-
 schedulers/firstfitProacCC.go            |   4 +-
 schedulers/firstfitSortedWattsProacCC.go |   4 +-
 9 files changed, 237 insertions(+), 56 deletions(-)
 create mode 100644 pcp/logAndProgressiveExtrema.go
 create mode 100644 pcp/utils.go

diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go
new file mode 100644
index 0000000..7936b53
--- /dev/null
+++ b/pcp/logAndProgressiveExtrema.go
@@ -0,0 +1,176 @@
+package pcp
+
+import (
+    "bitbucket.org/sunybingcloud/electron/constants"
+    "bitbucket.org/sunybingcloud/electron/rapl"
+    "bufio"
+    "container/ring"
+    "log"
+    "math"
+    "os"
+    "os/exec"
+    "sort"
+    "strconv"
+    "strings"
+    "syscall"
+    "time"
+)
+
+// round rounds num to the nearest integer, away from zero on ties.
+func round(num float64) int {
+    return int(num + math.Copysign(0.5, num))
+}
+
+// getNextCapValue rounds curCapValue to the given number of decimal places.
+func getNextCapValue(curCapValue float64, precision int) float64 {
+    multiplier := math.Pow(10, float64(precision))
+    return float64(round(curCapValue*multiplier)) / multiplier
+}
+
+func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
+    const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
+    cmd := exec.Command("sh", "-c", pcpCommand)
+    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+    if hiThreshold < loThreshold {
+        log.Println("High threshold is lower than the low threshold")
+    }
+
+    logFile, err := os.Create("./" + prefix + ".pcplog")
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer logFile.Close()
+
+    pipe, err := cmd.StdoutPipe()
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    scanner := bufio.NewScanner(pipe)
+
+    go func(logging *bool, hiThreshold, loThreshold float64) {
+        // Get the names of the columns.
+        scanner.Scan()
+
+        // Write the header line to the log file.
+        logFile.WriteString(scanner.Text() + "\n")
+
+        headers := strings.Split(scanner.Text(), ",")
+
+        powerIndexes := make([]int, 0)
+        powerHistories := make(map[string]*ring.Ring)
+        indexToHost := make(map[int]string)
+
+        for i, hostMetric := range headers {
+            metricSplit := strings.Split(hostMetric, ":")
+
+            if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") ||
+                strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") {
+                powerIndexes = append(powerIndexes, i)
+                indexToHost[i] = metricSplit[0]
+
+                // Only create one ring per host.
+                if _, ok := powerHistories[metricSplit[0]]; !ok {
+                    // Two PKGs and two DRAMs per node: 20 samples = 5 seconds of tracking.
+                    powerHistories[metricSplit[0]] = ring.New(20)
+                }
+            }
+        }
+
+        // Throw away the first set of results.
+        scanner.Scan()
+
+        // Keep track of the capped victims and their corresponding cap values.
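+        // Bookkeeping sketch: cappedVictims maps a host to its latest cap
+        // percentage, while orderCapped records the order in which hosts hit
+        // the cap floor so they can be uncapped in reverse order later.
+        // Assuming constants.CapThreshold were, say, 5, a victim's cap would
+        // progress 50 -> 25 -> 12.5 -> 6.3 -> 3.2 (getNextCapValue rounds to
+        // one decimal place) before the host is queued for uncapping.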
+        cappedVictims := make(map[string]float64)
+        orderCapped := make([]string, 0, 8)
+        clusterPowerHist := ring.New(5)
+        seconds := 0
+
+        for scanner.Scan() {
+            if *logging {
+                log.Println("Logging PCP...")
+                split := strings.Split(scanner.Text(), ",")
+                logFile.WriteString(scanner.Text() + "\n")
+
+                totalPower := 0.0
+                for _, powerIndex := range powerIndexes {
+                    power, _ := strconv.ParseFloat(split[powerIndex], 64)
+
+                    host := indexToHost[powerIndex]
+
+                    powerHistories[host].Value = power
+                    powerHistories[host] = powerHistories[host].Next()
+
+                    log.Printf("Host: %s, Power: %f", host, power*RAPLUnits)
+
+                    totalPower += power
+                }
+                clusterPower := totalPower * RAPLUnits
+
+                clusterPowerHist.Value = clusterPower
+                clusterPowerHist = clusterPowerHist.Next()
+
+                clusterMean := averageClusterPowerHistory(clusterPowerHist)
+
+                log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
+
+                if clusterMean >= hiThreshold {
+                    log.Println("Need to cap a node")
+                    // Create statistics for all victims and choose one to cap.
+                    victims := make([]Victim, 0, 8)
+
+                    // TODO: Just keep track of the largest to reduce from nlogn to n.
+                    for name, history := range powerHistories {
+                        histMean := averageNodePowerHistory(history)
+                        // Consider computing the means in goroutines if this needs speeding up.
+                        victims = append(victims, Victim{Watts: histMean, Host: name})
+                    }
+
+                    sort.Sort(VictimSorter(victims)) // Sort by average wattage.
+
+                    // Find the best victim to cap.
+                    for i := 0; i < len(victims); i++ {
+                        if curCapValue, ok := cappedVictims[victims[i].Host]; ok {
+                            // If this host can be capped further, cap it to half
+                            // its current cap value. Otherwise queue it for
+                            // uncapping and move on to the next victim.
+                            if curCapValue > constants.CapThreshold {
+                                newCapValue := getNextCapValue(curCapValue/2.0, 1)
+                                if err := rapl.Cap(victims[i].Host, "rapl", newCapValue); err != nil {
+                                    log.Print("Error capping host")
+                                }
+                                // Update the current cap value in cappedVictims.
+                                cappedVictims[victims[i].Host] = newCapValue
+                                break
+                            } else {
+                                // This host has hit the cap floor. Queue it (once)
+                                // for uncapping, keeping its entry in cappedVictims
+                                // so its latest cap value isn't lost.
+                                alreadyQueued := false
+                                for _, h := range orderCapped {
+                                    if h == victims[i].Host {
+                                        alreadyQueued = true
+                                        break
+                                    }
+                                }
+                                if !alreadyQueued {
+                                    orderCapped = append(orderCapped, victims[i].Host)
+                                }
+                            }
+                        } else {
+                            // This host hasn't been capped yet: start it at 50%,
+                            // as the extrema strategy this builds on does.
+                            if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
+                                log.Print("Error capping host")
+                            }
+                            cappedVictims[victims[i].Host] = 50.0
+                            break
+                        }
+                    }
+
+                } else if clusterMean < loThreshold {
+                    if len(orderCapped) > 0 {
+                        host := orderCapped[len(orderCapped)-1]
+                        orderCapped = orderCapped[:len(orderCapped)-1]
+                        // cappedVictims still holds the latest cap value for this
+                        // host. Uncapping raises the cap, so double that value and
+                        // clamp it at 100%.
+                        newUncapValue := getNextCapValue(cappedVictims[host]*2.0, 1)
+                        if newUncapValue > 100.0 {
+                            newUncapValue = 100.0
+                        }
+                        if err := rapl.Cap(host, "rapl", newUncapValue); err != nil {
+                            log.Print("Error uncapping host")
+                        }
+                        cappedVictims[host] = newUncapValue // Now this host can be capped again.
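+                        // Uncapping handles one host per low-power interval, in
+                        // reverse order of capping: if hosts A and then B hit the
+                        // cap floor, B is restored first, then A. Each pop doubles
+                        // that host's cap once (e.g. 3.2 -> 6.4), after which the
+                        // host is eligible for capping again.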
+                    }
+                }
+            }
+            seconds++
+        }
+    }(logging, hiThreshold, loThreshold)
+
+    log.Println("PCP logging started")
+
+    if err := cmd.Start(); err != nil {
+        log.Fatal(err)
+    }
+
+    pgid, err := syscall.Getpgid(cmd.Process.Pid)
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    // Wait for a quit signal and then kill the PCP process group,
+    // mirroring StartPCPLogAndExtremaDynamicCap in loganddynamiccap.go.
+    select {
+    case <-quit:
+        log.Println("Stopping PCP logging in 5 seconds")
+        time.Sleep(5 * time.Second)
+        // Kill the process and all of its children.
+        syscall.Kill(-pgid, 15)
+        return
+    }
+}
diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go
index 714bf68..f692e5f 100644
--- a/pcp/loganddynamiccap.go
+++ b/pcp/loganddynamiccap.go
@@ -5,7 +5,6 @@ import (
     "bufio"
     "container/ring"
     "log"
-    "math"
     "os"
     "os/exec"
     "sort"
@@ -15,49 +14,6 @@ import (
     "time"
 )
 
-var RAPLUnits = math.Pow(2, -32)
-
-func averageNodePowerHistory(history *ring.Ring) float64 {
-
-    total := 0.0
-    count := 0.0
-
-    history.Do(func(x interface{}) {
-        if val, ok := x.(float64); ok { //Add it if we can get a float
-            total += val
-            count++
-        }
-    })
-
-    if count == 0.0 {
-        return 0.0
-    }
-
-    count /= 4 // two PKGs, two DRAM for all nodes currently
-
-    return (total / count)
-}
-
-// TODO: Figure a way to merge this and avgpower
-func averageClusterPowerHistory(history *ring.Ring) float64 {
-
-    total := 0.0
-    count := 0.0
-
-    history.Do(func(x interface{}) {
-        if val, ok := x.(float64); ok { //Add it if we can get a float
-            total += val
-            count++
-        }
-    })
-
-    if count == 0.0 {
-        return 0.0
-    }
-
-    return (total / count)
-}
-
 func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
     const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
     cmd := exec.Command("sh", "-c", pcpCommand)
diff --git a/pcp/utils.go b/pcp/utils.go
new file mode 100644
index 0000000..1e0a260
--- /dev/null
+++ b/pcp/utils.go
@@ -0,0 +1,49 @@
+package pcp
+
+import (
+    "container/ring"
+    "math"
+)
+
+// RAPLUnits is the energy-unit multiplier for raw RAPL counter values.
+var RAPLUnits = math.Pow(2, -32)
+
+func averageNodePowerHistory(history *ring.Ring) float64 {
+    total := 0.0
+    count := 0.0
+
+    history.Do(func(x interface{}) {
+        if val, ok := x.(float64); ok { // Add it if we can get a float.
+            total += val
+            count++
+        }
+    })
+
+    if count == 0.0 {
+        return 0.0
+    }
+
+    count /= 4 // Two PKGs and two DRAMs for all nodes currently.
+
+    return total / count
+}
+
+// TODO: Figure out a way to merge this and averageNodePowerHistory.
+func averageClusterPowerHistory(history *ring.Ring) float64 {
+    total := 0.0
+    count := 0.0
+
+    history.Do(func(x interface{}) {
+        if val, ok := x.(float64); ok { // Add it if we can get a float.
+            total += val
+            count++
+        }
+    })
+
+    if count == 0.0 {
+        return 0.0
+    }
+
+    return total / count
+}
diff --git a/rapl/cap.go b/rapl/cap.go
index b15d352..6b5ec2f 100644
--- a/rapl/cap.go
+++ b/rapl/cap.go
@@ -6,7 +6,7 @@ import (
     "strconv"
 )
 
-func Cap(host, username string, percentage int) error {
+func Cap(host, username string, percentage float64) error {
 
     if percentage > 100 || percentage < 0 {
         return errors.New("Percentage is out of range")
@@ -31,7 +31,7 @@ func Cap(host, username string, percentage int) error {
         return errors.Wrap(err, "Failed to create session")
     }
 
-    err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.Itoa(percentage))
+    err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.FormatFloat(percentage, 'f', 2, 64))
     if err != nil {
         return errors.Wrap(err, "Failed to run RAPL script")
     }
diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go
index 95225e9..69764a9 100644
--- a/schedulers/binpackedpistoncapping.go
+++ b/schedulers/binpackedpistoncapping.go
@@ -157,7 +157,7 @@ func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) {
 var bpPistonCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
-var bpPistonPreviousRoundedCapValues = make(map[string]int)
+var bpPistonPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BinPackedPistonCapper) startCapping() {
     go func() {
@@ -167,7 +167,7 @@ func (s *BinPackedPistonCapper) startCapping() {
             // Need to cap each node
             bpPistonMutex.Lock()
             for host, capValue := range bpPistonCapValues {
-                roundedCapValue := int(math.Floor(capValue + 0.5))
+                roundedCapValue := math.Floor(capValue + 0.5)
                 // has the cap value changed
                 if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok {
                     if prevRoundedCap != roundedCapValue {
diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go
index fceec50..2e16201 100644
--- a/schedulers/bpswMaxMinPistonCapping.go
+++ b/schedulers/bpswMaxMinPistonCapping.go
@@ -157,7 +157,7 @@ var bpMaxMinPistonCappingMutex sync.Mutex
 var bpMaxMinPistonCappingCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
-var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]int)
+var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BPSWMaxMinPistonCapping) startCapping() {
     go func() {
@@ -167,7 +167,7 @@ func (s *BPSWMaxMinPistonCapping) startCapping() {
             // Need to cap each node
             bpMaxMinPistonCappingMutex.Lock()
             for host, capValue := range bpMaxMinPistonCappingCapValues {
-                roundedCapValue := int(math.Floor(capValue + 0.5))
+                roundedCapValue := math.Floor(capValue + 0.5)
                 // has the cap value changed
                 if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok {
                     if previousRoundedCap != roundedCapValue {
diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go
index 564134a..100154d 100644
--- a/schedulers/bpswMaxMinProacCC.go
+++ b/schedulers/bpswMaxMinProacCC.go
@@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() {
             if bpMaxMinProacCCCapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding cap value to nearest int
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil {
+                    // Round the cap value to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(bpMaxMinProacCCCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() {
             if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding the recap value to the nearest int
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCRecapValue+0.5))); err != nil {
+                    // Round the recap value to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(bpMaxMinProacCCRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go
index caddfd3..01c163e 100644
--- a/schedulers/firstfitProacCC.go
+++ b/schedulers/firstfitProacCC.go
@@ -166,7 +166,7 @@ func (s *FirstFitProacCC) startCapping() {
             if fcfsCurrentCapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding curreCapValue to the nearest int.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
+                    // Round fcfsCurrentCapValue to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(fcfsCurrentCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -190,7 +190,7 @@ func (s *FirstFitProacCC) startRecapping() {
             if s.isRecapping && fcfsRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding curreCapValue to the nearest int.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
+                    // Round fcfsRecapValue to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(fcfsRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go
index e00590c..1792cfd 100644
--- a/schedulers/firstfitSortedWattsProacCC.go
+++ b/schedulers/firstfitSortedWattsProacCC.go
@@ -179,7 +179,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() {
             if rankedCurrentCapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding currentCapValue to the nearest int.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil {
+                    // Round rankedCurrentCapValue to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(rankedCurrentCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -203,7 +203,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() {
             if s.isRecapping && rankedRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
-                    // Rounding currentCapValue to the nearest int.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil {
+                    // Round rankedRecapValue to the nearest integer (kept as a float64).
+                    if err := rapl.Cap(host, "rapl", math.Floor(rankedRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
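
Note on the new rapl.Cap signature: callers now pass the cap percentage as a
float64, so fractional caps such as 12.5 reach RAPL_PKG_Throttle.py instead of
having to be rounded to a whole number first. A minimal sketch of the new call
shape (the host name below is hypothetical; "rapl" is the username the
schedulers above pass for the SSH session):

    package main

    import (
        "log"

        "bitbucket.org/sunybingcloud/electron/rapl"
    )

    func main() {
        // Cap a (hypothetical) host to 12.5% via the RAPL throttle script.
        if err := rapl.Cap("example-host-001", "rapl", 12.5); err != nil {
            log.Println(err)
        }
    }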