diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go
new file mode 100644
index 0000000..7936b53
--- /dev/null
+++ b/pcp/logAndProgressiveExtrema.go
@@ -0,0 +1,176 @@
+package pcp
+
+import (
+    "bitbucket.org/sunybingcloud/electron/constants"
+    "bitbucket.org/sunybingcloud/electron/rapl"
+    "bufio"
+    "container/ring"
+    "log"
+    "math"
+    "os"
+    "os/exec"
+    "sort"
+    "strconv"
+    "strings"
+    "syscall"
+)
+
+// round returns num rounded to the nearest integer, with halves rounded away from zero.
+func round(num float64) int {
+    return int(math.Floor(num + math.Copysign(0.5, num)))
+}
+
+// getNextCapValue rounds curCapValue to the given number of decimal places.
+func getNextCapValue(curCapValue float64, precision int) float64 {
+    output := math.Pow(10, float64(precision))
+    return float64(round(curCapValue*output)) / output
+}
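+
+// Illustrative values, assuming the semantics documented above:
+//   round(2.5) == 3, round(-2.5) == -3 (halves round away from zero)
+//   getNextCapValue(46.875, 1) == 46.9
+//   getNextCapValue(100.0/2.0, 1) == 50.0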
+
+func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
+    const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
+    cmd := exec.Command("sh", "-c", pcpCommand)
+    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+    if hiThreshold < loThreshold {
+        log.Println("High threshold is lower than the low threshold")
+    }
+
+    logFile, err := os.Create("./" + prefix + ".pcplog")
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer logFile.Close()
+
+    pipe, err := cmd.StdoutPipe()
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    scanner := bufio.NewScanner(pipe)
+
+    go func(logging *bool, hiThreshold, loThreshold float64) {
+        // Get the names of the columns.
+        scanner.Scan()
+
+        // Write the header line to the log file.
+        logFile.WriteString(scanner.Text() + "\n")
+
+        headers := strings.Split(scanner.Text(), ",")
+
+        powerIndexes := make([]int, 0)
+        powerHistories := make(map[string]*ring.Ring)
+        indexToHost := make(map[int]string)
+
+        for i, hostMetric := range headers {
+            metricSplit := strings.Split(hostMetric, ":")
+
+            if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") ||
+                strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") {
+                powerIndexes = append(powerIndexes, i)
+                indexToHost[i] = metricSplit[0]
+
+                // Only create one ring per host.
+                if _, ok := powerHistories[metricSplit[0]]; !ok {
+                    // Two PKGs and two DRAMs per node, so 20 entries = 5 seconds of tracking.
+                    powerHistories[metricSplit[0]] = ring.New(20)
+                }
+            }
+        }
+
+        // Throw away the first set of results.
+        scanner.Scan()
+
+        // Keep track of the capped victims and their corresponding cap values.
+        cappedVictims := make(map[string]float64)
+        // LIFO queue of fully capped hosts, plus their last cap values, for later uncapping.
+        orderCapped := make([]string, 0, 8)
+        orderCappedVictims := make(map[string]float64)
+        clusterPowerHist := ring.New(5)
+        seconds := 0
+
+        for scanner.Scan() {
+            if *logging {
+                log.Println("Logging PCP...")
+                split := strings.Split(scanner.Text(), ",")
+                logFile.WriteString(scanner.Text() + "\n")
+
+                totalPower := 0.0
+                for _, powerIndex := range powerIndexes {
+                    power, _ := strconv.ParseFloat(split[powerIndex], 64)
+
+                    host := indexToHost[powerIndex]
+
+                    powerHistories[host].Value = power
+                    powerHistories[host] = powerHistories[host].Next()
+
+                    log.Printf("Host: %s, Power: %f", host, power*RAPLUnits)
+
+                    totalPower += power
+                }
+                clusterPower := totalPower * RAPLUnits
+
+                clusterPowerHist.Value = clusterPower
+                clusterPowerHist = clusterPowerHist.Next()
+
+                clusterMean := averageClusterPowerHistory(clusterPowerHist)
+
+                log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
+
+                if clusterMean >= hiThreshold {
+                    log.Printf("Need to cap a node")
+                    // Compute the mean power of every host and sort to find the best victim to cap.
+                    victims := make([]Victim, 0, 8)
+
+                    // TODO: Track only the largest mean to reduce this from O(n log n) to O(n).
+                    for name, history := range powerHistories {
+                        histMean := averageNodePowerHistory(history)
+                        // Consider computing these means in goroutines if this needs to be sped up.
+                        victims = append(victims, Victim{Watts: histMean, Host: name})
+                    }
+
+                    sort.Sort(VictimSorter(victims)) // Sort by average wattage.
+
+                    // Find the best victim to cap.
+                    for i := 0; i < len(victims); i++ {
+                        host := victims[i].Host
+                        if curCapValue, ok := cappedVictims[host]; ok {
+                            // Check whether this host can be capped further.
+                            // If yes, cap it to half its current cap value.
+                            // Else, move it to orderCapped so that it can be uncapped later.
+                            if curCapValue > constants.CapThreshold {
+                                newCapValue := getNextCapValue(curCapValue/2.0, 1)
+                                if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
+                                    log.Printf("Error capping host %s: %v", host, err)
+                                }
+                                // Update the current cap value in cappedVictims.
+                                cappedVictims[host] = newCapValue
+                                break
+                            }
+                            // Capped as far as allowed; queue the host for uncapping,
+                            // remembering its last cap value.
+                            delete(cappedVictims, host)
+                            orderCappedVictims[host] = curCapValue
+                            orderCapped = append(orderCapped, host)
+                        } else if _, fullyCapped := orderCappedVictims[host]; !fullyCapped {
+                            // This host has never been capped; start by capping it to 50%.
+                            newCapValue := getNextCapValue(100.0/2.0, 1)
+                            if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
+                                log.Printf("Error capping host %s: %v", host, err)
+                            }
+                            cappedVictims[host] = newCapValue
+                            break
+                        }
+                    }
+
+                } else if clusterMean < loThreshold {
+                    if len(orderCapped) > 0 {
+                        host := orderCapped[len(orderCapped)-1]
+                        orderCapped = orderCapped[:len(orderCapped)-1]
+                        // orderCappedVictims holds the latest cap value for this host.
+                        // Raise (rather than lower) the cap to relax it.
+                        newUncapValue := getNextCapValue(orderCappedVictims[host]*2.0, 1)
+                        if err := rapl.Cap(host, "rapl", newUncapValue); err != nil {
+                            log.Printf("Error uncapping host %s: %v", host, err)
+                        }
+                        delete(orderCappedVictims, host)
+                        // Now this host can be capped again.
+                        cappedVictims[host] = newUncapValue
+                    }
+                }
+            }
+            seconds++
+        }
+    }(logging, hiThreshold, loThreshold)
+}
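Taken together, the new loop halves the cap of the highest-power victim while the cluster's mean power stays above hiThreshold, and relaxes the most recently exhausted victim once the mean falls below loThreshold. The following minimal, self-contained sketch shows the cap trajectory for a single repeatedly chosen victim; capThreshold is a hypothetical stand-in for constants.CapThreshold, and the helpers are local copies so it runs standalone:

```go
package main

import (
	"fmt"
	"math"
)

// Local copies of the helpers from logAndProgressiveExtrema.go.
func round(num float64) int {
	return int(math.Floor(num + math.Copysign(0.5, num)))
}

func getNextCapValue(curCapValue float64, precision int) float64 {
	output := math.Pow(10, float64(precision))
	return float64(round(curCapValue*output)) / output
}

func main() {
	const capThreshold = 2.0 // hypothetical stand-in for constants.CapThreshold
	capValue := 100.0
	for capValue > capThreshold {
		capValue = getNextCapValue(capValue/2.0, 1)
		fmt.Printf("%.1f ", capValue)
	}
	fmt.Println()
	// Output: 50.0 25.0 12.5 6.3 3.2 1.6
	// Once at 1.6 (<= capThreshold) the host is queued in orderCapped;
	// uncapping then doubles the value back up: 3.2, 6.4, ...
}
```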
diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go
index 714bf68..f692e5f 100644
--- a/pcp/loganddynamiccap.go
+++ b/pcp/loganddynamiccap.go
@@ -5,7 +5,6 @@ import (
     "bufio"
     "container/ring"
     "log"
-    "math"
     "os"
     "os/exec"
     "sort"
@@ -15,49 +14,6 @@ import (
     "time"
 )
 
-var RAPLUnits = math.Pow(2, -32)
-
-func averageNodePowerHistory(history *ring.Ring) float64 {
-
-    total := 0.0
-    count := 0.0
-
-    history.Do(func(x interface{}) {
-        if val, ok := x.(float64); ok { //Add it if we can get a float
-            total += val
-            count++
-        }
-    })
-
-    if count == 0.0 {
-        return 0.0
-    }
-
-    count /= 4 // two PKGs, two DRAM for all nodes currently
-
-    return (total / count)
-}
-
-// TODO: Figure a way to merge this and avgpower
-func averageClusterPowerHistory(history *ring.Ring) float64 {
-
-    total := 0.0
-    count := 0.0
-
-    history.Do(func(x interface{}) {
-        if val, ok := x.(float64); ok { //Add it if we can get a float
-            total += val
-            count++
-        }
-    })
-
-    if count == 0.0 {
-        return 0.0
-    }
-
-    return (total / count)
-}
-
 func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
     const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
     cmd := exec.Command("sh", "-c", pcpCommand)
diff --git a/pcp/utils.go b/pcp/utils.go
new file mode 100644
index 0000000..1e0a260
--- /dev/null
+++ b/pcp/utils.go
@@ -0,0 +1,49 @@
+package pcp
+
+import (
+    "container/ring"
+    "math"
+)
+
+// RAPLUnits scales raw RAPL readings, which are reported in units of 2^-32.
+var RAPLUnits = math.Pow(2, -32)
+
+// averageNodePowerHistory averages a node's power history, normalizing by
+// the four RAPL domains (two PKGs, two DRAMs) sampled per node.
+func averageNodePowerHistory(history *ring.Ring) float64 {
+    total := 0.0
+    count := 0.0
+
+    history.Do(func(x interface{}) {
+        if val, ok := x.(float64); ok { // Add it only if it is a float.
+            total += val
+            count++
+        }
+    })
+
+    if count == 0.0 {
+        return 0.0
+    }
+
+    count /= 4 // Two PKGs and two DRAMs for all nodes currently.
+
+    return total / count
+}
+
+// TODO: Figure out a way to merge this with averageNodePowerHistory.
+func averageClusterPowerHistory(history *ring.Ring) float64 {
+    total := 0.0
+    count := 0.0
+
+    history.Do(func(x interface{}) {
+        if val, ok := x.(float64); ok { // Add it only if it is a float.
+            total += val
+            count++
+        }
+    })
+
+    if count == 0.0 {
+        return 0.0
+    }
+
+    return total / count
+}
diff --git a/rapl/cap.go b/rapl/cap.go
index b15d352..6b5ec2f 100644
--- a/rapl/cap.go
+++ b/rapl/cap.go
@@ -6,7 +6,7 @@ import (
     "strconv"
 )
 
-func Cap(host, username string, percentage int) error {
+func Cap(host, username string, percentage float64) error {
 
     if percentage > 100 || percentage < 0 {
         return errors.New("Percentage is out of range")
@@ -31,7 +31,7 @@ func Cap(host, username string, percentage int) error {
         return errors.Wrap(err, "Failed to create session")
     }
 
-    err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.Itoa(percentage))
+    err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.FormatFloat(percentage, 'f', 2, 64))
     if err != nil {
         return errors.Wrap(err, "Failed to run RAPL script")
     }
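With rapl.Cap now taking a float64, the throttle script receives fractional percentages. A quick standalone check of what the new strconv.FormatFloat call produces (assuming RAPL_PKG_Throttle.py parses its argument as a float):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Same formatting as the updated session.Run argument in rapl.Cap:
	// fixed-point notation, two digits after the decimal point.
	fmt.Println(strconv.FormatFloat(50.0, 'f', 2, 64)) // "50.00"
	fmt.Println(strconv.FormatFloat(6.3, 'f', 2, 64))  // "6.30"
	fmt.Println(strconv.FormatFloat(3.15, 'f', 2, 64)) // "3.15"
}
```

Since getNextCapValue keeps one decimal place, two digits of output precision always round-trip the cap value exactly.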
diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go
index 95225e9..69764a9 100644
--- a/schedulers/binpackedpistoncapping.go
+++ b/schedulers/binpackedpistoncapping.go
@@ -157,7 +157,7 @@ func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) {
 var bpPistonCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value (reduces overhead).
-var bpPistonPreviousRoundedCapValues = make(map[string]int)
+var bpPistonPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BinPackedPistonCapper) startCapping() {
     go func() {
@@ -167,7 +167,7 @@ func (s *BinPackedPistonCapper) startCapping() {
             // Need to cap each node.
             bpPistonMutex.Lock()
             for host, capValue := range bpPistonCapValues {
-                roundedCapValue := int(math.Floor(capValue + 0.5))
+                roundedCapValue := math.Floor(capValue + 0.5)
                 // Has the cap value changed?
                 if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok {
                     if prevRoundedCap != roundedCapValue {
diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go
index fceec50..2e16201 100644
--- a/schedulers/bpswMaxMinPistonCapping.go
+++ b/schedulers/bpswMaxMinPistonCapping.go
@@ -157,7 +157,7 @@ var bpMaxMinPistonCappingMutex sync.Mutex
 var bpMaxMinPistonCappingCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value (reduces overhead).
-var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]int)
+var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BPSWMaxMinPistonCapping) startCapping() {
     go func() {
@@ -167,7 +167,7 @@ func (s *BPSWMaxMinPistonCapping) startCapping() {
             // Need to cap each node.
             bpMaxMinPistonCappingMutex.Lock()
             for host, capValue := range bpMaxMinPistonCappingCapValues {
-                roundedCapValue := int(math.Floor(capValue + 0.5))
+                roundedCapValue := math.Floor(capValue + 0.5)
                 // Has the cap value changed?
                 if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok {
                     if previousRoundedCap != roundedCapValue {
diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go
index 564134a..100154d 100644
--- a/schedulers/bpswMaxMinProacCC.go
+++ b/schedulers/bpswMaxMinProacCC.go
@@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() {
             if bpMaxMinProacCCCapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round the cap value to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(bpMaxMinProacCCCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() {
             if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round the recap value to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCRecapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(bpMaxMinProacCCRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
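The previous-cap caches now hold float64 values, but the comparison stays exact: math.Floor returns a float64 that is exactly a whole number, so != never suffers from floating-point noise here (note math.Floor already returns float64, which is why the float64(int(...)) wrappers could be dropped). A minimal sketch of the skip-redundant-caps pattern; the host name and values are illustrative:

```go
package main

import (
	"fmt"
	"math"
)

// capIfChanged mimics the schedulers' cache: only "cap" when the rounded
// value differs from the last one sent for that host.
func capIfChanged(prev map[string]float64, host string, capValue float64) {
	rounded := math.Floor(capValue + 0.5) // whole-number float64, exact to compare
	if last, ok := prev[host]; ok && last == rounded {
		return // unchanged; skip the expensive RAPL call
	}
	prev[host] = rounded
	fmt.Printf("capping %s to %.0f%%\n", host, rounded)
}

func main() {
	prev := map[string]float64{}
	capIfChanged(prev, "stratos-001", 75.4) // capping stratos-001 to 75%
	capIfChanged(prev, "stratos-001", 74.6) // skipped: also rounds to 75
	capIfChanged(prev, "stratos-001", 80.2) // capping stratos-001 to 80%
}
```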
diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go
index caddfd3..01c163e 100644
--- a/schedulers/firstfitProacCC.go
+++ b/schedulers/firstfitProacCC.go
@@ -166,7 +166,7 @@ func (s *FirstFitProacCC) startCapping() {
             if fcfsCurrentCapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round fcfsCurrentCapValue to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(fcfsCurrentCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -190,7 +190,7 @@ func (s *FirstFitProacCC) startRecapping() {
             if s.isRecapping && fcfsRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round fcfsRecapValue to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(fcfsRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go
index e00590c..1792cfd 100644
--- a/schedulers/firstfitSortedWattsProacCC.go
+++ b/schedulers/firstfitSortedWattsProacCC.go
@@ -179,7 +179,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() {
             if rankedCurrentCapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round rankedCurrentCapValue to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(rankedCurrentCapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
@@ -203,7 +203,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() {
             if s.isRecapping && rankedRecapValue > 0.0 {
                 for _, host := range constants.Hosts {
                     // Round rankedRecapValue to the nearest whole number.
-                    if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil {
+                    if err := rapl.Cap(host, "rapl", math.Floor(rankedRecapValue+0.5)); err != nil {
                         log.Println(err)
                     }
                 }
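The same round-to-nearest expression now appears in all four ProacCC schedulers. A possible follow-up (not part of this change) would be to hoist it into a shared, exported helper in the pcp package, next to getNextCapValue. The sketch below is hypothetical:

```go
package pcp

import "math"

// RoundToNearestWhole rounds v to the nearest whole number, halves toward
// +infinity. Hypothetical helper that would replace the math.Floor(v+0.5)
// expression repeated across the ProacCC schedulers.
func RoundToNearestWhole(v float64) float64 {
	return math.Floor(v + 0.5)
}
```

Call sites would then read rapl.Cap(host, "rapl", pcp.RoundToNearestWhole(fcfsRecapValue)). Floor(v+0.5) rounds halves up rather than away from zero, but cap values are never negative, so it agrees with the round helper in logAndProgressiveExtrema.go.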