Fixed the comments to be capitalized at the start and to terminate with a period.
parent
577120ae7c
commit
b807625b78
19 changed files with 194 additions and 201 deletions
@@ -1,9 +1,4 @@
-/*
-Constants that are used across scripts
-1. Tolerance = tolerance for a task that when exceeded would starve the task.
-2. ConsiderationWindowSize = number of tasks to consider for computation of the dynamic cap.
-TODO: Clean this up and use Mesos Attributes instead.
-*/
+// TODO: Clean this up and use Mesos Attributes instead.
 package constants
 
 var Hosts = make(map[string]struct{})
@@ -1,9 +1,9 @@
 package def
 
-// the sortBy function that takes a task reference and returns the resource to consider when sorting.
+// The sortBy function that takes a task reference and returns the resource to consider when sorting.
 type sortBy func(task *Task) float64
 
-// Possible Sorting Criteria
+// Possible Sorting Criteria.
 // Each holds a closure that fetches the required resource from the
 // given task reference.
 var (
@@ -71,11 +71,11 @@ func (tsk *Task) SetTaskID(taskID string) bool {
 Determine the watts value to consider for each task.
 
 This value could either be task.Watts or task.ClassToWatts[<power class>]
-If task.ClassToWatts is not present, then return task.Watts (this would be for workloads which don't have classMapWatts)
+If task.ClassToWatts is not present, then return task.Watts (this would be for workloads which don't have classMapWatts).
 */
 func WattsToConsider(task Task, classMapWatts bool, offer *mesos.Offer) (float64, error) {
 if classMapWatts {
-// checking if ClassToWatts was present in the workload.
+// Checking if ClassToWatts was present in the workload.
 if task.ClassToWatts != nil {
 return task.ClassToWatts[offerUtils.PowerClass(offer)], nil
 } else {
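Editor's note: the hunk above cuts off inside WattsToConsider. As a minimal sketch only (not part of this commit), the decision logic described by the doc comment in the same hunk would reduce to roughly the following; the fallback branches are assumptions, since the diff does not show them.

func WattsToConsider(task Task, classMapWatts bool, offer *mesos.Offer) (float64, error) {
    if classMapWatts {
        // Checking if ClassToWatts was present in the workload.
        if task.ClassToWatts != nil {
            return task.ClassToWatts[offerUtils.PowerClass(offer)], nil
        }
        // Assumed fallback: workloads without a ClassToWatts map fall back to the flat Watts value.
        return task.Watts, nil
    }
    // Assumed fallback when classMapWatts is disabled.
    return task.Watts, nil
}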
@@ -6,20 +6,20 @@ import (
 "sort"
 )
 
-// Information about a cluster of tasks
+// Information about a cluster of tasks.
 type TaskCluster struct {
 ClusterIndex int
 Tasks []Task
 SizeScore int // How many other clusters is this cluster bigger than
 }
 
-// Classification of Tasks using KMeans clustering using the watts consumption observations
+// Classification of Tasks using KMeans clustering using the watts consumption observations.
 type TasksToClassify []Task
 
 // Basic taskObservation calculator. This returns an array consisting of the MMPU requirements of a task.
 func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
 if task.ClassToWatts != nil {
-// taking the aggregate
+// Taking the aggregate.
 observations := []float64{}
 for _, watts := range task.ClassToWatts {
 observations = append(observations, watts)
@@ -29,7 +29,7 @@ func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
 return []float64{task.Watts}
 } else {
 log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
-return []float64{0.0} // won't reach here
+return []float64{0.0} // Won't reach here.
 }
 }
 
@@ -41,9 +41,9 @@ func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster {
 func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
 clusters := make(map[int][]Task)
 observations := getObservations(tc, taskObservation)
-// TODO: Make the max number of rounds configurable based on the size of the workload
+// TODO: Make the max number of rounds configurable based on the size of the workload.
 // The max number of rounds (currently defaulted to 100) is the number of iterations performed to obtain
 // distinct clusters. When the data size becomes very large, we would need more iterations for clustering.
 if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained {
 for i := 0; i < len(observations); i++ {
 observation := observations[i]
@@ -58,7 +58,7 @@ func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(ta
 return labelAndOrder(clusters, numberOfClusters, taskObservation)
 }
 
-// Record observations
+// Record observations.
 func getObservations(tasks []Task, taskObservation func(task Task) []float64) []gokmeans.Node {
 observations := []gokmeans.Node{}
 for i := 0; i < len(tasks); i++ {
@@ -67,7 +67,7 @@ func getObservations(tasks []Task, taskObservation func(task Task) []float64) []
 return observations
 }
 
-// Size tasks based on the power consumption
+// Size tasks based on the power consumption.
 // TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement.
 func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 {
 size := 0.0
@@ -79,12 +79,12 @@ func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float6
 return size
 }
 
-// Order clusters in increasing order of task heaviness
+// Order clusters in increasing order of task heaviness.
 func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
-// Determine the position of the cluster in the ordered list of clusters
+// Determine the position of the cluster in the ordered list of clusters.
 sizedClusters := []TaskCluster{}
 
-// Initializing
+// Initializing.
 for i := 0; i < numberOfClusters; i++ {
 sizedClusters = append(sizedClusters, TaskCluster{
 ClusterIndex: i,
@@ -94,10 +94,10 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio
 }
 
 for i := 0; i < numberOfClusters-1; i++ {
-// Sizing the current cluster
+// Sizing the current cluster.
 sizeI := clusterSize(clusters[i], taskObservation)
 
-// Comparing with the other clusters
+// Comparing with the other clusters.
 for j := i + 1; j < numberOfClusters; j++ {
 sizeJ := clusterSize(clusters[j], taskObservation)
 if sizeI > sizeJ {
@@ -108,7 +108,7 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio
 }
 }
 
-// Sorting the clusters based on sizeScore
+// Sorting the clusters based on sizeScore.
 sort.SliceStable(sizedClusters, func(i, j int) bool {
 return sizedClusters[i].SizeScore <= sizedClusters[j].SizeScore
 })
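Editor's note: purely as an illustration (not part of this commit), the clustering helpers in the hunks above would be consumed roughly as follows; the package name def and the two-cluster split are assumptions.

// Sketch only: ClassifyTasks returns TaskCluster values already ordered by SizeScore,
// i.e. lighter clusters come before heavier ones.
func printClusterSummary(tasks []def.Task) {
    for _, tc := range def.ClassifyTasks(tasks, 2) {
        fmt.Printf("cluster %d: %d tasks, size score %d\n", tc.ClusterIndex, len(tc.Tasks), tc.SizeScore)
    }
}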
@@ -31,13 +31,13 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
 scanner := bufio.NewScanner(pipe)
 
 go func(logging *bool) {
-// Get names of the columns
+// Get names of the columns.
 scanner.Scan()
 
-// Write to logfile
+// Write to logfile.
 logFile.WriteString(scanner.Text() + "\n")
 
-// Throw away first set of results
+// Throw away first set of results.
 scanner.Scan()
 
 seconds := 0
@@ -66,7 +66,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
 time.Sleep(5 * time.Second)
 
 // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
-// kill process and all children processes
+// Kill process and all children processes.
 syscall.Kill(-pgid, 15)
 return
 }
@@ -23,19 +23,19 @@ func AverageNodePowerHistory(history *ring.Ring) float64 {
 return 0.0
 }
 
-count /= 4 // two PKGs, two DRAM for all nodes currently
+count /= 4 // Two PKGs, two DRAM for all nodes currently.
 
 return (total / count)
 }
 
-// TODO: Figure a way to merge this and avgpower
+// TODO: Figure a way to merge this and avgpower.
 func AverageClusterPowerHistory(history *ring.Ring) float64 {
 
 total := 0.0
 count := 0.0
 
 history.Do(func(x interface{}) {
-if val, ok := x.(float64); ok { //Add it if we can get a float
+if val, ok := x.(float64); ok { // Add it if we can get a float.
 total += val
 count++
 }
@@ -40,10 +40,10 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 scanner := bufio.NewScanner(pipe)
 
 go func(logging *bool, hiThreshold, loThreshold float64) {
-// Get names of the columns
+// Get names of the columns.
 scanner.Scan()
 
-// Write to logfile
+// Write to logfile.
 logFile.WriteString(scanner.Text() + "\n")
 
 headers := strings.Split(scanner.Text(), ",")
@@ -54,22 +54,21 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 
 for i, hostMetric := range headers {
 metricSplit := strings.Split(hostMetric, ":")
-//log.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1])
 
 if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") ||
 strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") {
-//fmt.Println("Index: ", i)
 powerIndexes = append(powerIndexes, i)
 indexToHost[i] = metricSplit[0]
 
-// Only create one ring per host
+// Only create one ring per host.
 if _, ok := powerHistories[metricSplit[0]]; !ok {
-powerHistories[metricSplit[0]] = ring.New(20) // Two PKGS, two DRAM per node, 20 = 5 seconds of tracking
+// Two PKGS, two DRAM per node, 20 = 5 seconds of tracking.
+powerHistories[metricSplit[0]] = ring.New(20)
 }
 }
 }
 
-// Throw away first set of results
+// Throw away first set of results.
 scanner.Scan()
 
 cappedHosts := make(map[string]bool)
@@ -108,7 +107,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 
 if clusterMean > hiThreshold {
 log.Printf("Need to cap a node")
-// Create statics for all victims and choose one to cap
+// Create statics for all victims and choose one to cap.
 victims := make([]pcp.Victim, 0, 8)
 
 // TODO: Just keep track of the largest to reduce fron nlogn to n
@@ -116,15 +115,15 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 
 histMean := pcp.AverageNodePowerHistory(history)
 
-// Consider doing mean calculations using go routines if we need to speed up
+// Consider doing mean calculations using go routines if we need to speed up.
 victims = append(victims, pcp.Victim{Watts: histMean, Host: name})
 }
 
-sort.Sort(pcp.VictimSorter(victims)) // Sort by average wattage
+sort.Sort(pcp.VictimSorter(victims)) // Sort by average wattage.
 
-// From best victim to worst, if everyone is already capped NOOP
+// From best victim to worst, if everyone is already capped NOOP.
 for _, victim := range victims {
-// Only cap if host hasn't been capped yet
+// Only cap if host hasn't been capped yet.
 if !cappedHosts[victim.Host] {
 cappedHosts[victim.Host] = true
 orderCapped = append(orderCapped, victim.Host)
@@ -132,7 +131,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 if err := rapl.Cap(victim.Host, "rapl", 50); err != nil {
 log.Print("Error capping host")
 }
-break // Only cap one machine at at time
+break // Only cap one machine at at time.
 }
 }
 
@@ -142,7 +141,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 host := orderCapped[len(orderCapped)-1]
 orderCapped = orderCapped[:len(orderCapped)-1]
 cappedHosts[host] = false
-// User RAPL package to send uncap
+// User RAPL package to send uncap.
 log.Printf("Uncapping host %s", host)
 if err := rapl.Cap(host, "rapl", 100); err != nil {
 log.Print("Error uncapping host")
@@ -169,7 +168,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
 time.Sleep(5 * time.Second)
 
 // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
-// kill process and all children processes
+// Kill process and all children processes.
 syscall.Kill(-pgid, 15)
 return
 }
@@ -54,10 +54,10 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 scanner := bufio.NewScanner(pipe)
 
 go func(logging *bool, hiThreshold, loThreshold float64) {
-// Get names of the columns
+// Get names of the columns.
 scanner.Scan()
 
-// Write to logfile
+// Write to logfile.
 logFile.WriteString(scanner.Text() + "\n")
 
 headers := strings.Split(scanner.Text(), ",")
@@ -68,30 +68,29 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 
 for i, hostMetric := range headers {
 metricSplit := strings.Split(hostMetric, ":")
-//log.Printf("%d Host %s: Metric: %s\n", i, split[0], split[1])
 
 if strings.Contains(metricSplit[1], "RAPL_ENERGY_PKG") ||
 strings.Contains(metricSplit[1], "RAPL_ENERGY_DRAM") {
-//fmt.Println("Index: ", i)
 powerIndexes = append(powerIndexes, i)
 indexToHost[i] = metricSplit[0]
 
-// Only create one ring per host
+// Only create one ring per host.
 if _, ok := powerHistories[metricSplit[0]]; !ok {
-powerHistories[metricSplit[0]] = ring.New(20) // Two PKGS, two DRAM per node, 20 = 5 seconds of tracking
+// Two PKGS, two DRAM per node, 20 = 5 seconds of tracking.
+powerHistories[metricSplit[0]] = ring.New(20)
 }
 }
 }
 
-// Throw away first set of results
+// Throw away first set of results.
 scanner.Scan()
 
-// To keep track of the capped states of the capped victims
+// To keep track of the capped states of the capped victims.
 cappedVictims := make(map[string]float64)
 // TODO: Come with a better name for this.
 orderCapped := make([]string, 0, 8)
 // TODO: Change this to a priority queue ordered by the cap value. This will get rid of the sorting performed in the code.
-// Parallel data structure to orderCapped to keep track of the uncapped states of the uncapped victims
+// Parallel data structure to orderCapped to keep track of the uncapped states of the uncapped victims.
 orderCappedVictims := make(map[string]float64)
 clusterPowerHist := ring.New(5)
 seconds := 0
@@ -128,7 +127,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 log.Println("Need to cap a node")
 log.Printf("Cap values of capped victims: %v", cappedVictims)
 log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
-// Create statics for all victims and choose one to cap
+// Create statics for all victims and choose one to cap.
 victims := make([]pcp.Victim, 0, 8)
 
 // TODO: Just keep track of the largest to reduce fron nlogn to n
@@ -136,46 +135,46 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 
 histMean := pcp.AverageNodePowerHistory(history)
 
-// Consider doing mean calculations using go routines if we need to speed up
+// Consider doing mean calculations using go routines if we need to speed up.
 victims = append(victims, pcp.Victim{Watts: histMean, Host: name})
 }
 
-sort.Sort(pcp.VictimSorter(victims)) // Sort by average wattage
+sort.Sort(pcp.VictimSorter(victims)) // Sort by average wattage.
 
-// Finding the best victim to cap in a round robin manner
+// Finding the best victim to cap in a round robin manner.
 newVictimFound := false
-alreadyCappedHosts := []string{} // Host-names of victims that are already capped
+alreadyCappedHosts := []string{} // Host-names of victims that are already capped.
 for i := 0; i < len(victims); i++ {
-// Try to pick a victim that hasn't been capped yet
+// Try to pick a victim that hasn't been capped yet.
 if _, ok := cappedVictims[victims[i].Host]; !ok {
-// If this victim can't be capped further, then we move on to find another victim
+// If this victim can't be capped further, then we move on to find another victim.
 if _, ok := orderCappedVictims[victims[i].Host]; ok {
 continue
 }
-// Need to cap this victim
+// Need to cap this victim.
 if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
 log.Printf("Error capping host %s", victims[i].Host)
 } else {
 log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0)
-// Keeping track of this victim and it's cap value
+// Keeping track of this victim and it's cap value.
 cappedVictims[victims[i].Host] = 50.0
 newVictimFound = true
-// This node can be uncapped and hence adding to orderCapped
+// This node can be uncapped and hence adding to orderCapped.
 orderCapped = append(orderCapped, victims[i].Host)
 orderCappedVictims[victims[i].Host] = 50.0
-break // Breaking only on successful cap
+break // Breaking only on successful cap.
 }
 } else {
 alreadyCappedHosts = append(alreadyCappedHosts, victims[i].Host)
 }
 }
-// If no new victim found, then we need to cap the best victim among the ones that are already capped
+// If no new victim found, then we need to cap the best victim among the ones that are already capped.
 if !newVictimFound {
 canCapAlreadyCappedVictim := false
 for i := 0; i < len(alreadyCappedHosts); i++ {
-// If already capped then the host must be present in orderCappedVictims
+// If already capped then the host must be present in orderCappedVictims.
 capValue := orderCappedVictims[alreadyCappedHosts[i]]
-// If capValue is greater than the threshold then cap, else continue
+// If capValue is greater than the threshold then cap, else continue.
 if capValue > constants.LowerCapLimit {
 newCapValue := getNextCapValue(capValue, 2)
 if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
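Editor's note: getNextCapValue is not shown in this diff. Given that it is called with a divisor of 2 and its result is repeatedly compared against constants.LowerCapLimit, it is presumably a progressive halving of the current cap; a sketch of that assumption:

// Sketch only: assumed shape of getNextCapValue based on how the code above uses it.
func getNextCapValue(capValue float64, base int) float64 {
    // Each call moves the cap closer to constants.LowerCapLimit (e.g. 50 -> 25 -> 12.5).
    return capValue / float64(base)
}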
@@ -183,14 +182,14 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 } else {
 // Successful cap
 log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
-// Checking whether this victim can be capped further
+// Checking whether this victim can be capped further.
 if newCapValue <= constants.LowerCapLimit {
-// Deleting victim from cappedVictims
+// Deleting victim from cappedVictims.
 delete(cappedVictims, alreadyCappedHosts[i])
-// Updating the cap value in orderCappedVictims
+// Updating the cap value in orderCappedVictims.
 orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
 } else {
-// Updating the cap value
+// Updating the cap value.
 cappedVictims[alreadyCappedHosts[i]] = newCapValue
 orderCappedVictims[alreadyCappedHosts[i]] = newCapValue
 }
@@ -198,9 +197,10 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 break // Breaking only on successful cap.
 }
 } else {
-// Do nothing
+// Do nothing.
 // Continue to find another victim to cap.
-// If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point.
+// If cannot find any victim, then all nodes have been
+// capped to the maximum and we stop capping at this point.
 }
 }
 if !canCapAlreadyCappedVictim {
@@ -213,9 +213,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 log.Printf("Cap values of capped victims: %v", cappedVictims)
 log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
 if len(orderCapped) > 0 {
-// We pick the host that is capped the most to uncap
+// We pick the host that is capped the most to uncap.
 orderCappedToSort := utilities.GetPairList(orderCappedVictims)
-sort.Sort(orderCappedToSort) // Sorted hosts in non-decreasing order of capped states
+sort.Sort(orderCappedToSort) // Sorted hosts in non-decreasing order of capped states.
 hostToUncap := orderCappedToSort[0].Key
 // Uncapping the host.
 // This is a floating point operation and might suffer from precision loss.
@@ -223,23 +223,23 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil {
 log.Printf("Error uncapping host[%s]", hostToUncap)
 } else {
-// Successful uncap
+// Successful uncap.
 log.Printf("Uncapped host[%s] to %f", hostToUncap, newUncapValue)
-// Can we uncap this host further. If not, then we remove its entry from orderCapped
+// Can we uncap this host further. If not, then we remove its entry from orderCapped.
-if newUncapValue >= 100.0 { // can compare using ==
+if newUncapValue >= 100.0 { // Can compare using ==
-// Deleting entry from orderCapped
+// Deleting entry from orderCapped.
 for i, victimHost := range orderCapped {
 if victimHost == hostToUncap {
 orderCapped = append(orderCapped[:i], orderCapped[i+1:]...)
-break // We are done removing host from orderCapped
+break // We are done removing host from orderCapped.
 }
 }
-// Removing entry for host from the parallel data structure
+// Removing entry for host from the parallel data structure.
 delete(orderCappedVictims, hostToUncap)
-// Removing entry from cappedVictims as this host is no longer capped
+// Removing entry from cappedVictims as this host is no longer capped.
 delete(cappedVictims, hostToUncap)
-} else if newUncapValue > constants.LowerCapLimit { // this check is unnecessary and can be converted to 'else'
+} else if newUncapValue > constants.LowerCapLimit { // This check is unnecessary and can be converted to 'else'.
-// Updating the cap value
+// Updating the cap value.
 orderCappedVictims[hostToUncap] = newUncapValue
 cappedVictims[hostToUncap] = newUncapValue
 }
@@ -268,7 +268,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
 time.Sleep(5 * time.Second)
 
 // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
-// kill process and all children processes
+// Kill process and all children processes.
 syscall.Kill(-pgid, 15)
 return
 }
scheduler.go (20 changed lines)
@@ -29,7 +29,7 @@ var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the sched
 "Use option -listSchedPolicies to get the names of available scheduling policies")
 var listSchedPolicies = flag.Bool("listSchedPolicies", false, "Names of the pluaggable scheduling policies.")
 
-// Short hand args
+// Short hand args.
 func init() {
 flag.StringVar(master, "m", "<mesos-master>:5050", "Location of leading Mesos master (shorthand)")
 flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)")
@@ -57,17 +57,17 @@ func listAllSchedulingPolicies() {
 func main() {
 flag.Parse()
 
-// checking to see if we need to just list the pluggable scheduling policies
+// Checking to see if we need to just list the pluggable scheduling policies.
 if *listSchedPolicies {
 listAllSchedulingPolicies()
 os.Exit(1)
 }
 
 // If non-default scheduling policy given,
-// checking if scheduling policyName exists
+// checking if scheduling policyName exists.
 if *schedPolicyName != "first-fit" {
 if _, ok := schedulers.Schedulers[*schedPolicyName]; !ok {
-// invalid scheduling policy
+// Invalid scheduling policy.
 log.Println("Invalid scheduling policy given. The possible scheduling policies are:")
 listAllSchedulingPolicies()
 os.Exit(1)
@@ -126,10 +126,10 @@ func main() {
 go pcp.Start(pcpLog, &recordPCP, logPrefix)
 //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
 //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
-time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
+time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing.
 
-// Attempt to handle SIGINT to not leave pmdumptext running
+// Attempt to handle SIGINT to not leave pmdumptext running.
-// Catch interrupt
+// Catch interrupt.
 go func() {
 c := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, os.Kill)
@@ -145,13 +145,13 @@ func main() {
 
 go func() {
 
-// Signals we have scheduled every task we have
+// Signals we have scheduled every task we have.
 select {
 case <-shutdown:
 //case <-time.After(shutdownTimeout):
 }
 
-// All tasks have finished
+// All tasks have finished.
 select {
 case <-done:
 close(pcpLog)
@@ -159,7 +159,7 @@ func main() {
 //case <-time.After(shutdownTimeout):
 }
 
-// Done shutting down
+// Done shutting down.
 driver.Stop(false)
 
 }()
@@ -24,17 +24,17 @@ type base struct {
 classMapWatts bool
 
 // First set of PCP values are garbage values, signal to logger to start recording when we're
-// about to schedule a new task
+// about to schedule a new task.
 RecordPCP *bool
 
 // This channel is closed when the program receives an interrupt,
 // signalling that the program should shut down.
 Shutdown chan struct{}
 // This channel is closed after shutdown is closed, and only when all
-// outstanding tasks have been cleaned up
+// outstanding tasks have been cleaned up.
 Done chan struct{}
 
-// Controls when to shutdown pcp logging
+// Controls when to shutdown pcp logging.
 PCPLog chan struct{}
 
 schedTrace *log.Logger
@@ -42,7 +42,7 @@ type base struct {
 
 func (s *base) init(opts ...schedPolicyOption) {
 for _, opt := range opts {
-// applying options
+// Applying options.
 if err := opt(s); err != nil {
 log.Fatal(err)
 }
@@ -13,7 +13,7 @@ import (
 "time"
 )
 
-// Decides if to take an offer or not
+// Decides if to take an offer or not.
 func (s *BinPacking) takeOffer(offer *mesos.Offer, task def.Task, totalCPU, totalRAM, totalWatts float64) bool {
 
 cpus, mem, watts := offerUtils.OfferAgg(offer)
@@ -22,7 +22,7 @@ func (s *BinPacking) takeOffer(offer *mesos.Offer, task def.Task, totalCPU, tota
 
 wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 if err != nil {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
@@ -33,13 +33,13 @@ func (s *BinPacking) takeOffer(offer *mesos.Offer, task def.Task, totalCPU, tota
 }
 
 type BinPacking struct {
-base // Type embedded to inherit common functions
+base // Type embedded to inherit common functions.
 }
 
-// Initialization
+// Initialization.
 func (s *BinPacking) init(opts ...schedPolicyOption) {
 s.base.init(opts...)
-// sorting the tasks based on watts
+// Sorting the tasks based on watts.
 def.SortTasks(s.tasks, def.SortByWatts)
 }
 
@@ -48,17 +48,17 @@ func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo
 s.tasksCreated++
 
 if !*s.RecordPCP {
-// Turn on logging
+// Turn on logging.
 *s.RecordPCP = true
 time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
 }
 
-// If this is our first time running into this Agent
+// If this is our first time running into this Agent.
 if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
 s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
 }
 
-// Add task to list of tasks running on node
+// Add task to list of tasks running on node.
 s.running[offer.GetSlaveId().GoString()][taskName] = true
 
 resources := []*mesos.Resource{
@@ -71,7 +71,7 @@ func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo
 log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
 resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
 } else {
-// Error in determining wattsToConsider
+// Error in determining wattsToConsider.
 log.Fatal(err)
 }
 }
@@ -121,17 +121,17 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
 task := s.tasks[i]
 wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 if err != nil {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 
-// Don't take offer if it doesn't match our task's host requirement
+// Don't take offer if it doesn't match our task's host requirement.
 if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
 continue
 }
 
 for *task.Instances > 0 {
-// Does the task fit
+// Does the task fit.
 if s.takeOffer(offer, task, totalCPU, totalRAM, totalWatts) {
 
 offerTaken = true
@@ -148,7 +148,7 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
 *task.Instances--
 
 if *task.Instances <= 0 {
-// All instances of task have been scheduled, remove it
+// All instances of task have been scheduled, remove it.
 s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
 
 if len(s.tasks) <= 0 {
@@ -157,7 +157,7 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
 }
 }
 } else {
-break // Continue on to next offer
+break // Continue on to next offer.
 }
 }
 }
@@ -167,7 +167,7 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
 driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
 } else {
 
-// If there was no match for the task
+// If there was no match for the task.
 fmt.Println("There is not enough resources to launch a task:")
 cpus, mem, watts := offerUtils.OfferAgg(offer)
 
@@ -13,7 +13,7 @@ import (
 "time"
 )
 
-// Decides if to take an offer or not
+// Decides if to take an offer or not.
 func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
 
 cpus, mem, watts := offerUtils.OfferAgg(offer)
@@ -22,7 +22,7 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
 
 wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 if err != nil {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) {
@@ -32,12 +32,12 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
 return false
 }
 
-// elektronScheduler implements the Scheduler interface
+// Elektron scheduler implements the Scheduler interface.
 type FirstFit struct {
 base // Type embedded to inherit common functions
 }
 
-// Initialization
+// Initialization.
 func (s *FirstFit) init(opts ...schedPolicyOption) {
 s.base.init(opts...)
 }
@@ -47,17 +47,17 @@ func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 s.tasksCreated++
 
 if !*s.RecordPCP {
-// Turn on logging
+// Turn on logging.
 *s.RecordPCP = true
-time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
+time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
 }
 
-// If this is our first time running into this Agent
+// If this is our first time running into this Agent.
 if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
 s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
 }
 
-// Add task to list of tasks running on node
+// Add task to list of tasks running on node.
 s.running[offer.GetSlaveId().GoString()][taskName] = true
 
 resources := []*mesos.Resource{
@@ -70,7 +70,7 @@ func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
 resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
 } else {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 }
@@ -112,18 +112,17 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
 
 tasks := []*mesos.TaskInfo{}
 
-// First fit strategy
+// First fit strategy.
 
 offerTaken := false
 for i := 0; i < len(s.tasks); i++ {
 task := s.tasks[i]
 
-// Don't take offer if it doesn't match our task's host requirement
+// Don't take offer if it doesn't match our task's host requirement.
 if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
 continue
 }
 
-// Decision to take the offer or not
+// Decision to take the offer or not.
 if s.takeOffer(offer, task) {
 
 log.Println("Co-Located with: ")
@@ -142,7 +141,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
 *task.Instances--
 
 if *task.Instances <= 0 {
-// All instances of task have been scheduled, remove it
+// All instances of task have been scheduled, remove it.
 s.tasks[i] = s.tasks[len(s.tasks)-1]
 s.tasks = s.tasks[:len(s.tasks)-1]
 
@@ -151,11 +150,11 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
 close(s.Shutdown)
 }
 }
-break // Offer taken, move on
+break // Offer taken, move on.
 }
 }
 
-// If there was no match for the task
+// If there was no match for the task.
 if !offerTaken {
 fmt.Println("There is not enough resources to launch a task:")
 cpus, mem, watts := offerUtils.OfferAgg(offer)
@@ -18,7 +18,7 @@ func coLocated(tasks map[string]bool) {
 fmt.Println("---------------------")
 }
 
-// Get the powerClass of the given hostname
+// Get the powerClass of the given hostname.
 func hostToPowerClass(hostName string) string {
 for powerClass, hosts := range constants.PowerClasses {
 if _, ok := hosts[hostName]; ok {
@@ -28,7 +28,7 @@ func hostToPowerClass(hostName string) string {
 return ""
 }
 
-// scheduler policy options to help initialize schedulers
+// Scheduler policy options to help initialize schedulers.
 type schedPolicyOption func(e ElectronScheduler) error
 
 func WithTasks(ts []def.Task) schedPolicyOption {
@@ -13,7 +13,7 @@ import (
 "time"
 )
 
-// Decides if to take an offer or not
+// Decides if to take an offer or not.
 func (s *MaxGreedyMins) takeOffer(offer *mesos.Offer, task def.Task,
 totalCPU, totalRAM, totalWatts float64) bool {
 
@@ -23,7 +23,7 @@ func (s *MaxGreedyMins) takeOffer(offer *mesos.Offer, task def.Task,
 
 wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 if err != nil {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
@@ -34,10 +34,10 @@ func (s *MaxGreedyMins) takeOffer(offer *mesos.Offer, task def.Task,
 }
 
 type MaxGreedyMins struct {
-base //Type embedding to inherit common functions
+base //Type embedding to inherit common functions.
 }
 
-// Initialization
+// Initialization.
 func (s *MaxGreedyMins) init(opts ...schedPolicyOption) {
 s.base.init(opts...)
 }
@@ -46,19 +46,19 @@ func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn
 taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
 s.tasksCreated++
 
-// Start recording only when we're creating the first task
+// Start recording only when we're creating the first task.
 if !*s.RecordPCP {
 // Turn on logging
 *s.RecordPCP = true
-time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
+time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
 }
 
-// If this is our first time running into this Agent
+// If this is our first time running into this Agent.
 if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
 s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
 }
 
-// Add task to list of tasks running on node
+// Add task to list of tasks running on node.
 s.running[offer.GetSlaveId().GoString()][taskName] = true
 
 resources := []*mesos.Resource{
@@ -71,7 +71,7 @@ func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn
 log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
 resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
 } else {
-// Error in determining wattsConsideration
+// Error in determining wattsConsideration.
 log.Fatal(err)
 }
 }
@@ -90,7 +90,7 @@ func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn
 Type: mesos.ContainerInfo_DOCKER.Enum(),
 Docker: &mesos.ContainerInfo_DockerInfo{
 Image: proto.String(task.Image),
-Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
+Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated.
 },
 },
 }
@ -107,7 +107,7 @@ func (s *MaxGreedyMins) CheckFit(
|
||||||
totalRAM *float64,
|
totalRAM *float64,
|
||||||
totalWatts *float64) (bool, *mesos.TaskInfo) {
|
totalWatts *float64) (bool, *mesos.TaskInfo) {
|
||||||
|
|
||||||
// Does the task fit
|
// Does the task fit.
|
||||||
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
|
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
|
||||||
|
|
||||||
*totalWatts += wattsConsideration
|
*totalWatts += wattsConsideration
|
||||||
|
@ -123,7 +123,7 @@ func (s *MaxGreedyMins) CheckFit(
|
||||||
*task.Instances--
|
*task.Instances--
|
||||||
|
|
||||||
if *task.Instances <= 0 {
|
if *task.Instances <= 0 {
|
||||||
// All instances of task have been scheduled, remove it
|
// All instances of task have been scheduled, remove it.
|
||||||
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
|
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
|
||||||
|
|
||||||
if len(s.tasks) <= 0 {
|
if len(s.tasks) <= 0 {
|
||||||
|
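
The takeOffer and CheckFit hunks above all hinge on the same check: the offer's aggregate CPU, memory, and watts must cover what has already been committed to co-scheduled tasks plus the candidate task's demands. The sketch below isolates that comparison; the task struct and the offerFits helper are simplified stand-ins for illustration, not the scheduler's actual types.

package main

import "fmt"

// task is a simplified stand-in for def.Task.
type task struct {
	CPU, RAM, Watts float64
}

// offerFits reports whether a task still fits in an offer once the resources
// already committed to previously placed tasks are accounted for, mirroring
// the comparison made in takeOffer above.
func offerFits(offerCPU, offerRAM, offerWatts, usedCPU, usedRAM, usedWatts float64, t task) bool {
	return offerCPU >= usedCPU+t.CPU &&
		offerRAM >= usedRAM+t.RAM &&
		offerWatts >= usedWatts+t.Watts
}

func main() {
	candidate := task{CPU: 1.5, RAM: 1024, Watts: 30}
	// An offer with 8 CPUs, 8 GB and 150 W, of which 2 CPUs, 2 GB and 50 W
	// are already spoken for.
	fmt.Println(offerFits(8, 8192, 150, 2, 2048, 50, candidate)) // true
}
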
@@ -160,20 +160,20 @@ func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 		totalCPU := 0.0
 		totalRAM := 0.0
 
-		// Assumes s.tasks is ordered in non-decreasing median max peak order
+		// Assumes s.tasks is ordered in non-decreasing median max peak order.
 
-		// Attempt to schedule a single instance of the heaviest workload available first
-		// Start from the back until one fits
+		// Attempt to schedule a single instance of the heaviest workload available first.
+		// Start from the back until one fits.
 		for i := len(s.tasks) - 1; i >= 0; i-- {
 
 			task := s.tasks[i]
 			wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 			if err != nil {
-				// Error in determining wattsConsideration
+				// Error in determining wattsConsideration.
 				log.Fatal(err)
 			}
 
-			// Don't take offer if it doesn't match our task's host requirement
+			// Don't take offer if it doesn't match our task's host requirement.
 			if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
 				continue
 			}
@@ -189,16 +189,16 @@ func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 			}
 		}
 
-		// Pack the rest of the offer with the smallest tasks
+		// Pack the rest of the offer with the smallest tasks.
 		for i := 0; i < len(s.tasks); i++ {
 			task := s.tasks[i]
 			wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 			if err != nil {
-				// Error in determining wattsConsideration
+				// Error in determining wattsConsideration.
 				log.Fatal(err)
 			}
 
-			// Don't take offer if it doesn't match our task's host requirement
+			// Don't take offer if it doesn't match our task's host requirement.
 			if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
 				continue
 			}
@@ -212,7 +212,7 @@ func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 					offerTaken = true
 					tasks = append(tasks, taskToSchedule)
 				} else {
-					break // Continue on to next task
+					break // Continue on to next task.
 				}
 			}
 		}
@@ -222,7 +222,7 @@ func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 			driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
 		} else {
 
-			// If there was no match for the task
+			// If there was no match for the task.
 			fmt.Println("There is not enough resources to launch a task:")
 			cpus, mem, watts := offerUtils.OfferAgg(offer)
 
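
The ResourceOffers hunks above describe the Max-GreedyMins strategy: with s.tasks kept in non-decreasing order, first place one instance of the heaviest task that fits, then pack the remaining capacity with the smallest tasks. The sketch below reduces the idea to a single scalar capacity; scheduleMaxGreedyMins and its inputs are illustrative only, whereas the scheduler above tracks CPU, RAM, and watts per offer through CheckFit.

package main

import (
	"fmt"
	"sort"
)

// scheduleMaxGreedyMins sketches the ordering strategy: heaviest-first from
// the back of a sorted slice, then smallest-first packing of what remains.
func scheduleMaxGreedyMins(watts []float64, capacity float64) []float64 {
	sort.Float64s(watts)
	scheduled := []float64{}
	used := 0.0

	// Attempt the heaviest workload first: walk from the back until one fits.
	for i := len(watts) - 1; i >= 0; i-- {
		if used+watts[i] <= capacity {
			scheduled = append(scheduled, watts[i])
			used += watts[i]
			watts = append(watts[:i], watts[i+1:]...)
			break
		}
	}

	// Pack the rest of the capacity with the smallest tasks.
	for i := 0; i < len(watts); i++ {
		if used+watts[i] > capacity {
			break
		}
		scheduled = append(scheduled, watts[i])
		used += watts[i]
	}
	return scheduled
}

func main() {
	fmt.Println(scheduleMaxGreedyMins([]float64{10, 25, 40, 60, 80}, 100)) // [80 10]
}
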
@@ -13,7 +13,7 @@ import (
 	"time"
 )
 
-// Decides if to take an offer or not
+// Decides if to take an offer or not.
 func (s *MaxMin) takeOffer(offer *mesos.Offer, task def.Task,
 	totalCPU, totalRAM, totalWatts float64) bool {
 
@@ -23,7 +23,7 @@ func (s *MaxMin) takeOffer(offer *mesos.Offer, task def.Task,
 
 	wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 	if err != nil {
-		// Error in determining wattsConsideration
+		// Error in determining wattsConsideration.
 		log.Fatal(err)
 	}
 	if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
@@ -34,10 +34,10 @@ func (s *MaxMin) takeOffer(offer *mesos.Offer, task def.Task,
 }
 
 type MaxMin struct {
-	base //Type embedding to inherit common functions
+	base //Type embedding to inherit common functions.
 }
 
-// Initialization
+// Initialization.
 func (s *MaxMin) init(opts ...schedPolicyOption) {
 	s.base.init(opts...)
 }
@@ -46,19 +46,19 @@ func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 	taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
 	s.tasksCreated++
 
-	// Start recording only when we're creating the first task
+	// Start recording only when we're creating the first task.
 	if !*s.RecordPCP {
-		// Turn on logging
+		// Turn on logging.
 		*s.RecordPCP = true
-		time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
+		time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
 	}
 
-	// If this is our first time running into this Agent
+	// If this is our first time running into this Agent.
 	if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
 		s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
 	}
 
-	// Add task to list of tasks running on node
+	// Add task to list of tasks running on node.
 	s.running[offer.GetSlaveId().GoString()][taskName] = true
 
 	resources := []*mesos.Resource{
@@ -71,7 +71,7 @@ func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 			log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
 			resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
 		} else {
-			// Error in determining wattsConsideration
+			// Error in determining wattsConsideration.
 			log.Fatal(err)
 		}
 	}
@@ -90,7 +90,7 @@ func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
 			Type: mesos.ContainerInfo_DOCKER.Enum(),
 			Docker: &mesos.ContainerInfo_DockerInfo{
 				Image: proto.String(task.Image),
-				Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
+				Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated.
 			},
 		},
 	}
@@ -107,7 +107,7 @@ func (s *MaxMin) CheckFit(
 	totalRAM *float64,
 	totalWatts *float64) (bool, *mesos.TaskInfo) {
 
-	// Does the task fit
+	// Does the task fit.
 	if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
 
 		*totalWatts += wattsConsideration
@@ -123,7 +123,7 @@ func (s *MaxMin) CheckFit(
 		*task.Instances--
 
 		if *task.Instances <= 0 {
-			// All instances of task have been scheduled, remove it
+			// All instances of task have been scheduled, remove it.
 			s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
 
 			if len(s.tasks) <= 0 {
@@ -160,17 +160,17 @@ func (s *MaxMin) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 		totalCPU := 0.0
 		totalRAM := 0.0
 
-		// Assumes s.tasks is ordered in non-decreasing median max peak order
+		// Assumes s.tasks is ordered in non-decreasing median max peak order.
 
-		// Attempt to schedule a single instance of the heaviest workload available first
-		// Start from the back until one fits
+		// Attempt to schedule a single instance of the heaviest workload available first.
+		// Start from the back until one fits.
 
-		direction := false // True = Min Max, False = Max Min
+		direction := false // True = Min Max, False = Max Min.
 		var index int
-		start := true // if false then index has changed and need to keep it that way
+		start := true // If false then index has changed and need to keep it that way.
 		for i := 0; i < len(s.tasks); i++ {
-			// we need to pick a min task or a max task
-			// depending on the value of direction
+			// We need to pick a min task or a max task
+			// depending on the value of direction.
 			if direction && start {
 				index = 0
 			} else if start {
@@ -180,11 +180,11 @@ func (s *MaxMin) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 
 			wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
 			if err != nil {
-				// Error in determining wattsConsideration
+				// Error in determining wattsConsideration.
 				log.Fatal(err)
 			}
 
-			// Don't take offer it is doesn't match our task's host requirement
+			// Don't take offer it is doesn't match our task's host requirement.
 			if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
 				continue
 			}
@@ -196,13 +196,13 @@ func (s *MaxMin) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 			if taken {
 				offerTaken = true
 				tasks = append(tasks, taskToSchedule)
-				// Need to change direction and set start to true
-				// Setting start to true would ensure that index be set accurately again
+				// Need to change direction and set start to true.
+				// Setting start to true would ensure that index be set accurately again.
 				direction = !direction
 				start = true
 				i--
 			} else {
-				// Need to move index depending on the value of direction
+				// Need to move index depending on the value of direction.
 				if direction {
 					index++
 					start = false
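
The comments in the MaxMin ResourceOffers hunks describe alternating between the heaviest and the lightest remaining task, steered by the direction flag. The sketch below illustrates just that ordering; it flips direction after every pick, whereas the scheduler above flips only when a task is actually taken, and pickOrder with its watts slice is an illustrative stand-in rather than the scheduler's code.

package main

import (
	"fmt"
	"sort"
)

// pickOrder returns tasks in Max-Min order: largest remaining, then smallest
// remaining, alternating until the sorted slice is exhausted.
func pickOrder(watts []float64) []float64 {
	sort.Float64s(watts)
	order := []float64{}
	lo, hi := 0, len(watts)-1
	takeMax := true // Start with the heaviest task.
	for lo <= hi {
		if takeMax {
			order = append(order, watts[hi])
			hi--
		} else {
			order = append(order, watts[lo])
			lo++
		}
		takeMax = !takeMax // Flip direction after each pick.
	}
	return order
}

func main() {
	fmt.Println(pickOrder([]float64{10, 25, 40, 60, 80})) // [80 10 60 25 40]
}
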
@@ -10,7 +10,7 @@ const (
 	mm = "max-min"
 )
 
-// Scheduler class factory
+// Scheduler class factory.
 var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{
 	ff: &FirstFit{base: base{}},
 	bp: &BinPacking{base: base{}},
@@ -18,7 +18,7 @@ var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{
 	mm: &MaxMin{base: base{}},
 }
 
-// build the scheduling policy with the options being applied
+// Build the scheduling policy with the options being applied.
 func BuildSchedPolicy(s scheduler.Scheduler, opts ...schedPolicyOption) {
 	s.(ElectronScheduler).init(opts...)
 }
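
The Schedulers map and BuildSchedPolicy above follow a factory-plus-functional-options shape: named scheduler values in a map, configured by applying variadic option functions before use. The sketch below redeclares a stripped-down analogue; scheduler, option, base, firstFit, and withPCPRecording here are assumed placeholder names, not the project's actual types.

package main

import "fmt"

// option mutates shared scheduler state, mirroring the schedPolicyOption idea.
type option func(*base)

type base struct {
	recordPCP bool
}

// init applies each option to the embedded base.
func (b *base) init(opts ...option) {
	for _, o := range opts {
		o(b)
	}
}

type scheduler interface {
	init(opts ...option)
	name() string
}

// firstFit inherits init through type embedding, like the schedulers above.
type firstFit struct{ base }

func (f *firstFit) name() string { return "first-fit" }

// withPCPRecording is a hypothetical option toggling a flag on the base.
func withPCPRecording() option {
	return func(b *base) { b.recordPCP = true }
}

// A tiny factory map keyed by policy name.
var schedulers = map[string]scheduler{
	"ff": &firstFit{},
}

func main() {
	s := schedulers["ff"]
	s.init(withPCPRecording()) // Build the policy before it is used.
	fmt.Println(s.name(), s.(*firstFit).recordPCP) // first-fit true
}
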
@@ -24,7 +24,7 @@ func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
 	return cpus, mem, watts
 }
 
-// Determine the power class of the host in the offer
+// Determine the power class of the host in the offer.
 func PowerClass(offer *mesos.Offer) string {
 	var powerClass string
 	for _, attr := range offer.GetAttributes() {
@@ -36,7 +36,7 @@ func PowerClass(offer *mesos.Offer) string {
 }
 
 // Implements the sort.Sort interface to sort Offers based on CPU.
-// TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts)
+// TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts).
 type OffersSorter []*mesos.Offer
 
 func (offersSorter OffersSorter) Len() int {
@@ -48,9 +48,9 @@ func (offersSorter OffersSorter) Swap(i, j int) {
 }
 
 func (offersSorter OffersSorter) Less(i, j int) bool {
-	// getting CPU resource availability of offersSorter[i]
+	// Getting CPU resource availability of offersSorter[i].
 	cpu1, _, _ := OfferAgg(offersSorter[i])
-	// getting CPU resource availability of offersSorter[j]
+	// Getting CPU resource availability of offersSorter[j].
 	cpu2, _, _ := OfferAgg(offersSorter[j])
 	return cpu1 <= cpu2
 }
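
OffersSorter above orders offers by available CPU by implementing sort.Interface over a slice of *mesos.Offer. The self-contained analogue below swaps in a plain offer struct so it runs without the Mesos library; offersByCPU and its fields are assumed names for illustration.

package main

import (
	"fmt"
	"sort"
)

// offer is a stand-in for the CPU figure OfferAgg extracts from a *mesos.Offer.
type offer struct {
	Hostname string
	CPU      float64
}

// offersByCPU mirrors the OffersSorter idea: sort.Interface keyed on CPU.
type offersByCPU []offer

func (o offersByCPU) Len() int           { return len(o) }
func (o offersByCPU) Swap(i, j int)      { o[i], o[j] = o[j], o[i] }
func (o offersByCPU) Less(i, j int) bool { return o[i].CPU <= o[j].CPU }

func main() {
	offers := offersByCPU{{"hostA", 8}, {"hostB", 2}, {"hostC", 4}}
	sort.Sort(offers)
	fmt.Println(offers) // Ascending CPU: hostB, hostC, hostA.
}
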
@@ -14,7 +14,7 @@ import (
 type Interface interface {
 	// Value to use for running average calculation.
 	Val() float64
-	// Unique ID
+	// Unique ID.
 	ID() string
 }
 
@@ -24,10 +24,10 @@ type runningAverageCalculator struct {
 	currentSum float64
 }
 
-// singleton instance
+// Singleton instance.
 var racSingleton *runningAverageCalculator
 
-// return single instance
+// Return single instance.
 func getInstance(curSum float64, wSize int) *runningAverageCalculator {
 	if racSingleton == nil {
 		racSingleton = &runningAverageCalculator{
@@ -51,12 +51,12 @@ func (rac *runningAverageCalculator) calculate(data Interface) float64 {
 		rac.considerationWindow.PushBack(data)
 		rac.currentSum += data.Val()
 	} else {
-		// removing the element at the front of the window.
+		// Removing the element at the front of the window.
 		elementToRemove := rac.considerationWindow.Front()
 		rac.currentSum -= elementToRemove.Value.(Interface).Val()
 		rac.considerationWindow.Remove(elementToRemove)
 
-		// adding new element to the window
+		// Adding new element to the window.
 		rac.considerationWindow.PushBack(data)
 		rac.currentSum += data.Val()
 	}
@@ -65,7 +65,7 @@ func (rac *runningAverageCalculator) calculate(data Interface) float64 {
 
 /*
 If element with given ID present in the window, then remove it and return (removeElement, nil).
-Else, return (nil, error)
+Else, return (nil, error).
 */
 func (rac *runningAverageCalculator) removeFromWindow(id string) (interface{}, error) {
 	for element := rac.considerationWindow.Front(); element != nil; element = element.Next() {
@@ -86,7 +86,7 @@ func Calc(data Interface, windowSize int) float64 {
 
 // Remove element from the window if it is present.
 func Remove(id string) (interface{}, error) {
-	// checking if racSingleton has been instantiated
+	// Checking if racSingleton has been instantiated.
 	if racSingleton == nil {
 		return nil, errors.New("Error: Not instantiated. Please call Init() to instantiate.")
 	} else {
@@ -94,9 +94,9 @@ func Remove(id string) (interface{}, error) {
 	}
 }
 
-// initialize the parameters of the running average calculator
+// Initialize the parameters of the running average calculator.
 func Init() {
-	// checking to see if racSingleton needs top be instantiated
+	// Checking to see if racSingleton needs top be instantiated.
 	if racSingleton == nil {
 		racSingleton = getInstance(0.0, 0)
 	}
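
The hunks above maintain a running average over a bounded window backed by container/list: push observations until the window is full, then evict from the front while tracking a running sum. The sketch below captures just that mechanism; the window type is a simplified stand-in that leaves out the package's singleton handling and removal by ID.

package main

import (
	"container/list"
	"fmt"
)

// window keeps the last size observations and their running sum.
type window struct {
	size    int
	values  *list.List
	current float64
}

func newWindow(size int) *window {
	return &window{size: size, values: list.New()}
}

// calc adds an observation and returns the average over the current window.
func (w *window) calc(v float64) float64 {
	if w.values.Len() < w.size {
		w.values.PushBack(v)
		w.current += v
	} else {
		// Evict the oldest observation before adding the new one.
		front := w.values.Front()
		w.current -= front.Value.(float64)
		w.values.Remove(front)
		w.values.PushBack(v)
		w.current += v
	}
	return w.current / float64(w.values.Len())
}

func main() {
	w := newWindow(3)
	for _, v := range []float64{10, 20, 30, 40} {
		fmt.Println(w.calc(v)) // 10, 15, 20, 30
	}
}
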
@@ -2,7 +2,7 @@ package utilities
 
 /*
 The Pair and PairList have been taken from google groups forum,
-https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw
+https://groups.google.com/forum/#!topic/golang-nuts/FT7cjmcL7gw.
 */
 
 // Utility struct that helps in sorting map[string]float64 by value.
@@ -14,7 +14,7 @@ type Pair struct {
 // A slice of pairs that implements the sort.Interface to sort by value.
 type PairList []Pair
 
-// Convert map[string]float64 to PairList
+// Convert map[string]float64 to PairList.
 func GetPairList(m map[string]float64) PairList {
 	pl := PairList{}
 	for k, v := range m {
@@ -23,17 +23,17 @@ func GetPairList(m map[string]float64) PairList {
 	return pl
 }
 
-// Swap pairs in the PairList
+// Swap pairs in the PairList.
 func (plist PairList) Swap(i, j int) {
 	plist[i], plist[j] = plist[j], plist[i]
 }
 
-// function to return the length of the pairlist.
+// Get the length of the pairlist.
 func (plist PairList) Len() int {
 	return len(plist)
 }
 
-// function to compare two elements in pairlist.
+// Compare two elements in pairlist.
 func (plist PairList) Less(i, j int) bool {
 	return plist[i].Value < plist[j].Value
 }
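
The Pair and PairList code above exists so a map[string]float64 can be sorted by value through sort.Sort. A small usage sketch follows; pair and pairList are redeclared locally so the example stands alone, and the host names and numbers are made up.

package main

import (
	"fmt"
	"sort"
)

type pair struct {
	Key   string
	Value float64
}

// pairList implements sort.Interface, ordering pairs by ascending value.
type pairList []pair

func (p pairList) Len() int           { return len(p) }
func (p pairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p pairList) Less(i, j int) bool { return p[i].Value < p[j].Value }

func main() {
	usage := map[string]float64{"hostA": 42.5, "hostB": 13.0, "hostC": 27.8}

	// Copy the map's entries into a slice of pairs, then sort by value.
	pl := pairList{}
	for k, v := range usage {
		pl = append(pl, pair{k, v})
	}
	sort.Sort(pl)
	fmt.Println(pl) // [{hostB 13} {hostC 27.8} {hostA 42.5}]
}
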