diff --git a/pcp/logAndProgressiveExtrema.go b/pcp/logAndProgressiveExtrema.go
index 7936b53..5eefb8c 100644
--- a/pcp/logAndProgressiveExtrema.go
+++ b/pcp/logAndProgressiveExtrema.go
@@ -1,18 +1,19 @@
 package pcp

 import (
+	"bitbucket.org/sunybingcloud/electron/constants"
 	"bitbucket.org/sunybingcloud/electron/rapl"
 	"bufio"
 	"container/ring"
 	"log"
+	"math"
 	"os"
 	"os/exec"
 	"sort"
 	"strconv"
 	"strings"
 	"syscall"
-	"math"
-	"bitbucket.org/sunybingcloud/electron/constants"
+	"time"
 )

 func round(num float64) int {
@@ -20,17 +21,25 @@ func round(num float64) int {
 }

 func getNextCapValue(curCapValue float64, precision int) float64 {
+	curCapValue /= 2.0
 	output := math.Pow(10, float64(precision))
-	return float64(round(curCapValue * output)) / output
+	return float64(round(curCapValue*output)) / output
+}
+
+func getNextUncapValue(curCapValue float64, precision int) float64 {
+	curCapValue *= 2.0
+	output := math.Pow(10, float64(precision))
+	return float64(round(curCapValue*output)) / output
 }

 func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
+	log.Println("Inside Log and Progressive Extrema")
 	const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
 	cmd := exec.Command("sh", "-c", pcpCommand)
 	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

 	if hiThreshold < loThreshold {
-		log.Println("High threshold is lower than the low threshold")
+		log.Println("High threshold is lower than low threshold!")
 	}

 	logFile, err := os.Create("./" + prefix + ".pcplog")
@@ -81,10 +90,11 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 		// Throw away first set of results
 		scanner.Scan()

-		//cappedHosts := make(map[string]bool)
+		// cappedHosts := make(map[string]bool)
 		// Keep track of the capped victims and the corresponding cap value.
 		cappedVictims := make(map[string]float64)
 		orderCapped := make([]string, 0, 8)
+		orderCappedVictims := make(map[string]float64) // Parallel data structure to orderCapped for faster lookup of victims that are to be uncapped.
 		clusterPowerHist := ring.New(5)
 		seconds := 0

@@ -117,7 +127,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 				log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)

 				if clusterMean >= hiThreshold {
-					log.Printf("Need to cap a node")
+					log.Println("Need to cap a node")
 					// Create statics for all victims and choose one to cap
 					victims := make([]Victim, 0, 8)

@@ -132,40 +142,123 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref

 					sort.Sort(VictimSorter(victims)) // Sort by average wattage

-					// Finding the best victim to cap.
+					// Finding the best victim to cap in a round-robin manner
+					newVictimFound := false
+					alreadyCappedHosts := []string{} // Already capped hosts in decreasing order of recent power consumption
 					for i := 0; i < len(victims); i++ {
-						if curCapValue, ok := cappedVictims[victims[i].Host]; ok {
-							// checking whether we can continue to cap this host.
-							// If yes, then we cap it to half the current cap value.
-							// Else, we push it to the orderedCapped and continue.
-							if curCapValue > constants.CapThreshold {
-								newCapValue := getNextCapValue(curCapValue/2.0, 1)
-								if err := rapl.Cap(victims[0].Host, "rapl", newCapValue); err != nil {
-									log.Print("Error capping host")
-								}
-								// Updating the curCapValue in cappedVictims
-								cappedVictims[victims[0].Host] = newCapValue
-								break
+						// Try to pick a victim that hasn't been capped yet.
+						if _, ok := cappedVictims[victims[i].Host]; !ok {
+							// If this victim is already present in orderCappedVictims, move on and find another victim to cap.
+							if _, ok := orderCappedVictims[victims[i].Host]; ok {
+								// Adding the host to alreadyCappedHosts
+								alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
+								continue
+							}
+							// Need to cap this victim and add it to cappedVictims
+							if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
+								log.Printf("Error capping host %s", victims[i].Host)
 							} else {
-								// deleting entry in cappedVictims
-								delete(cappedVictims, victims[i].Host)
-								// Now this host can be uncapped.
+								log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0)
+								cappedVictims[victims[i].Host] = 50.0
+								newVictimFound = true
+								// This node can be uncapped later and hence adding it to orderCapped
 								orderCapped = append(orderCapped, victims[i].Host)
+								orderCappedVictims[victims[i].Host] = 50.0
+								break // breaking only on a successful cap
+							}
+						} else {
+							alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
+						}
+					}
+					// If no new victim was found, cap the best victim among the already capped hosts
+					if !newVictimFound {
+						for i := 0; i < len(alreadyCappedHosts); i++ {
+							// Checking whether this node is also a candidate for uncapping
+							if capValue, ok := orderCappedVictims[alreadyCappedHosts[i]]; ok {
+								// If capValue is greater than the threshold, cap further; else move on
+								if capValue > constants.CapThreshold {
+									newCapValue := getNextCapValue(capValue, 2)
+									if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
+										log.Printf("Error capping host %s", alreadyCappedHosts[i])
+									} else {
+										// successful cap
+										log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
+										// Checking whether this victim can be capped further
+										if newCapValue <= constants.CapThreshold {
+											// deleting victim from cappedVictims
+											delete(cappedVictims, alreadyCappedHosts[i])
+											// updating the cap value in orderCappedVictims
+											orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
+										} else {
+											// updating the cap value
+											cappedVictims[alreadyCappedHosts[i]] = newCapValue
+											orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
+										}
+										break // exiting only on a successful cap.
+									}
+								} else {
+									// Do nothing
+								}
+							} else {
+								// This host can definitely be capped.
+								// Cap this host to half its current cap value and update the new cap value in cappedVictims and orderCappedVictims.
+								// If we have hit the threshold, add this host to orderCapped to indicate that it needs to be uncapped.
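+								// For example, a host currently capped at 50.0 is next capped at getNextCapValue(50.0, 2) = 25.0 (halve, then round to 2 decimal places).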
+								newCapValue := getNextCapValue(cappedVictims[alreadyCappedHosts[i]], 2)
+								if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
+									log.Printf("Error capping host %s", alreadyCappedHosts[i])
+								} else {
+									log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
+									// Checking whether this victim can be capped further
+									if newCapValue <= constants.CapThreshold {
+										// deleting victim from cappedVictims
+										delete(cappedVictims, alreadyCappedHosts[i])
+										// staging victim for uncapping
+										orderCapped = append(orderCapped, alreadyCappedHosts[i])
+										orderCappedVictims[alreadyCappedHosts[i]] = constants.CapThreshold
+									} else {
+										// Updating the cap value of the victim
+										cappedVictims[alreadyCappedHosts[i]] = newCapValue
+									}
+									break // exiting only on a successful cap
+								}
+							}
+						}
+					}
 				} else if clusterMean < loThreshold {
+					log.Println("Need to uncap a node")
 					if len(orderCapped) > 0 {
 						host := orderCapped[len(orderCapped)-1]
-						orderCapped = orderCapped[:len(orderCapped)-1]
-						// cappedVictims would contain the latest cap value for this host.
-						newCapValue := getNextCapValue(cappedVictims[host]/2.0, 1)
-						if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
-							log.Print("Error capping host")
+						// Removing victim from orderCapped only if it has been completely uncapped to 100%
+						if cappedVictims[host] == 100.0 {
+							orderCapped = orderCapped[:len(orderCapped)-1]
+							delete(orderCappedVictims, host)
+						} else {
+							newCapValue := getNextUncapValue(orderCappedVictims[host], 2)
+							// Uncapping the victim
+							if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
+								log.Printf("Error uncapping host %s", host)
+							} else {
+								// Successful uncap
+								log.Printf("Uncapped host[%s] to %f", host, newCapValue)
+								// If the new cap value is 100, this host is fully uncapped and is no longer a candidate for uncapping
+								if newCapValue == 100.0 {
+									orderCapped = orderCapped[:len(orderCapped)-1]
+									delete(orderCappedVictims, host)
+									// Updating cappedVictims
+									cappedVictims[host] = newCapValue
+								} else if newCapValue > constants.CapThreshold {
+									// This host can be capped again
+									cappedVictims[host] = newCapValue
+									// Updating orderCappedVictims
+									orderCappedVictims[host] = newCapValue
+								}
+							}
 						}
-						// Adding entry for the host to cappedVictims
-						cappedVictims[host] = newCapValue // Now this host can be capped again.
+					} else {
+						// No node has been capped yet, so there is nothing to uncap.
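+						// Nothing to uncap; wait for a capping phase to populate orderCapped.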
 					}
 				}
 			}
@@ -173,4 +266,24 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 		}
 	}(logging, hiThreshold, loThreshold)
+
+	log.Println("PCP logging started")
+
+	if err := cmd.Start(); err != nil {
+		log.Fatal(err)
+	}
+
+	pgid, err := syscall.Getpgid(cmd.Process.Pid)
+
+	select {
+	case <-quit:
+		log.Println("Stopping PCP logging in 5 seconds")
+		time.Sleep(5 * time.Second)
+
+		// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
+		// kill the process and all its child processes
+		syscall.Kill(-pgid, 15)
+		return
+	}
+

 }
diff --git a/scheduler.go b/scheduler.go
index 9b6dc45..ba13663 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -60,7 +60,7 @@ func main() {
 	startTime := time.Now().Format("20060102150405")
 	logPrefix := *pcplogPrefix + "_" + startTime

-	scheduler := schedulers.NewBinPackedPistonCapper(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
+	scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
 	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
 		Master: *master,
 		Framework: &mesos.FrameworkInfo{
@@ -74,8 +74,9 @@ func main() {
 		return
 	}

-	go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
+	//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
 	//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
+	go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
 	time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing

 	// Attempt to handle signint to not leave pmdumptext running