Fixed corner cases in progressive extrema -- When a node is capped and the new cap value is above the threshold, that node can be either capped further or uncapped in the next cycle. If the new cap value is equal to the threshold, the node cannot be capped further and can only be uncapped. When a node is uncapped and the newUncapValue is below 100, the node can be either capped or uncapped in the next cycle. If the newUncapValue is 100, the node can only be capped.

Pradyumna Kaushik 2017-02-20 20:55:06 -05:00
parent d42b7a3a3b
commit 726c0555ed
2 changed files with 143 additions and 31 deletions
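
The corner cases described in the commit message reduce to a small eligibility rule per victim: a host can be capped further only while its cap value is above the cap threshold, and it can be uncapped only while its cap value is below 100%. A minimal standalone sketch of that rule (not part of this commit; the threshold value and helper names are illustrative stand-ins for constants.CapThreshold):

package main

import "fmt"

// capThreshold is an illustrative stand-in for constants.CapThreshold.
const capThreshold = 12.5

// canCapFurther reports whether a victim at capValue may be capped again next cycle.
func canCapFurther(capValue float64) bool {
    return capValue > capThreshold
}

// canUncap reports whether a victim at capValue may be uncapped next cycle.
func canUncap(capValue float64) bool {
    return capValue < 100.0
}

func main() {
    for _, v := range []float64{100.0, 50.0, capThreshold} {
        fmt.Printf("cap=%.2f canCapFurther=%t canUncap=%t\n", v, canCapFurther(v), canUncap(v))
    }
}
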

View file

@ -1,18 +1,19 @@
package pcp
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/rapl"
"bufio"
"container/ring"
"log"
"math"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"syscall"
"math"
"bitbucket.org/sunybingcloud/electron/constants"
"time"
)
func round(num float64) int {
@ -20,17 +21,25 @@ func round(num float64) int {
}
func getNextCapValue(curCapValue float64, precision int) float64 {
curCapValue /= 2.0
output := math.Pow(10, float64(precision))
return float64(round(curCapValue*output)) / output
}
func getNextUncapValue(curCapValue float64, precision int) float64 {
curCapValue *= 2.0
output := math.Pow(10, float64(precision))
return float64(round(curCapValue*output)) / output
}
func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
log.Println("Inside Log and Progressive Extrema")
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
cmd := exec.Command("sh", "-c", pcpCommand)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if hiThreshold < loThreshold {
log.Println("High threshold is lower than the low threshold")
log.Println("High threshold is lower than low threshold!")
}
logFile, err := os.Create("./" + prefix + ".pcplog")
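
For reference, getNextCapValue and getNextUncapValue above halve or double the current cap value and round the result to the requested number of decimal places. A standalone sketch of that progression (the round helper here is an assumption, since the original round() body is elided in this hunk):

package main

import (
    "fmt"
    "math"
)

// round is an assumed stand-in for the pcp package's round() helper:
// round half away from zero to the nearest integer.
func round(num float64) int {
    return int(num + math.Copysign(0.5, num))
}

// nextCapValue halves the cap and keeps `precision` decimal places, mirroring getNextCapValue.
func nextCapValue(cur float64, precision int) float64 {
    cur /= 2.0
    scale := math.Pow(10, float64(precision))
    return float64(round(cur*scale)) / scale
}

// nextUncapValue doubles the cap and keeps `precision` decimal places, mirroring getNextUncapValue.
func nextUncapValue(cur float64, precision int) float64 {
    cur *= 2.0
    scale := math.Pow(10, float64(precision))
    return float64(round(cur*scale)) / scale
}

func main() {
    // Capping progression: 100 -> 50 -> 25 -> 12.5 -> 6.25
    capValue := 100.0
    for i := 0; i < 4; i++ {
        capValue = nextCapValue(capValue, 2)
        fmt.Println("capped to", capValue)
    }
    // Uncapping reverses it: 6.25 -> 12.5 -> 25 -> 50 -> 100
    for i := 0; i < 4; i++ {
        capValue = nextUncapValue(capValue, 2)
        fmt.Println("uncapped to", capValue)
    }
}
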
@ -81,10 +90,11 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
// Throw away first set of results
scanner.Scan()
// cappedHosts := make(map[string]bool)
// Keep track of the capped victims and the corresponding cap value.
cappedVictims := make(map[string]float64)
orderCapped := make([]string, 0, 8)
orderCappedVictims := make(map[string]float64) // Parallel data structure to orderCapped, so that victims that are to be uncapped can be looked up faster.
clusterPowerHist := ring.New(5)
seconds := 0
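
The three structures above split the bookkeeping: cappedVictims maps a host to its current cap value while it can still be capped further, orderCapped remembers the order in which hosts were capped (the most recently capped host is the first uncap candidate), and orderCappedVictims mirrors orderCapped as a map so a candidate can be looked up without scanning the slice. A rough usage sketch, with a made-up host name:

package main

import "fmt"

func main() {
    cappedVictims := map[string]float64{}      // host -> current cap value (still cappable)
    orderCapped := make([]string, 0, 8)        // hosts staged for uncapping, in capping order
    orderCappedVictims := map[string]float64{} // parallel map for O(1) lookups into orderCapped

    // Capping a brand-new victim at 50%.
    host := "stratos-001" // hypothetical host name
    cappedVictims[host] = 50.0
    orderCapped = append(orderCapped, host)
    orderCappedVictims[host] = 50.0

    // When the cluster average drops below loThreshold, the most recently
    // capped host is the first uncap candidate.
    if len(orderCapped) > 0 {
        candidate := orderCapped[len(orderCapped)-1]
        fmt.Printf("uncap candidate %s, currently capped at %.1f%%\n",
            candidate, orderCappedVictims[candidate])
    }
}
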
@ -117,7 +127,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
if clusterMean >= hiThreshold {
log.Printf("Need to cap a node")
log.Println("Need to cap a node")
// Create statistics for all victims and choose one to cap
victims := make([]Victim, 0, 8)
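
The decision to cap or uncap is driven by the mean of the last few total-power samples held in the clusterPowerHist ring (five entries), compared against hiThreshold and loThreshold. A standalone sketch of that windowed mean with made-up wattage values (meanOf is not a function from this repository):

package main

import (
    "container/ring"
    "fmt"
)

// meanOf averages the non-nil values currently stored in the ring.
func meanOf(r *ring.Ring) float64 {
    total, count := 0.0, 0
    r.Do(func(x interface{}) {
        if x != nil {
            total += x.(float64)
            count++
        }
    })
    if count == 0 {
        return 0
    }
    return total / float64(count)
}

func main() {
    hiThreshold, loThreshold := 3200.0, 2600.0 // illustrative watt values
    clusterPowerHist := ring.New(5)

    for _, sample := range []float64{2500, 2600, 3300, 3500, 3600, 3600} {
        clusterPowerHist.Value = sample
        clusterPowerHist = clusterPowerHist.Next()
        mean := meanOf(clusterPowerHist)
        switch {
        case mean >= hiThreshold:
            fmt.Printf("avg %.1f W: need to cap a node\n", mean)
        case mean < loThreshold:
            fmt.Printf("avg %.1f W: need to uncap a node\n", mean)
        default:
            fmt.Printf("avg %.1f W: within thresholds\n", mean)
        }
    }
}
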
@ -132,40 +142,121 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
sort.Sort(VictimSorter(victims)) // Sort by average wattage
// Finding the best victim to cap in a round robin manner
newVictimFound := false
alreadyCappedHosts := []string{} // Already capped hosts, in decreasing order of recent power consumption
for i := 0; i < len(victims); i++ {
if curCapValue, ok := cappedVictims[victims[i].Host]; ok {
// checking whether we can continue to cap this host.
// If yes, then we cap it to half the current cap value.
// Else, we push it to the orderedCapped and continue.
if curCapValue > constants.CapThreshold {
newCapValue := getNextCapValue(curCapValue/2.0, 1)
if err := rapl.Cap(victims[0].Host, "rapl", newCapValue); err != nil {
log.Print("Error capping host")
}
// Updating the curCapValue in cappedVictims
cappedVictims[victims[0].Host] = newCapValue
break
// Try to pick a victim that hasn't been capped yet.
if _, ok := cappedVictims[victims[i].Host]; !ok {
// If this victim is present in orderCapped, then we move on to find another victim that we can cap.
if _, ok := orderCappedVictims[victims[i].Host]; ok {
// Adding the host to the alreadyCappedHosts
alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
continue
}
// Need to cap this victim and add it to the cappedVictims
if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
log.Printf("Error capping host %s", victims[i].Host)
} else {
// deleting entry in cappedVictims
delete(cappedVictims, victims[i].Host)
// Now this host can be uncapped.
log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0)
cappedVictims[victims[i].Host] = 50.0
newVictimFound = true
// This node can be uncapped and hence adding to orderCapped
orderCapped = append(orderCapped, victims[i].Host)
orderCappedVictims[victims[i].Host] = 50.0
break // breaking only on successful cap
}
} else {
alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
}
}
// If no new victim was found, then we need to further cap the best victim among the already capped hosts
if !newVictimFound {
for i := 0; i < len(alreadyCappedHosts); i++ {
// Checking if this node can be uncapped too
if capValue, ok := orderCappedVictims[alreadyCappedHosts[i]]; ok {
// if capValue is greater than the threshold then cap, else continue
if capValue > constants.CapThreshold {
newCapValue := getNextCapValue(capValue, 2)
if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
log.Printf("Error capping host %s", alreadyCappedHosts[i])
} else {
// successful cap
log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
// Checking whether this victim can be capped further
if newCapValue <= constants.CapThreshold {
// deleting victim from cappedVictims
delete(cappedVictims, alreadyCappedHosts[i])
// updating the cap value in orderCappedVictims
orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
} else {
// updating the cap value
cappedVictims[alreadyCappedHosts[i]] = newCapValue
orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
}
break // exiting only on a successful cap.
}
} else {
// Do nothing
}
} else {
// This host can definitely be capped.
// Cap this host to half its current cap value and update the new cap value in cappedVictims and orderCappedVictims
// If we have hit the threshold then we add this host to orderCapped to indicate that it needs to be uncapped.
newCapValue := getNextCapValue(cappedVictims[alreadyCappedHosts[i]], 2)
if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
log.Printf("Error capping host %s", alreadyCappedHosts[i])
} else {
log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
// Checking whether this victim can be capped further
if newCapValue <= constants.CapThreshold {
// deleting victim from cappedVictims
delete(cappedVictims, alreadyCappedHosts[i])
// staging victim for uncapping
orderCapped = append(orderCapped, alreadyCappedHosts[i])
orderCappedVictims[alreadyCappedHosts[i]] = constants.CapThreshold
} else {
// Updating the cap value of the victim
cappedVictims[alreadyCappedHosts[i]] = newCapValue
}
break // exiting only on a successful cap
}
}
}
}
} else if clusterMean < loThreshold {
log.Println("Need to uncap a node")
if len(orderCapped) > 0 {
host := orderCapped[len(orderCapped)-1]
orderCapped = orderCapped[:len(orderCapped)-1]
// cappedVictims would contain the latest cap value for this host.
newCapValue := getNextCapValue(cappedVictims[host]/2.0, 1)
if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
log.Print("Error capping host")
// Removing victim from orderCapped only if it has been completely uncapped to 100%
if cappedVictims[host] == 100.0 {
orderCapped = orderCapped[:len(orderCapped)-1]
delete(orderCappedVictims, host)
} else {
newCapValue := getNextUncapValue(cappedVictims[host], 2)
// Uncapping the victim
if err := rapl.Cap(host, "rapl", newCapValue); err != nil {
log.Printf("Error uncapping host %s", host)
} else {
// Successful uncap
log.Printf("Uncapped host[%s] to %f", host, newCapValue)
// If the new cap value is 100, then this node cannot be uncapped any further
if newCapValue == 100.0 {
orderCapped = orderCapped[:len(orderCapped)-1]
delete(orderCappedVictims, host)
// Updating cappedVictims
cappedVictims[host] = newCapValue
} else if newCapValue > constants.CapThreshold {
// This host can be capped
cappedVictims[host] = newCapValue
// Updating orderCappedVictims
orderCappedVictims[host] = newCapValue
}
}
}
// Adding entry for the host to cappedVictims
cappedVictims[host] = newCapValue // Now this host can be capped again.
} else {
// No node has been capped until now.
}
}
}
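
Putting the capping branch above together: an already-capped victim is repeatedly halved, and once its cap value falls to the threshold it is removed from cappedVictims and kept only in orderCapped/orderCappedVictims as an uncap candidate. A compressed standalone sketch of that progression (the real code performs at most one cap per monitoring cycle and issues rapl.Cap calls; the threshold and host name here are illustrative):

package main

import "fmt"

const capThreshold = 12.5 // illustrative stand-in for constants.CapThreshold

func main() {
    host := "stratos-002" // hypothetical host name
    cappedVictims := map[string]float64{host: 50.0}
    orderCapped := []string{host}
    orderCappedVictims := map[string]float64{host: 50.0}

    // Halve the victim's cap until it can no longer be capped further.
    for {
        cur, stillCappable := cappedVictims[host]
        if !stillCappable {
            break
        }
        newCap := cur / 2.0 // what getNextCapValue(cur, 2) computes, ignoring rounding
        fmt.Printf("capping %s to %.2f%%\n", host, newCap)
        if newCap <= capThreshold {
            // Cannot be capped further: keep it only as an uncap candidate.
            delete(cappedVictims, host)
            orderCappedVictims[host] = newCap
        } else {
            cappedVictims[host] = newCap
            orderCappedVictims[host] = newCap
        }
    }
    fmt.Println("uncap candidates:", orderCapped, "cap values:", orderCappedVictims)
}
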
@ -173,4 +264,24 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
}
}(logging, hiThreshold, loThreshold)
log.Println("PCP logging started")
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
pgid, err := syscall.Getpgid(cmd.Process.Pid)
select {
case <-quit:
log.Println("Stopping PCP logging in 5 seconds")
time.Sleep(5 * time.Second)
// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
// kill process and all children processes
syscall.Kill(-pgid, 15)
return
}
}
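
Because the pmdumptext pipeline is launched through sh with Setpgid set, stopping it means signalling the whole process group, which is what the Kill(-pgid, 15) call above does (15 is SIGTERM). A standalone, Unix-only sketch of the same pattern, with sleep standing in for the PCP command:

package main

import (
    "log"
    "os/exec"
    "syscall"
    "time"
)

func main() {
    // Run the child in its own process group, as the PCP logger does.
    cmd := exec.Command("sh", "-c", "sleep 60") // stand-in for the pmdumptext pipeline
    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
    if err := cmd.Start(); err != nil {
        log.Fatal(err)
    }

    pgid, err := syscall.Getpgid(cmd.Process.Pid)
    if err != nil {
        log.Fatal(err)
    }

    time.Sleep(2 * time.Second)
    // Signal the whole group: a negative pid targets every process in the pgid.
    if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
    _ = cmd.Wait()
}
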

View file

@ -60,7 +60,7 @@ func main() {
startTime := time.Now().Format("20060102150405")
logPrefix := *pcplogPrefix + "_" + startTime
scheduler := schedulers.NewBinPackedPistonCapper(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{
@ -74,8 +74,9 @@ func main() {
return
}
//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
// Attempt to handle SIGINT so as not to leave pmdumptext running
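
The comment above refers to handling SIGINT so that pmdumptext is not left running. One plausible wiring, assuming a quit channel like the one passed to pcp.StartPCPLogAndProgressiveExtremaCap (a sketch only, not the scheduler's actual shutdown code):

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    quit := make(chan struct{})

    // Forward Ctrl-C / SIGTERM into the quit channel so the PCP goroutine can
    // kill the pmdumptext process group before the framework exits.
    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-interrupts
        log.Println("Received interrupt, stopping PCP logging")
        close(quit)
    }()

    // ... the scheduler driver would run here; block on quit for the sketch.
    <-quit
}
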