Merged in progressiveExtrema (pull request #12)

ProgressiveExtrema

Approved-by: Renan DelValle <rdelval1@binghamton.edu>
Approved-by: ajain13 <ajain13@binghamton.edu>

commit 8b7a57519b
14 changed files with 354 additions and 78 deletions
@@ -17,6 +17,7 @@ To Do:
 * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file.
 * Make def.Task an interface for further modularization and flexibility.
 * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
+* Have a generic sorter for task resources instead of having one for each kind of resource.
 
 **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the
 machine on which electron is launched for logging to work and PCP collector agents installed
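The generic-sorter To Do item added above could be satisfied with an accessor-based comparator. The sketch below is an illustration only; the Task struct and its fields are stand-ins, not the repository's def.Task.

package main

import (
    "fmt"
    "sort"
)

// Task is a stand-in for def.Task; the real struct may differ and the field
// names here are assumptions made for illustration.
type Task struct {
    Name  string
    CPU   float64
    Watts float64
}

// sortTasksBy orders tasks by whatever resource the accessor extracts,
// so a single sorter covers CPU, memory, watts, and so on.
func sortTasksBy(tasks []Task, resource func(Task) float64) {
    sort.Slice(tasks, func(i, j int) bool {
        return resource(tasks[i]) < resource(tasks[j])
    })
}

func main() {
    tasks := []Task{{"heavy", 2.0, 120.0}, {"light", 1.0, 90.0}}
    sortTasksBy(tasks, func(t Task) float64 { return t.Watts })
    fmt.Println(tasks[0].Name) // "light": lowest watts first
}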
@@ -45,3 +45,6 @@ var Tolerance = 0.70
 
 // Window size for running average
 var ConsiderationWindowSize = 20
+
+// Threshold below which a host should not be capped any further
+var LowerCapLimit = 12.5
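LowerCapLimit pairs with the halving done by the new progressive extrema capper (next file): starting from an uncapped host at 100%, successive halvings give 50, 25, 12.5, and once a host's cap value reaches this limit it is treated as capped to the maximum. A small illustrative sketch of that sequence, not part of the commit:

package main

import "fmt"

func main() {
    const lowerCapLimit = 12.5 // mirrors constants.LowerCapLimit
    capValue := 100.0          // an uncapped host
    for capValue > lowerCapLimit {
        capValue /= 2.0
        fmt.Println(capValue) // 50, 25, 12.5
    }
    // Three halvings reach the limit; the capper then treats the host as
    // one that cannot be capped any further.
}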
pcp/logAndProgressiveExtrema.go (new file, 275 lines)
@@ -0,0 +1,275 @@
package pcp

import (
    "bitbucket.org/sunybingcloud/electron/constants"
    "bitbucket.org/sunybingcloud/electron/rapl"
    "bufio"
    "container/ring"
    "log"
    "math"
    "os"
    "os/exec"
    "sort"
    "strconv"
    "strings"
    "syscall"
    "time"
    "bitbucket.org/sunybingcloud/electron/utilities"
)

func round(num float64) int {
    return int(math.Floor(num + math.Copysign(0.5, num)))
}

func getNextCapValue(curCapValue float64, precision int) float64 {
    curCapValue /= 2.0
    output := math.Pow(10, float64(precision))
    return float64(round(curCapValue*output)) / output
}

func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
    log.Println("Inside Log and Progressive Extrema")
    const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
    cmd := exec.Command("sh", "-c", pcpCommand)
    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

    if hiThreshold < loThreshold {
        log.Println("High threshold is lower than low threshold!")
    }

    logFile, err := os.Create("./" + prefix + ".pcplog")
    if err != nil {
        log.Fatal(err)
    }

    defer logFile.Close()

    pipe, err := cmd.StdoutPipe()
    if err != nil {
        log.Fatal(err)
    }
    //cmd.Stdout = stdout

    scanner := bufio.NewScanner(pipe)

    go func(logging *bool, hiThreshold, loThreshold float64) {
        // Get names of the columns
        scanner.Scan()

        // Write to logfile
        logFile.WriteString(scanner.Text() + "\n")

        headers := strings.Split(scanner.Text(), ",")

        powerIndexes := make([]int, 0, 0)
        powerHistories := make(map[string]*ring.Ring)
        indexToHost := make(map[int]string)

        for i, hostMetric := range headers {
            metricSplit := strings.Split(hostMetric, ":")
            //log.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1])

            if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") ||
                strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") {
                //fmt.Println("Index: ", i)
                powerIndexes = append(powerIndexes, i)
                indexToHost[i] = metricSplit[0]

                // Only create one ring per host
                if _, ok := powerHistories[metricSplit[0]]; !ok {
                    powerHistories[metricSplit[0]] = ring.New(20) // Two PKGS, two DRAM per node, 20 = 5 seconds of tracking
                }
            }
        }

        // Throw away first set of results
        scanner.Scan()

        // To keep track of the capped states of the capped victims
        cappedVictims := make(map[string]float64)
        // TODO: Come up with a better name for this.
        orderCapped := make([]string, 0, 8)
        // TODO: Change this to a priority queue ordered by the cap value. This will get rid of the sorting performed in the code.
        // Parallel data structure to orderCapped to keep track of the uncapped states of the uncapped victims
        orderCappedVictims := make(map[string]float64)
        clusterPowerHist := ring.New(5)
        seconds := 0

        for scanner.Scan() {
            if *logging {
                log.Println("Logging PCP...")
                split := strings.Split(scanner.Text(), ",")
                logFile.WriteString(scanner.Text() + "\n")

                totalPower := 0.0
                for _, powerIndex := range powerIndexes {
                    power, _ := strconv.ParseFloat(split[powerIndex], 64)

                    host := indexToHost[powerIndex]

                    powerHistories[host].Value = power
                    powerHistories[host] = powerHistories[host].Next()

                    log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * RAPLUnits))

                    totalPower += power
                }
                clusterPower := totalPower * RAPLUnits

                clusterPowerHist.Value = clusterPower
                clusterPowerHist = clusterPowerHist.Next()

                clusterMean := averageClusterPowerHistory(clusterPowerHist)

                log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)

                if clusterMean >= hiThreshold {
                    log.Println("Need to cap a node")
                    log.Printf("Cap values of capped victims: %v", cappedVictims)
                    log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
                    // Create statistics for all victims and choose one to cap
                    victims := make([]Victim, 0, 8)

                    // TODO: Just keep track of the largest to reduce from nlogn to n
                    for name, history := range powerHistories {

                        histMean := averageNodePowerHistory(history)

                        // Consider doing mean calculations using go routines if we need to speed up
                        victims = append(victims, Victim{Watts: histMean, Host: name})
                    }

                    sort.Sort(VictimSorter(victims)) // Sort by average wattage

                    // Finding the best victim to cap in a round robin manner
                    newVictimFound := false
                    alreadyCappedHosts := []string{} // Host-names of victims that are already capped
                    for i := 0; i < len(victims); i++ {
                        // Try to pick a victim that hasn't been capped yet
                        if _, ok := cappedVictims[victims[i].Host]; !ok {
                            // If this victim can't be capped further, then we move on to find another victim
                            if _, ok := orderCappedVictims[victims[i].Host]; ok {
                                continue
                            }
                            // Need to cap this victim
                            if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
                                log.Printf("Error capping host %s", victims[i].Host)
                            } else {
                                log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0)
                                // Keeping track of this victim and its cap value
                                cappedVictims[victims[i].Host] = 50.0
                                newVictimFound = true
                                // This node can be uncapped and hence adding to orderCapped
                                orderCapped = append(orderCapped, victims[i].Host)
                                orderCappedVictims[victims[i].Host] = 50.0
                                break // Breaking only on successful cap
                            }
                        } else {
                            alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
                        }
                    }
                    // If no new victim found, then we need to cap the best victim among the ones that are already capped
                    if !newVictimFound {
                        canCapAlreadyCappedVictim := false
                        for i := 0; i < len(alreadyCappedHosts); i++ {
                            // If already capped then the host must be present in orderCappedVictims
                            capValue := orderCappedVictims[alreadyCappedHosts[i]]
                            // If capValue is greater than the threshold then cap, else continue
                            if capValue > constants.LowerCapLimit {
                                newCapValue := getNextCapValue(capValue, 2)
                                if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
                                    log.Printf("Error capping host[%s]", alreadyCappedHosts[i])
                                } else {
                                    // Successful cap
                                    log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
                                    // Checking whether this victim can be capped further
                                    if newCapValue <= constants.LowerCapLimit {
                                        // Deleting victim from cappedVictims
                                        delete(cappedVictims, alreadyCappedHosts[i])
                                        // Updating the cap value in orderCappedVictims
                                        orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
                                    } else {
                                        // Updating the cap value
                                        cappedVictims[alreadyCappedHosts[i]] = newCapValue
                                        orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
                                    }
                                    canCapAlreadyCappedVictim = true
                                    break // Breaking only on successful cap.
                                }
                            } else {
                                // Do nothing.
                                // Continue to find another victim to cap.
                                // If we cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point.
                            }
                        }
                        if !canCapAlreadyCappedVictim {
                            log.Println("No Victim left to cap.")
                        }
                    }

                } else if clusterMean < loThreshold {
                    log.Println("Need to uncap a node")
                    log.Printf("Cap values of capped victims: %v", cappedVictims)
                    log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
                    if len(orderCapped) > 0 {
                        // We pick the host that is capped the most to uncap
                        orderCappedToSort := utilities.GetPairList(orderCappedVictims)
                        sort.Sort(orderCappedToSort) // Sorted hosts in non-decreasing order of capped states
                        hostToUncap := orderCappedToSort[0].Key
                        // Uncapping the host.
                        // This is a floating point operation and might suffer from precision loss.
                        newUncapValue := orderCappedVictims[hostToUncap] * 2.0
                        if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil {
                            log.Printf("Error uncapping host[%s]", hostToUncap)
                        } else {
                            // Successful uncap
                            log.Printf("Uncapped host[%s] to %f", hostToUncap, newUncapValue)
                            // Can we uncap this host further? If not, then we remove its entry from orderCapped
                            if newUncapValue >= 100.0 { // can compare using ==
                                // Deleting entry from orderCapped
                                for i, victimHost := range orderCapped {
                                    if victimHost == hostToUncap {
                                        orderCapped = append(orderCapped[:i], orderCapped[i+1:]...)
                                        break // We are done removing host from orderCapped
                                    }
                                }
                                // Removing entry for host from the parallel data structure
                                delete(orderCappedVictims, hostToUncap)
                                // Removing entry from cappedVictims as this host is no longer capped
                                delete(cappedVictims, hostToUncap)
                            } else if newUncapValue > constants.LowerCapLimit { // this check is unnecessary and can be converted to 'else'
                                // Updating the cap value
                                orderCappedVictims[hostToUncap] = newUncapValue
                                cappedVictims[hostToUncap] = newUncapValue
                            }
                        }
                    } else {
                        log.Println("No host staged for Uncapping")
                    }
                }
            }
            seconds++
        }

    }(logging, hiThreshold, loThreshold)

    log.Println("PCP logging started")

    if err := cmd.Start(); err != nil {
        log.Fatal(err)
    }

    pgid, err := syscall.Getpgid(cmd.Process.Pid)

    select {
    case <-quit:
        log.Println("Stopping PCP logging in 5 seconds")
        time.Sleep(5 * time.Second)

        // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
        // kill process and all children processes
        syscall.Kill(-pgid, 15)
        return
    }

}
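The TODO above suggests replacing the per-round sort of orderCappedVictims with a priority queue keyed by cap value. A rough container/heap sketch of that idea; capEntry, capPQ, and popMostCapped are illustrative names, not code from this commit:

package pcp

import "container/heap"

// capEntry and capPQ sketch the priority queue the TODO asks for: the host
// with the lowest cap value (the most capped one) sits at the top, so picking
// a host to uncap no longer needs a sort every round.
type capEntry struct {
    host     string
    capValue float64
}

type capPQ []capEntry

func (pq capPQ) Len() int            { return len(pq) }
func (pq capPQ) Less(i, j int) bool  { return pq[i].capValue < pq[j].capValue }
func (pq capPQ) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *capPQ) Push(x interface{}) { *pq = append(*pq, x.(capEntry)) }
func (pq *capPQ) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n-1]
    *pq = old[:n-1]
    return item
}

// popMostCapped establishes the heap order and removes the most-capped host.
func popMostCapped(pq *capPQ) capEntry {
    heap.Init(pq)
    return heap.Pop(pq).(capEntry)
}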
@@ -5,7 +5,6 @@ import (
 	"bufio"
 	"container/ring"
 	"log"
-	"math"
 	"os"
 	"os/exec"
 	"sort"
@@ -15,49 +14,6 @@ import (
 	"time"
 )
 
-var RAPLUnits = math.Pow(2, -32)
-
-func averageNodePowerHistory(history *ring.Ring) float64 {
-
-	total := 0.0
-	count := 0.0
-
-	history.Do(func(x interface{}) {
-		if val, ok := x.(float64); ok { //Add it if we can get a float
-			total += val
-			count++
-		}
-	})
-
-	if count == 0.0 {
-		return 0.0
-	}
-
-	count /= 4 // two PKGs, two DRAM for all nodes currently
-
-	return (total / count)
-}
-
-// TODO: Figure a way to merge this and avgpower
-func averageClusterPowerHistory(history *ring.Ring) float64 {
-
-	total := 0.0
-	count := 0.0
-
-	history.Do(func(x interface{}) {
-		if val, ok := x.(float64); ok { //Add it if we can get a float
-			total += val
-			count++
-		}
-	})
-
-	if count == 0.0 {
-		return 0.0
-	}
-
-	return (total / count)
-}
-
 func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
 	const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
 	cmd := exec.Command("sh", "-c", pcpCommand)
pcp/utils.go (new file, 49 lines)
@@ -0,0 +1,49 @@
package pcp

import (
    "container/ring"
    "math"
)

var RAPLUnits = math.Pow(2, -32)

func averageNodePowerHistory(history *ring.Ring) float64 {

    total := 0.0
    count := 0.0

    history.Do(func(x interface{}) {
        if val, ok := x.(float64); ok { //Add it if we can get a float
            total += val
            count++
        }
    })

    if count == 0.0 {
        return 0.0
    }

    count /= 4 // two PKGs, two DRAM for all nodes currently

    return (total / count)
}

// TODO: Figure a way to merge this and avgpower
func averageClusterPowerHistory(history *ring.Ring) float64 {

    total := 0.0
    count := 0.0

    history.Do(func(x interface{}) {
        if val, ok := x.(float64); ok { //Add it if we can get a float
            total += val
            count++
        }
    })

    if count == 0.0 {
        return 0.0
    }

    return (total / count)
}
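The TODO about merging the two averaging helpers could be handled by passing the divisor explicitly; a sketch under that assumption (averagePowerHistory is not a name used in this commit):

package pcp

import "container/ring"

// averagePowerHistory is a hypothetical merge of averageNodePowerHistory and
// averageClusterPowerHistory: the node variant would call it with divisor 4
// (two PKG + two DRAM samples per node), the cluster variant with divisor 1.
func averagePowerHistory(history *ring.Ring, divisor float64) float64 {
    total := 0.0
    count := 0.0
    history.Do(func(x interface{}) {
        if val, ok := x.(float64); ok { // add it if we can get a float
            total += val
            count++
        }
    })
    if count == 0.0 {
        return 0.0
    }
    count /= divisor
    return total / count
}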
@@ -6,7 +6,7 @@ import (
 	"strconv"
 )
 
-func Cap(host, username string, percentage int) error {
+func Cap(host, username string, percentage float64) error {
 
 	if percentage > 100 || percentage < 0 {
 		return errors.New("Percentage is out of range")
@@ -31,7 +31,7 @@ func Cap(host, username string, percentage int) error {
 		return errors.Wrap(err, "Failed to create session")
 	}
 
-	err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.Itoa(percentage))
+	err = session.Run("sudo /misc/shared_data/rdelval1/RAPL_PKG_Throttle.py " + strconv.FormatFloat(percentage, 'f', 2, 64))
 	if err != nil {
 		return errors.Wrap(err, "Failed to run RAPL script")
 	}
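With the float64 signature, callers pass fractional percentages straight through, which the progressive capper relies on for values such as 12.5. A minimal usage sketch; the host name is illustrative and "rapl" is the username used elsewhere in this diff:

package main

import (
    "log"

    "bitbucket.org/sunybingcloud/electron/rapl"
)

func main() {
    // Cap an example host to 12.5% of its RAPL package power budget.
    if err := rapl.Cap("example-host", "rapl", 12.5); err != nil {
        log.Println(err)
    }
}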
@@ -27,11 +27,11 @@ var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts t
 func init() {
 	flag.StringVar(master, "m", "xavier:5050", "Location of leading Mesos master (shorthand)")
 	flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)")
-	flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource")
+	flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource (shorthand)")
 	flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)")
 	flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)")
 	flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)")
-	flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node")
+	flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node (shorthand)")
 }
 
 func main() {
@@ -60,7 +60,7 @@ func main() {
 	startTime := time.Now().Format("20060102150405")
 	logPrefix := *pcplogPrefix + "_" + startTime
 
-	scheduler := schedulers.NewBinPackedPistonCapper(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
+	scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts)
 	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
 		Master: *master,
 		Framework: &mesos.FrameworkInfo{
@@ -74,8 +74,9 @@ func main() {
 		return
 	}
 
-	go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
+	//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
 	//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
+	go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
 	time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
 
 	// Attempt to handle signint to not leave pmdumptext running
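main now starts the progressive extrema capper in place of the plain PCP logger. A self-contained sketch of the contract it relies on, using the flag defaults shown above (700.0 and 400.0); the prefix and shutdown path are illustrative:

package main

import (
    "time"

    "bitbucket.org/sunybingcloud/electron/pcp"
)

func main() {
    quit := make(chan struct{})
    recordPCP := false // the scheduler flips this to true once it starts recording

    // The capper logs and caps only while *logging is true, and shuts
    // pmdumptext down (after a 5 second delay) once quit is signalled.
    go pcp.StartPCPLogAndProgressiveExtremaCap(quit, &recordPCP, "example_prefix", 700.0, 400.0)

    time.Sleep(1 * time.Second)
    close(quit) // would normally happen when the framework shuts down
}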
@@ -7,7 +7,6 @@ To Do:
 * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go
 * **Critical**: Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler.
 * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that.
-* Move all the common struct members from all schedulers into base.go.
 
 Scheduling Algorithms:
@@ -157,7 +157,7 @@ func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) {
 var bpPistonCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
-var bpPistonPreviousRoundedCapValues = make(map[string]int)
+var bpPistonPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BinPackedPistonCapper) startCapping() {
 	go func() {
@@ -167,7 +167,7 @@ func (s *BinPackedPistonCapper) startCapping() {
 			// Need to cap each node
 			bpPistonMutex.Lock()
 			for host, capValue := range bpPistonCapValues {
-				roundedCapValue := int(math.Floor(capValue + 0.5))
+				roundedCapValue := float64(int(math.Floor(capValue + 0.5)))
 				// has the cap value changed
 				if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok {
 					if prevRoundedCap != roundedCapValue {
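The expression float64(int(math.Floor(capValue + 0.5))) now recurs across the scheduler hunks in this commit; a tiny helper could capture it. This is a suggestion, not part of the commit:

package schedulers

import "math"

// roundCapValue rounds a cap value to the nearest whole number while keeping
// the float64 type that rapl.Cap now expects.
func roundCapValue(v float64) float64 {
    return float64(int(math.Floor(v + 0.5)))
}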
@@ -159,7 +159,7 @@ var bpMaxMinPistonCappingMutex sync.Mutex
 var bpMaxMinPistonCappingCapValues = make(map[string]float64)
 
 // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
-var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]int)
+var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64)
 
 func (s *BPSWMaxMinPistonCapping) startCapping() {
 	go func() {
@@ -169,7 +169,7 @@ func (s *BPSWMaxMinPistonCapping) startCapping() {
 			// Need to cap each node
 			bpMaxMinPistonCappingMutex.Lock()
 			for host, capValue := range bpMaxMinPistonCappingCapValues {
-				roundedCapValue := int(math.Floor(capValue + 0.5))
+				roundedCapValue := float64(int(math.Floor(capValue + 0.5)))
 				// has the cap value changed
 				if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok {
 					if previousRoundedCap != roundedCapValue {
@@ -166,7 +166,7 @@ func (s *BPSWMaxMinProacCC) startCapping() {
 				if bpMaxMinProacCCCapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding cap value to nearest int
-						if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -192,7 +192,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() {
 				if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding the recap value to the nearest int
-						if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCRecapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -166,7 +166,7 @@ func (s *FirstFitProacCC) startCapping() {
 				if fcfsCurrentCapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding curreCapValue to the nearest int.
-						if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -190,7 +190,7 @@ func (s *FirstFitProacCC) startRecapping() {
 				if s.isRecapping && fcfsRecapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding curreCapValue to the nearest int.
-						if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -179,7 +179,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() {
 				if rankedCurrentCapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding currentCapValue to the nearest int.
-						if err := rapl.Cap(host, "rapl", int(math.Floor(rankedCurrentCapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -203,7 +203,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() {
 				if s.isRecapping && rankedRecapValue > 0.0 {
 					for _, host := range constants.Hosts {
 						// Rounding currentCapValue to the nearest int.
-						if err := rapl.Cap(host, "rapl", int(math.Floor(rankedRecapValue+0.5))); err != nil {
+						if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil {
 							log.Println(err)
 						}
 					}
@@ -1,9 +1,5 @@
 package utilities
 
-import (
-	"errors"
-)
-
 /*
 The Pair and PairList have been taken from google groups forum,
 https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw
@@ -18,6 +14,15 @@ type Pair struct {
 // A slice of pairs that implements the sort.Interface to sort by value.
 type PairList []Pair
 
+// Convert map[string]float64 to PairList
+func GetPairList(m map[string]float64) PairList {
+	pl := PairList{}
+	for k, v := range m {
+		pl = append(pl, Pair{Key: k, Value: v})
+	}
+	return pl
+}
+
 // Swap pairs in the PairList
 func (plist PairList) Swap(i, j int) {
 	plist[i], plist[j] = plist[j], plist[i]
@@ -32,16 +37,3 @@ func (plist PairList) Len() int {
 func (plist PairList) Less(i, j int) bool {
 	return plist[i].Value < plist[j].Value
 }
-
-// convert a PairList to a map[string]float64
-func OrderedKeys(plist PairList) ([]string, error) {
-	// Validation
-	if plist == nil {
-		return nil, errors.New("Invalid argument: plist")
-	}
-	orderedKeys := make([]string, len(plist))
-	for _, pair := range plist {
-		orderedKeys = append(orderedKeys, pair.Key)
-	}
-	return orderedKeys, nil
-}
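GetPairList is how the progressive extrema capper picks the most-capped host: convert the cap-value map to a PairList, sort it, and take the first entry. A self-contained usage sketch with example data:

package main

import (
    "fmt"
    "sort"

    "bitbucket.org/sunybingcloud/electron/utilities"
)

func main() {
    // Example cap values; hostA is capped at 50%, hostB at 12.5%.
    capValues := map[string]float64{"hostA": 50.0, "hostB": 12.5}

    pl := utilities.GetPairList(capValues)
    sort.Sort(pl)          // non-decreasing order of Value
    fmt.Println(pl[0].Key) // "hostB": the most capped host, so it is uncapped first
}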