From d5d3c87ff24dc7fb534ba0a0e85a2b6c42605101 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Wed, 15 Feb 2017 19:15:18 -0500 Subject: [PATCH 01/12] added a constant called CapThreshold that defines the lower limit below which we shouldn't cap a node. --- constants/constants.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/constants/constants.go b/constants/constants.go index bbacf42..782f33d 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -43,3 +43,6 @@ var Tolerance = 0.70 // Window size for running average var ConsiderationWindowSize = 20 + +// Threshold below which a host should be capped +var CapThreshold = 12.5 From d42b7a3a3b90aa2a83a046ad7ed8437d2126baf7 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Wed, 15 Feb 2017 19:22:56 -0500 Subject: [PATCH 02/12] changed the type of percentage in rapl.Cap(...) from int to float64. Retrofitted power-capping strategies to cap using a float64 value instead of an int. Moved common functions in loganddynamiccap.go and logAndProgressiveExtrema.go into pcp/utils.go. New power-capping strategy that builds on top of extrema, where it caps the victims at different until it can't cap further, in which case it starts uncapping them in the reverse order of capping. --- pcp/logAndProgressiveExtrema.go | 176 +++++++++++++++++++++++ pcp/loganddynamiccap.go | 44 ------ pcp/utils.go | 49 +++++++ rapl/cap.go | 4 +- schedulers/binpackedpistoncapping.go | 4 +- schedulers/bpswMaxMinPistonCapping.go | 4 +- schedulers/bpswMaxMinProacCC.go | 4 +- schedulers/firstfitProacCC.go | 4 +- schedulers/firstfitSortedWattsProacCC.go | 4 +- 9 files changed, 237 insertions(+), 56 deletions(-) create mode 100644 pcp/logAndProgressiveExtrema.go create mode 100644 pcp/utils.go diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go new file mode 100644 index 0000000..7936b53 --- /dev/null +++ b/pcp/logAndProgressiveExtrema.go @@ -0,0 +1,176 @@ +package pcp + +import ( + "bitbucket.org/sunybingcloud/electron/rapl" + "bufio" + "container/ring" + "log" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "syscall" + "math" + "bitbucket.org/sunybingcloud/electron/constants" +) + +func round(num float64) int { + return int(math.Floor(num + math.Copysign(0.5, num))) +} + +func getNextCapValue(curCapValue float64, precision int) float64 { + output := math.Pow(10, float64(precision)) + return float64(round(curCapValue * output)) / output +} + +func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { + const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" + cmd := exec.Command("sh", "-c", pcpCommand) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + if hiThreshold < loThreshold { + log.Println("High threshold is lower than the low threshold") + } + + logFile, err := os.Create("./" + prefix + ".pcplog") + if err != nil { + log.Fatal(err) + } + + defer logFile.Close() + + pipe, err := cmd.StdoutPipe() + if err != nil { + log.Fatal(err) + } + //cmd.Stdout = stdout + + scanner := bufio.NewScanner(pipe) + + go func(logging *bool, hiThreshold, loThreshold float64) { + // Get names of the columns + scanner.Scan() + + // Write to logfile + logFile.WriteString(scanner.Text() + "\n") + + headers := strings.Split(scanner.Text(), ",") + + powerIndexes := make([]int, 0, 0) + powerHistories := make(map[string]*ring.Ring) + indexToHost := make(map[int]string) + + for i, hostMetric := range headers { + metricSplit := strings.Split(hostMetric, 
":") + //log.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1]) + + if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") || + strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") { + //fmt.Println("Index: ", i) + powerIndexes = append(powerIndexes, i) + indexToHost[i] = metricSplit[0] + + // Only create one ring per host + if _, ok := powerHistories[metricSplit[0]]; !ok { + powerHistories[metricSplit[0]] = ring.New(20) // Two PKGS, two DRAM per node, 20 = 5 seconds of tracking + } + } + } + + // Throw away first set of results + scanner.Scan() + + //cappedHosts := make(map[string]bool) + // Keep track of the capped victims and the corresponding cap value. + cappedVictims := make(map[string]float64) + orderCapped := make([]string, 0, 8) + clusterPowerHist := ring.New(5) + seconds := 0 + + for scanner.Scan() { + if *logging { + log.Println("Logging PCP...") + split := strings.Split(scanner.Text(), ",") + logFile.WriteString(scanner.Text() + "\n") + + totalPower := 0.0 + for _, powerIndex := range powerIndexes { + power, _ := strconv.ParseFloat(split[powerIndex], 64) + + host := indexToHost[powerIndex] + + powerHistories[host].Value = power + powerHistories[host] = powerHistories[host].Next() + + log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * RAPLUnits)) + + totalPower += power + } + clusterPower := totalPower * RAPLUnits + + clusterPowerHist.Value = clusterPower + clusterPowerHist = clusterPowerHist.Next() + + clusterMean := averageClusterPowerHistory(clusterPowerHist) + + log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) + + if clusterMean >= hiThreshold { + log.Printf("Need to cap a node") + // Create statics for all victims and choose one to cap + victims := make([]Victim, 0, 8) + + // TODO: Just keep track of the largest to reduce fron nlogn to n + for name, history := range powerHistories { + + histMean := averageNodePowerHistory(history) + + // Consider doing mean calculations using go routines if we need to speed up + victims = append(victims, Victim{Watts: histMean, Host: name}) + } + + sort.Sort(VictimSorter(victims)) // Sort by average wattage + + // Finding the best victim to cap. + for i := 0; i < len(victims); i++ { + if curCapValue, ok := cappedVictims[victims[i].Host]; ok { + // checking whether we can continue to cap this host. + // If yes, then we cap it to half the current cap value. + // Else, we push it to the orderedCapped and continue. + if curCapValue > constants.CapThreshold { + newCapValue := getNextCapValue(curCapValue/2.0, 1) + if err := rapl.Cap(victims[0].Host, "rapl", newCapValue); err != nil { + log.Print("Error capping host") + } + // Updating the curCapValue in cappedVictims + cappedVictims[victims[0].Host] = newCapValue + break + } else { + // deleting entry in cappedVictims + delete(cappedVictims, victims[i].Host) + // Now this host can be uncapped. + orderCapped = append(orderCapped, victims[i].Host) + } + } + } + + } else if clusterMean < loThreshold { + if len(orderCapped) > 0 { + host := orderCapped[len(orderCapped)-1] + orderCapped = orderCapped[:len(orderCapped)-1] + // cappedVictims would contain the latest cap value for this host. + newCapValue := getNextCapValue(cappedVictims[host]/2.0, 1) + if err := rapl.Cap(host, "rapl", newCapValue); err != nil { + log.Print("Error capping host") + } + // Adding entry for the host to cappedVictims + cappedVictims[host] = newCapValue // Now this host can be capped again. 
+ } + } + } + seconds++ + } + + }(logging, hiThreshold, loThreshold) +} diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go index 714bf68..f692e5f 100644 --- a/pcp/loganddynamiccap.go +++ b/pcp/loganddynamiccap.go @@ -5,7 +5,6 @@ import ( "bufio" "container/ring" "log" - "math" "os" "os/exec" "sort" @@ -15,49 +14,6 @@ import ( "time" ) -var RAPLUnits = math.Pow(2, -32) - -func averageNodePowerHistory(history *ring.Ring) float64 { - - total := 0.0 - count := 0.0 - - history.Do(func(x interface{}) { - if val, ok := x.(float64); ok { //Add it if we can get a float - total += val - count++ - } - }) - - if count == 0.0 { - return 0.0 - } - - count /= 4 // two PKGs, two DRAM for all nodes currently - - return (total / count) -} - -// TODO: Figure a way to merge this and avgpower -func averageClusterPowerHistory(history *ring.Ring) float64 { - - total := 0.0 - count := 0.0 - - history.Do(func(x interface{}) { - if val, ok := x.(float64); ok { //Add it if we can get a float - total += val - count++ - } - }) - - if count == 0.0 { - return 0.0 - } - - return (total / count) -} - func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) diff --git a/pcp/utils.go b/pcp/utils.go new file mode 100644 index 0000000..1e0a260 --- /dev/null +++ b/pcp/utils.go @@ -0,0 +1,49 @@ +package pcp + +import ( + "container/ring" + "math" +) + +var RAPLUnits = math.Pow(2, -32) + +func averageNodePowerHistory(history *ring.Ring) float64 { + + total := 0.0 + count := 0.0 + + history.Do(func(x interface{}) { + if val, ok := x.(float64); ok { //Add it if we can get a float + total += val + count++ + } + }) + + if count == 0.0 { + return 0.0 + } + + count /= 4 // two PKGs, two DRAM for all nodes currently + + return (total / count) +} + +// TODO: Figure a way to merge this and avgpower +func averageClusterPowerHistory(history *ring.Ring) float64 { + + total := 0.0 + count := 0.0 + + history.Do(func(x interface{}) { + if val, ok := x.(float64); ok { //Add it if we can get a float + total += val + count++ + } + }) + + if count == 0.0 { + return 0.0 + } + + return (total / count) +} diff --git a/rapl/cap.go b/rapl/cap.go index b15d352..6b5ec2f 100644 --- a/rapl/cap.go +++ b/rapl/cap.go @@ -6,7 +6,7 @@ import ( "strconv" ) -func Cap(host, username string, percentage int) error { +func Cap(host, username string, percentage float64) error { if percentage > 100 || percentage < 0 { return errors.New("Percentage is out of range") @@ -31,7 +31,7 @@ func Cap(host, username string, percentage int) error { return errors.Wrap(err, "Failed to create session") } - err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.Itoa(percentage)) + err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.FormatFloat(percentage, 'f', 2, 64)) if err != nil { return errors.Wrap(err, "Failed to run RAPL script") } diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index 95225e9..69764a9 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -157,7 +157,7 @@ func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) { var bpPistonCapValues = make(map[string]float64) // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. 
(reduces overhead) -var bpPistonPreviousRoundedCapValues = make(map[string]int) +var bpPistonPreviousRoundedCapValues = make(map[string]float64) func (s *BinPackedPistonCapper) startCapping() { go func() { @@ -167,7 +167,7 @@ func (s *BinPackedPistonCapper) startCapping() { // Need to cap each node bpPistonMutex.Lock() for host, capValue := range bpPistonCapValues { - roundedCapValue := int(math.Floor(capValue + 0.5)) + roundedCapValue := float64(int(math.Floor(capValue + 0.5))) // has the cap value changed if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok { if prevRoundedCap != roundedCapValue { diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index fceec50..2e16201 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -157,7 +157,7 @@ var bpMaxMinPistonCappingMutex sync.Mutex var bpMaxMinPistonCappingCapValues = make(map[string]float64) // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead) -var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]int) +var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64) func (s *BPSWMaxMinPistonCapping) startCapping() { go func() { @@ -167,7 +167,7 @@ func (s *BPSWMaxMinPistonCapping) startCapping() { // Need to cap each node bpMaxMinPistonCappingMutex.Lock() for host, capValue := range bpMaxMinPistonCappingCapValues { - roundedCapValue := int(math.Floor(capValue + 0.5)) + roundedCapValue := float64(int(math.Floor(capValue + 0.5))) // has the cap value changed if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok { if previousRoundedCap != roundedCapValue { diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index 564134a..100154d 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() { if bpMaxMinProacCCCapValue > 0.0 { for _, host := range constants.Hosts { // Rounding cap value to nearest int - if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil { log.Println(err) } } @@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() { if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding the recap value to the nearest int - if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCRecapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil { log.Println(err) } } diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index caddfd3..01c163e 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -166,7 +166,7 @@ func (s *FirstFitProacCC) startCapping() { if fcfsCurrentCapValue > 0.0 { for _, host := range constants.Hosts { // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil { log.Println(err) } } @@ -190,7 +190,7 @@ func (s *FirstFitProacCC) startRecapping() { if s.isRecapping && fcfsRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding curreCapValue to the nearest int. 
- if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil { log.Println(err) } } diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index e00590c..1792cfd 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -179,7 +179,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() { if rankedCurrentCapValue > 0.0 { for _, host := range constants.Hosts { // Rounding currentCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil { log.Println(err) } } @@ -203,7 +203,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() { if s.isRecapping && rankedRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding currentCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil { + if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil { log.Println(err) } } From 726c0555ed35f3b15174b7d6e0169c53fcb717e3 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 20 Feb 2017 20:55:06 -0500 Subject: [PATCH 03/12] Fixed corner cases in progressive extrema -- When a node is capped and the new cap value is above a threshold then that node can be capped or uncapped in the next cycle. If the new cap value is equal to the threshold then the node cannot be capped further and can only be uncapped. When the node is uncapped and the newUncapValue is below 100 then the node can be capped or uncapped in the next cycle. If the newUncapValue is 100 then the node can only be capped. 
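In sketch form, the bookkeeping described above amounts to the following (illustrative only: updateCapState and lowerLimit are placeholder names; the patch itself does this inline against constants.CapThreshold and also maintains the orderCapped slice):

    // cappedVictims holds hosts that may still be capped further;
    // orderCappedVictims holds hosts that are staged for uncapping.
    // newValue is the cap value that was just applied to host via rapl.Cap.
    func updateCapState(host string, newValue, lowerLimit float64,
        cappedVictims, orderCappedVictims map[string]float64) {
        switch {
        case newValue <= lowerLimit:
            // At or below the threshold: this host can only be uncapped now.
            delete(cappedVictims, host)
            orderCappedVictims[host] = newValue
        case newValue >= 100.0:
            // Fully uncapped: this host can only be capped now.
            delete(orderCappedVictims, host)
            cappedVictims[host] = newValue
        default:
            // Between the threshold and 100%: eligible for both
            // capping and uncapping in the next cycle.
            cappedVictims[host] = newValue
            orderCappedVictims[host] = newValue
        }
    }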
--- pcp/logAndProgressiveExtrema.go | 169 ++++++++++++++++++++++++++------ scheduler.go | 5 +- 2 files changed, 143 insertions(+), 31 deletions(-) diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index 7936b53..5eefb8c 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -1,18 +1,19 @@ package pcp import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/rapl" "bufio" "container/ring" "log" + "math" "os" "os/exec" "sort" "strconv" "strings" "syscall" - "math" - "bitbucket.org/sunybingcloud/electron/constants" + "time" ) func round(num float64) int { @@ -20,17 +21,25 @@ func round(num float64) int { } func getNextCapValue(curCapValue float64, precision int) float64 { + curCapValue /= 2.0 output := math.Pow(10, float64(precision)) - return float64(round(curCapValue * output)) / output + return float64(round(curCapValue*output)) / output +} + +func getNextUncapValue(curCapValue float64, precision int) float64 { + curCapValue *= 2.0 + output := math.Pow(10, float64(precision)) + return float64(round(curCapValue*output)) / output } func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { + log.Println("Inside Log and Progressive Extrema") const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} if hiThreshold < loThreshold { - log.Println("High threshold is lower than the low threshold") + log.Println("High threshold is lower than low threshold!") } logFile, err := os.Create("./" + prefix + ".pcplog") @@ -81,10 +90,11 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // Throw away first set of results scanner.Scan() - //cappedHosts := make(map[string]bool) + // cappedHosts := make(map[string]bool) // Keep track of the capped victims and the corresponding cap value. cappedVictims := make(map[string]float64) orderCapped := make([]string, 0, 8) + orderCappedVictims := make(map[string]float64) // Parallel data structure to orderCapped to make it possible to search victims, that are to be uncapped, faster. clusterPowerHist := ring.New(5) seconds := 0 @@ -117,7 +127,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) if clusterMean >= hiThreshold { - log.Printf("Need to cap a node") + log.Println("Need to cap a node") // Create statics for all victims and choose one to cap victims := make([]Victim, 0, 8) @@ -132,40 +142,121 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref sort.Sort(VictimSorter(victims)) // Sort by average wattage - // Finding the best victim to cap. + // Finding the best victim to cap in a round robin manner + newVictimFound := false + alreadyCappedHosts := []string{} // Hosts of already capped hosts in decreasing order of recent power consumption for i := 0; i < len(victims); i++ { - if curCapValue, ok := cappedVictims[victims[i].Host]; ok { - // checking whether we can continue to cap this host. - // If yes, then we cap it to half the current cap value. - // Else, we push it to the orderedCapped and continue. 
- if curCapValue > constants.CapThreshold { - newCapValue := getNextCapValue(curCapValue/2.0, 1) - if err := rapl.Cap(victims[0].Host, "rapl", newCapValue); err != nil { - log.Print("Error capping host") - } - // Updating the curCapValue in cappedVictims - cappedVictims[victims[0].Host] = newCapValue - break + // Try to pick a victim that hasn't been capped yet. + if _, ok := cappedVictims[victims[i].Host]; !ok { + // If this victim is present in orderCapped, then we move on to find another victim that we can cap. + if _, ok := orderCappedVictims[victims[i].Host]; ok { + // Adding the host to the alreadyCappedHosts + alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host) + continue + } + // Need to cap this victim and add it to the cappedVictims + if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil { + log.Printf("Error capping host %s", victims[i].Host) } else { - // deleting entry in cappedVictims - delete(cappedVictims, victims[i].Host) - // Now this host can be uncapped. + log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0) + cappedVictims[victims[i].Host] = 50.0 + newVictimFound = true + // This node can be uncapped and hence adding to orderCapped orderCapped = append(orderCapped, victims[i].Host) + orderCappedVictims[victims[i].Host] = 50.0 + break // breaking only on successful cap + } + } else { + alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host) + } + } + // If no new victim found, then we need to cap the best victim among the already capped victims + if !newVictimFound { + for i := 0; i < len(alreadyCappedHosts); i++ { + // Checking if this node can be uncapped too + if capValue, ok := orderCappedVictims[alreadyCappedHosts[i]]; ok { + // if capValue is greater than the threshold then cap, else continue + if capValue > constants.CapThreshold { + newCapValue := getNextCapValue(capValue, 2) + if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { + log.Printf("Error capping host %s", alreadyCappedHosts[i]) + } else { + // successful cap + log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) + // Checking whether this victim can be capped further + if newCapValue <= constants.CapThreshold { + // deleting victim from cappedVictims + delete(cappedVictims, alreadyCappedHosts[i]) + // updating the cap value in orderCappedVictims + orderCappedVictims[alreadyCappedHosts[i]] = newCapValue + } else { + // updating the cap value + cappedVictims[alreadyCappedHosts[i]] = newCapValue + orderCappedVictims[alreadyCappedHosts[i]] = newCapValue + } + break // exiting only on a successful cap. + } + } else { + // Do nothing + } + } else { + // This host can definitely be capped. + // Cap this host to half it's current cap value and update the new cap value in cappedVictims and orderCappedVictims + // If we have hit the threshold then we add this host to orderCapped to indicate that it needs to be uncapped. 
+ newCapValue := getNextCapValue(cappedVictims[alreadyCappedHosts[i]], 2) + if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { + log.Printf("Error capping host %s", alreadyCappedHosts[i]) + } else { + log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) + // Checking whether this victim can be capped further + if newCapValue <= constants.CapThreshold { + // deleting victim from cappedVictims + delete(cappedVictims, alreadyCappedHosts[i]) + // staging victim for uncapping + orderCapped = append(orderCapped, alreadyCappedHosts[i]) + orderCappedVictims[alreadyCappedHosts[i]] = constants.CapThreshold + } else { + // Updating the cap value of the victim + cappedVictims[alreadyCappedHosts[i]] = newCapValue + } + break // exiting only on a successful uncap + } } } } } else if clusterMean < loThreshold { + log.Println("Need to uncap a node") if len(orderCapped) > 0 { host := orderCapped[len(orderCapped)-1] - orderCapped = orderCapped[:len(orderCapped)-1] - // cappedVictims would contain the latest cap value for this host. - newCapValue := getNextCapValue(cappedVictims[host]/2.0, 1) - if err := rapl.Cap(host, "rapl", newCapValue); err != nil { - log.Print("Error capping host") + // Removing victim from orderCapped only if it has been completely uncapped to 100% + if cappedVictims[host] == 100.0 { + orderCapped = orderCapped[:len(orderCapped)-1] + delete(orderCappedVictims, host) + } else { + newCapValue := getNextUncapValue(cappedVictims[host], 2) + // Uncapping the victim + if err := rapl.Cap(host, "rapl", newCapValue); err != nil { + log.Printf("Error uncapping host %s", host) + } else { + // Successful uncap + log.Printf("Uncapped host[%s] to %f", host, newCapValue) + // If the new cap value is 100, then this node cannot be uncapped + if newCapValue == 100.0 { + orderCapped = orderCapped[:len(orderCapped)-1] + delete(orderCappedVictims, host) + // Updating cappedVictims + cappedVictims[host] = newCapValue + } else if newCapValue > constants.CapThreshold { + // This host can be capped + cappedVictims[host] = newCapValue + // Updating orderCappedVictims + orderCappedVictims[host] = newCapValue + } + } } - // Adding entry for the host to cappedVictims - cappedVictims[host] = newCapValue // Now this host can be capped again. + } else { + // No node has been capped until now. 
} } } @@ -173,4 +264,24 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } }(logging, hiThreshold, loThreshold) + + log.Println("PCP logging started") + + if err := cmd.Start(); err != nil { + log.Fatal(err) + } + + pgid, err := syscall.Getpgid(cmd.Process.Pid) + + select { + case <-quit: + log.Println("Stopping PCP logging in 5 seconds") + time.Sleep(5 * time.Second) + + // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly + // kill process and all children processes + syscall.Kill(-pgid, 15) + return + } + } diff --git a/scheduler.go b/scheduler.go index 9b6dc45..ba13663 100644 --- a/scheduler.go +++ b/scheduler.go @@ -60,7 +60,7 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewBinPackedPistonCapper(tasks, *wattsAsAResource, logPrefix, *classMapWatts) + scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -74,8 +74,9 @@ func main() { return } - go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) + //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) //go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) + go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing // Attempt to handle signint to not leave pmdumptext running From ef6f74cd814304945a504c06aa22fe3670f0cdc2 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 20 Feb 2017 22:42:07 -0500 Subject: [PATCH 04/12] removed TODO for consolidating common scheduler struct members into base.go. --- schedulers/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/schedulers/README.md b/schedulers/README.md index cec4efc..476e114 100644 --- a/schedulers/README.md +++ b/schedulers/README.md @@ -7,7 +7,6 @@ To Do: * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go * **Critical**: Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler. * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that. - * Move all the common struct members from all schedulers into base.go. Scheduling Algorithms: From 7d93215a7c5be0fca2e7899454dff70c58295fda Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 20 Feb 2017 22:55:56 -0500 Subject: [PATCH 05/12] added TODO for a generic task sorter that sorts based on any kind of resource, instead of having a sorter for each kind of resource. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 64d6fcb..2e494da 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ To Do: * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file. * Make def.Task an interface for further modularization and flexibility. * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). + * Have a generic sorter for task resources instead of having one for each kind of resource. 
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is launched for logging to work and PCP collector agents installed From 531c1a120b674a21ad88c58e3d5a423cceb34019 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 20 Feb 2017 23:49:27 -0500 Subject: [PATCH 06/12] Fixed messages for commandline arguments. --- scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler.go b/scheduler.go index ba13663..b21b567 100644 --- a/scheduler.go +++ b/scheduler.go @@ -27,11 +27,11 @@ var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts t func init() { flag.StringVar(master, "m", "xavier:5050", "Location of leading Mesos master (shorthand)") flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") - flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource") + flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource (shorthand)") flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)") flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)") flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)") - flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node") + flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node (shorthand)") } func main() { From 373ba63933d03be709c02ede3d4cb2cbab2184ba Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 21 Feb 2017 21:05:47 -0500 Subject: [PATCH 07/12] fixed bugs. Made sure that we don't cap a victim below the threshold. Made sure the victims the can be capped and uncapped are maintained in both cappedVictims and orderCappedVictims. --- pcp/logAndProgressiveExtrema.go | 107 ++++++++++++-------------------- 1 file changed, 38 insertions(+), 69 deletions(-) diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index 5eefb8c..b844fe4 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -26,12 +26,6 @@ func getNextCapValue(curCapValue float64, precision int) float64 { return float64(round(curCapValue*output)) / output } -func getNextUncapValue(curCapValue float64, precision int) float64 { - curCapValue *= 2.0 - output := math.Pow(10, float64(precision)) - return float64(round(curCapValue*output)) / output -} - func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { log.Println("Inside Log and Progressive Extrema") const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" @@ -128,6 +122,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref if clusterMean >= hiThreshold { log.Println("Need to cap a node") + log.Println(cappedVictims) + log.Println(orderCappedVictims) // Create statics for all victims and choose one to cap victims := make([]Victim, 0, 8) @@ -148,10 +144,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref for i := 0; i < len(victims); i++ { // Try to pick a victim that hasn't been capped yet. if _, ok := cappedVictims[victims[i].Host]; !ok { - // If this victim is present in orderCapped, then we move on to find another victim that we can cap. + // If this victim can't be capped further, then we move on to find another victim that we can cap. 
if _, ok := orderCappedVictims[victims[i].Host]; ok { - // Adding the host to the alreadyCappedHosts - alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host) continue } // Need to cap this victim and add it to the cappedVictims @@ -173,90 +167,65 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // If no new victim found, then we need to cap the best victim among the already capped victims if !newVictimFound { for i := 0; i < len(alreadyCappedHosts); i++ { - // Checking if this node can be uncapped too - if capValue, ok := orderCappedVictims[alreadyCappedHosts[i]]; ok { - // if capValue is greater than the threshold then cap, else continue - if capValue > constants.CapThreshold { - newCapValue := getNextCapValue(capValue, 2) - if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { - log.Printf("Error capping host %s", alreadyCappedHosts[i]) - } else { - // successful cap - log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) - // Checking whether this victim can be capped further - if newCapValue <= constants.CapThreshold { - // deleting victim from cappedVictims - delete(cappedVictims, alreadyCappedHosts[i]) - // updating the cap value in orderCappedVictims - orderCappedVictims[alreadyCappedHosts[i]] = newCapValue - } else { - // updating the cap value - cappedVictims[alreadyCappedHosts[i]] = newCapValue - orderCappedVictims[alreadyCappedHosts[i]] = newCapValue - } - break // exiting only on a successful cap. - } - } else { - // Do nothing - } - } else { - // This host can definitely be capped. - // Cap this host to half it's current cap value and update the new cap value in cappedVictims and orderCappedVictims - // If we have hit the threshold then we add this host to orderCapped to indicate that it needs to be uncapped. - newCapValue := getNextCapValue(cappedVictims[alreadyCappedHosts[i]], 2) + // If already capped then the host must be present in orderCappedVictims + capValue := orderCappedVictims[alreadyCappedHosts[i]] + // if capValue is greater than the threshold then cap, else continue + if capValue > constants.CapThreshold { + newCapValue := getNextCapValue(capValue, 2) + log.Printf("CurrentCapValue for host[%s] is %f", alreadyCappedHosts[i], capValue) + log.Printf("NewCapValue for host[%s] is %f", alreadyCappedHosts[i], newCapValue) if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { log.Printf("Error capping host %s", alreadyCappedHosts[i]) } else { + // successful cap log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) // Checking whether this victim can be capped further if newCapValue <= constants.CapThreshold { // deleting victim from cappedVictims delete(cappedVictims, alreadyCappedHosts[i]) - // staging victim for uncapping - orderCapped = append(orderCapped, alreadyCappedHosts[i]) - orderCappedVictims[alreadyCappedHosts[i]] = constants.CapThreshold + // updating the cap value in orderCappedVictims + orderCappedVictims[alreadyCappedHosts[i]] = newCapValue } else { - // Updating the cap value of the victim + // updating the cap value cappedVictims[alreadyCappedHosts[i]] = newCapValue + orderCappedVictims[alreadyCappedHosts[i]] = newCapValue } - break // exiting only on a successful uncap + break // exiting only on a successful cap. } + } else { + // Do nothing + // Continue to find another victim to cap. + // If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at that point. 
} } } } else if clusterMean < loThreshold { log.Println("Need to uncap a node") + log.Println(cappedVictims) + log.Println(orderCappedVictims) if len(orderCapped) > 0 { host := orderCapped[len(orderCapped)-1] - // Removing victim from orderCapped only if it has been completely uncapped to 100% - if cappedVictims[host] == 100.0 { - orderCapped = orderCapped[:len(orderCapped)-1] - delete(orderCappedVictims, host) + // Uncapping the host and removing it from orderCapped if we cannot uncap it further + newUncapValue := orderCappedVictims[host] * 2.0 + if err := rapl.Cap(host, "rapl", newUncapValue); err != nil { + log.Printf("Error uncapping host %s", host) } else { - newCapValue := getNextUncapValue(cappedVictims[host], 2) - // Uncapping the victim - if err := rapl.Cap(host, "rapl", newCapValue); err != nil { - log.Printf("Error uncapping host %s", host) - } else { - // Successful uncap - log.Printf("Uncapped host[%s] to %f", host, newCapValue) - // If the new cap value is 100, then this node cannot be uncapped - if newCapValue == 100.0 { - orderCapped = orderCapped[:len(orderCapped)-1] - delete(orderCappedVictims, host) - // Updating cappedVictims - cappedVictims[host] = newCapValue - } else if newCapValue > constants.CapThreshold { - // This host can be capped - cappedVictims[host] = newCapValue - // Updating orderCappedVictims - orderCappedVictims[host] = newCapValue - } + // Successful uncap + log.Printf("Uncapped host[%s] to %f", host, newUncapValue) + if newUncapValue >= 100.0 { // can compare using == + orderCapped = orderCapped[:len(orderCapped)-1] + delete(orderCappedVictims, host) + // removing entry from cappedVictims as this host is no longer capped + delete(cappedVictims, host) + } else if newUncapValue > constants.CapThreshold { // this check is unnecessary and can be converted to 'else' + // Updating the cap value in orderCappedVictims and cappedVictims + orderCappedVictims[host] = newUncapValue + cappedVictims[host] = newUncapValue } } } else { - // No node has been capped until now. + log.Println("No host staged for Uncapping") } } } From de4e4c01415692ecd51c66f32486aa57e3140b74 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 21 Feb 2017 22:27:08 -0500 Subject: [PATCH 08/12] removed unnecessary function. --- utilities/utils.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/utilities/utils.go b/utilities/utils.go index 18b2400..1d0dfd1 100644 --- a/utilities/utils.go +++ b/utilities/utils.go @@ -32,16 +32,3 @@ func (plist PairList) Len() int { func (plist PairList) Less(i, j int) bool { return plist[i].Value < plist[j].Value } - -// convert a PairList to a map[string]float64 -func OrderedKeys(plist PairList) ([]string, error) { - // Validation - if plist == nil { - return nil, errors.New("Invalid argument: plist") - } - orderedKeys := make([]string, len(plist)) - for _, pair := range plist { - orderedKeys = append(orderedKeys, pair.Key) - } - return orderedKeys, nil -} From 64de61ac4e7397a867633235b3a5c811efb68a8f Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Wed, 22 Feb 2017 20:07:48 -0500 Subject: [PATCH 09/12] added function to initial list of Pairs from a map[string]float64. Removed unnecessary import. 
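A minimal usage sketch of the new helper (host names and cap values are placeholders; PairList already implements sort.Interface ordered by Value, which is what the following patch relies on to find the most deeply capped host):

    package main

    import (
        "fmt"
        "sort"

        "bitbucket.org/sunybingcloud/electron/utilities"
    )

    func main() {
        // Placeholder cap states keyed by host name.
        capStates := map[string]float64{"hostA": 25.0, "hostB": 50.0, "hostC": 100.0}
        pl := utilities.GetPairList(capStates)
        sort.Sort(pl)          // non-decreasing order of cap value
        fmt.Println(pl[0].Key) // hostA -- the host that is capped the most
    }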
--- utilities/utils.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/utilities/utils.go b/utilities/utils.go index 1d0dfd1..a8ca307 100644 --- a/utilities/utils.go +++ b/utilities/utils.go @@ -1,9 +1,5 @@ package utilities -import ( - "errors" -) - /* The Pair and PairList have been taken from google groups forum, https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw @@ -18,6 +14,15 @@ type Pair struct { // A slice of pairs that implements the sort.Interface to sort by value. type PairList []Pair +// Convert map[string]float64 to PairList +func GetPairList(m map[string]float64) PairList { + pl := PairList{} + for k, v := range m { + pl = append(pl, Pair{Key: k, Value: v}) + } + return pl +} + // Swap pairs in the PairList func (plist PairList) Swap(i, j int) { plist[i], plist[j] = plist[j], plist[i] From 2939943afde324c061a8010e6fba790eb1b70690 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Wed, 22 Feb 2017 20:09:04 -0500 Subject: [PATCH 10/12] Made sure that the victim that is capped the most is the one picked to be uncapped. This is to reduce the chances of benchmark starvation. Refined comments and logging. --- pcp/logAndProgressiveExtrema.go | 85 +++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index b844fe4..8242ae9 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -14,6 +14,7 @@ import ( "strings" "syscall" "time" + "bitbucket.org/sunybingcloud/electron/utilities" ) func round(num float64) int { @@ -84,11 +85,13 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // Throw away first set of results scanner.Scan() - // cappedHosts := make(map[string]bool) - // Keep track of the capped victims and the corresponding cap value. + // To keep track of the capped states of the capped victims cappedVictims := make(map[string]float64) + // TODO: Come with a better name for this. orderCapped := make([]string, 0, 8) - orderCappedVictims := make(map[string]float64) // Parallel data structure to orderCapped to make it possible to search victims, that are to be uncapped, faster. + // TODO: Change this to a priority queue ordered by the cap value. This will get rid of the sorting performed in the code. + // Parallel data structure to orderCapped to keep track of the uncapped states of the uncapped victims + orderCappedVictims := make(map[string]float64) clusterPowerHist := ring.New(5) seconds := 0 @@ -122,8 +125,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref if clusterMean >= hiThreshold { log.Println("Need to cap a node") - log.Println(cappedVictims) - log.Println(orderCappedVictims) + log.Printf("Cap values of capped victims: %v", cappedVictims) + log.Printf("Cap values of victims to uncap: %v", orderCappedVictims) // Create statics for all victims and choose one to cap victims := make([]Victim, 0, 8) @@ -140,88 +143,98 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // Finding the best victim to cap in a round robin manner newVictimFound := false - alreadyCappedHosts := []string{} // Hosts of already capped hosts in decreasing order of recent power consumption + alreadyCappedHosts := []string{} // Host-names of victims that are already capped for i := 0; i < len(victims); i++ { - // Try to pick a victim that hasn't been capped yet. 
+ // Try to pick a victim that hasn't been capped yet if _, ok := cappedVictims[victims[i].Host]; !ok { - // If this victim can't be capped further, then we move on to find another victim that we can cap. + // If this victim can't be capped further, then we move on to find another victim if _, ok := orderCappedVictims[victims[i].Host]; ok { continue } - // Need to cap this victim and add it to the cappedVictims + // Need to cap this victim if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil { log.Printf("Error capping host %s", victims[i].Host) } else { log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0) + // Keeping track of this victim and it's cap value cappedVictims[victims[i].Host] = 50.0 newVictimFound = true // This node can be uncapped and hence adding to orderCapped orderCapped = append(orderCapped, victims[i].Host) orderCappedVictims[victims[i].Host] = 50.0 - break // breaking only on successful cap + break // Breaking only on successful cap } } else { alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host) } } - // If no new victim found, then we need to cap the best victim among the already capped victims + // If no new victim found, then we need to cap the best victim among the ones that are already capped if !newVictimFound { for i := 0; i < len(alreadyCappedHosts); i++ { // If already capped then the host must be present in orderCappedVictims capValue := orderCappedVictims[alreadyCappedHosts[i]] - // if capValue is greater than the threshold then cap, else continue + // If capValue is greater than the threshold then cap, else continue if capValue > constants.CapThreshold { newCapValue := getNextCapValue(capValue, 2) - log.Printf("CurrentCapValue for host[%s] is %f", alreadyCappedHosts[i], capValue) - log.Printf("NewCapValue for host[%s] is %f", alreadyCappedHosts[i], newCapValue) if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { - log.Printf("Error capping host %s", alreadyCappedHosts[i]) + log.Printf("Error capping host[%s]", alreadyCappedHosts[i]) } else { - // successful cap + // Successful cap log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) // Checking whether this victim can be capped further if newCapValue <= constants.CapThreshold { - // deleting victim from cappedVictims + // Deleting victim from cappedVictims delete(cappedVictims, alreadyCappedHosts[i]) - // updating the cap value in orderCappedVictims + // Updating the cap value in orderCappedVictims orderCappedVictims[alreadyCappedHosts[i]] = newCapValue } else { - // updating the cap value + // Updating the cap value cappedVictims[alreadyCappedHosts[i]] = newCapValue orderCappedVictims[alreadyCappedHosts[i]] = newCapValue } - break // exiting only on a successful cap. + break // Breaking only on successful cap. } } else { // Do nothing // Continue to find another victim to cap. - // If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at that point. + // If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point. 
} } } } else if clusterMean < loThreshold { log.Println("Need to uncap a node") - log.Println(cappedVictims) - log.Println(orderCappedVictims) + log.Printf("Cap values of capped victims: %v", cappedVictims) + log.Printf("Cap values of victims to uncap: %v", orderCappedVictims) if len(orderCapped) > 0 { - host := orderCapped[len(orderCapped)-1] - // Uncapping the host and removing it from orderCapped if we cannot uncap it further - newUncapValue := orderCappedVictims[host] * 2.0 - if err := rapl.Cap(host, "rapl", newUncapValue); err != nil { - log.Printf("Error uncapping host %s", host) + // We pick the host that is capped the most to uncap + orderCappedToSort := utilities.GetPairList(orderCappedVictims) + sort.Sort(orderCappedToSort) // Sorted hosts in non-decreasing order of capped states + hostToUncap := orderCappedToSort[0].Key + // Uncapping the host + newUncapValue := orderCappedVictims[hostToUncap] * 2.0 + if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil { + log.Printf("Error uncapping host[%s]", hostToUncap) } else { // Successful uncap - log.Printf("Uncapped host[%s] to %f", host, newUncapValue) + log.Printf("Uncapped host[%s] to %f", hostToUncap, newUncapValue) + // Can we uncap this host further. If not, then we remove its entry from orderCapped if newUncapValue >= 100.0 { // can compare using == - orderCapped = orderCapped[:len(orderCapped)-1] - delete(orderCappedVictims, host) - // removing entry from cappedVictims as this host is no longer capped - delete(cappedVictims, host) + // Deleting entry from orderCapped + for i, victimHost := range orderCapped { + if victimHost == hostToUncap { + orderCapped = append(orderCapped[:i], orderCapped[i+1:]...) + break // We are done removing host from orderCapped + } + } + // Removing entry for host from the parallel data structure + delete(orderCappedVictims, hostToUncap) + // Removing entry from cappedVictims as this host is no longer capped + delete(cappedVictims, hostToUncap) } else if newUncapValue > constants.CapThreshold { // this check is unnecessary and can be converted to 'else' - // Updating the cap value in orderCappedVictims and cappedVictims - orderCappedVictims[host] = newUncapValue - cappedVictims[host] = newUncapValue + // Updating the cap value + orderCappedVictims[hostToUncap] = newUncapValue + cappedVictims[hostToUncap] = newUncapValue } } } else { From 87bd8d7cf0bc53cb9fe792a4ea6f7d5e6a85c7d2 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Thu, 9 Mar 2017 19:17:06 -0500 Subject: [PATCH 11/12] Added logging when there is no victim left to cap. 
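The check reduces to a found-flag around the loop over the already-capped hosts; a self-contained toy version of the behaviour being logged (placeholder host names and values, lowerCapLimit standing in for constants.CapThreshold):

    package main

    import "log"

    func main() {
        const lowerCapLimit = 12.5
        // Every already-capped host is assumed to be at the lower limit here,
        // so no further cap is possible and the new log line fires.
        capValues := map[string]float64{"hostA": 12.5, "hostB": 12.5}
        canCapAlreadyCappedVictim := false
        for host, capValue := range capValues {
            if capValue > lowerCapLimit {
                log.Printf("would halve the cap on host[%s]", host)
                canCapAlreadyCappedVictim = true
                break
            }
        }
        if !canCapAlreadyCappedVictim {
            log.Println("No Victim left to cap.")
        }
    }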
--- pcp/logAndProgressiveExtrema.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index 8242ae9..b284d3a 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -170,6 +170,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } // If no new victim found, then we need to cap the best victim among the ones that are already capped if !newVictimFound { + canCapAlreadyCappedVictim := false for i := 0; i < len(alreadyCappedHosts); i++ { // If already capped then the host must be present in orderCappedVictims capValue := orderCappedVictims[alreadyCappedHosts[i]] @@ -192,6 +193,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref cappedVictims[alreadyCappedHosts[i]] = newCapValue orderCappedVictims[alreadyCappedHosts[i]] = newCapValue } + canCapAlreadyCappedVictim = true break // Breaking only on successful cap. } } else { @@ -200,6 +202,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point. } } + if (!canCapAlreadyCappedVictim) { + log.Println("No Victim left to cap.") + } } } else if clusterMean < loThreshold { From 41206dd82e140208fbdc5474d3b416dc9142bd73 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Thu, 9 Mar 2017 19:20:13 -0500 Subject: [PATCH 12/12] refactored name of CapThreshold to LowerCapLimit. Added comment to mention that floating point operations can lead to precision loss. --- constants/constants.go | 2 +- pcp/logAndProgressiveExtrema.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/constants/constants.go b/constants/constants.go index 782f33d..d8dcd9f 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -45,4 +45,4 @@ var Tolerance = 0.70 var ConsiderationWindowSize = 20 // Threshold below which a host should be capped -var CapThreshold = 12.5 +var LowerCapLimit = 12.5 diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go index b284d3a..f708bef 100644 --- a/pcp/logAndProgressiveExtrema.go +++ b/pcp/logAndProgressiveExtrema.go @@ -175,7 +175,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // If already capped then the host must be present in orderCappedVictims capValue := orderCappedVictims[alreadyCappedHosts[i]] // If capValue is greater than the threshold then cap, else continue - if capValue > constants.CapThreshold { + if capValue > constants.LowerCapLimit { newCapValue := getNextCapValue(capValue, 2) if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { log.Printf("Error capping host[%s]", alreadyCappedHosts[i]) @@ -183,7 +183,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // Successful cap log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) // Checking whether this victim can be capped further - if newCapValue <= constants.CapThreshold { + if newCapValue <= constants.LowerCapLimit { // Deleting victim from cappedVictims delete(cappedVictims, alreadyCappedHosts[i]) // Updating the cap value in orderCappedVictims @@ -216,7 +216,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref orderCappedToSort := utilities.GetPairList(orderCappedVictims) sort.Sort(orderCappedToSort) // Sorted hosts in non-decreasing order of capped states hostToUncap := 
orderCappedToSort[0].Key - // Uncapping the host + // Uncapping the host. + // This is a floating point operation and might suffer from precision loss. newUncapValue := orderCappedVictims[hostToUncap] * 2.0 if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil { log.Printf("Error uncapping host[%s]", hostToUncap) @@ -236,7 +237,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref delete(orderCappedVictims, hostToUncap) // Removing entry from cappedVictims as this host is no longer capped delete(cappedVictims, hostToUncap) - } else if newUncapValue > constants.CapThreshold { // this check is unnecessary and can be converted to 'else' + } else if newUncapValue > constants.LowerCapLimit { // this check is unnecessary and can be converted to 'else' // Updating the cap value orderCappedVictims[hostToUncap] = newUncapValue cappedVictims[hostToUncap] = newUncapValue