From 9dddc38cade84c3a039560fc37f0e6b2a0f32d48 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Fri, 24 Feb 2017 20:52:59 -0500 Subject: [PATCH 1/9] created a utility for tasks that allows for KMeans clustering based on watts resource requirement --- def/taskUtils.go | 114 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 def/taskUtils.go diff --git a/def/taskUtils.go b/def/taskUtils.go new file mode 100644 index 0000000..2c003ac --- /dev/null +++ b/def/taskUtils.go @@ -0,0 +1,114 @@ +package def + +import ( + "github.com/mdesenfants/gokmeans" + "sort" +) + +// The watts consumption observations are taken into consideration. +func getObservations(tasks []Task) []gokmeans.Node { + observations := []gokmeans.Node{} + for i := 0; i < len(tasks); i++ { + task := tasks[i] + // If observations present for the power-classes, then using it + if task.ClassToWatts != nil { + observation := gokmeans.Node{} + for _, watts := range task.ClassToWatts { + observation = append(observation, watts) + } + observations = append(observations, observation) + } else { + // Using the watts attribute alone + observations = append(observations, gokmeans.Node{task.Watts}) + } + } + return observations +} + +func clusterSize(tasks []Task) float64 { + size := 0.0 + for _, task := range tasks { + if task.ClassToWatts != nil { + for _, powerClassWatts := range task.ClassToWatts { + size += powerClassWatts + } + } else { + size += task.Watts + } + } + return size +} + +// information about a cluster of tasks +type TaskCluster struct { + clusterIndex int + tasks []Task + sizeScore int // how many other clusters is this one bigger than (in the current workload) +} + +// Sorting TaskClusters based on sizeScore +type TaskClusterSorter []TaskCluster + +func (slice TaskClusterSorter) Len() int { + return len(slice) +} + +func (slice TaskClusterSorter) Less(i, j int) bool { + return slice[i].sizeScore <= slice[j].sizeScore +} + +func (slice TaskClusterSorter) Swap(i, j int) { + slice[i], slice[j] = slice[j], slice[i] +} + +// order clusters in increasing order of task heaviness +// TODO: Make this look into task.ClassToWatts, if present. +func order(clusters map[int][]Task, numberOfClusters int) []TaskCluster { + // determine the position of the cluster in the ordered list of clusters + clusterSizeScores := []TaskCluster{} + for i := 0; i < numberOfClusters; i++ { + // sizing the current cluster + sizeI := clusterSize(clusters[i]) + + // comparing with the other clusters + for j := 0; j != i; j++ { + if sizeI >= clusterSize(clusters[j]) { + if len(clusterSizeScores) >= i { + clusterSizeScores[i].sizeScore++ + } else { + clusterSizeScores[i] = TaskCluster{ + clusterIndex: i, + tasks: clusters[i], + sizeScore: 1, + } + } + } + } + } + + // Sorting the clusters based on sizeScore + sort.Sort(TaskClusterSorter(clusterSizeScores)) + return clusterSizeScores +} + +// Classification of Tasks using KMeans clustering using the watts consumption observations +type TasksToClassify []Task + +func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { + clusters := make(map[int][]Task) + observations := getObservations(tc) + // TODO: Make the number of rounds configurable based on the size of the workload + if trained, centroids := gokmeans.Train(observations, numberOfClusters, 50); trained { + for i := 0; i < len(observations); i++ { + observation := observations[i] + classIndex := gokmeans.Nearest(observation, centroids) + if _, ok := clusters[classIndex]; ok { + clusters[classIndex] = append(clusters[classIndex], tc[i]) + } else { + clusters[classIndex] = []Task{tc[i]} + } + } + } + + return order(clusters, numberOfClusters) +} From e54697b0dcade1241fbd6d181988e61e297f5c56 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 25 Feb 2017 15:43:32 -0500 Subject: [PATCH 2/9] Added a task utility to be able to cluster tasks into N clusters based on their watts resource requirements. Electron now compatible with Go1.8 and no longer with 1.7. Added TODOs. --- README.md | 3 ++ def/taskUtils.go | 118 ++++++++++++++++++++++------------------------- 2 files changed, 58 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index 2e494da..ca2e9af 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ To Do: * Make def.Task an interface for further modularization and flexibility. * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). * Have a generic sorter for task resources instead of having one for each kind of resource. + * **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...) + * Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference. + * Retrofit TopHeavy and BottomHeavy schedulers to use the clustering utility for tasks. **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is launched for logging to work and PCP collector agents installed diff --git a/def/taskUtils.go b/def/taskUtils.go index 2c003ac..04c90b9 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -5,6 +5,34 @@ import ( "sort" ) +// Information about a cluster of tasks +type TaskCluster struct { + ClusterIndex int + Tasks []Task + SizeScore int // How many other clusters is this cluster bigger than +} + +// Classification of Tasks using KMeans clustering using the watts consumption observations +type TasksToClassify []Task + +func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { + clusters := make(map[int][]Task) + observations := getObservations(tc) + // TODO: Make the number of rounds configurable based on the size of the workload + if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained { + for i := 0; i < len(observations); i++ { + observation := observations[i] + classIndex := gokmeans.Nearest(observation, centroids) + if _, ok := clusters[classIndex]; ok { + clusters[classIndex] = append(clusters[classIndex], tc[i]) + } else { + clusters[classIndex] = []Task{tc[i]} + } + } + } + return labelAndOrder(clusters, numberOfClusters) +} + // The watts consumption observations are taken into consideration. func getObservations(tasks []Task) []gokmeans.Node { observations := []gokmeans.Node{} @@ -25,6 +53,8 @@ func getObservations(tasks []Task) []gokmeans.Node { return observations } +// Size tasks based on the power consumption +// TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement. func clusterSize(tasks []Task) float64 { size := 0.0 for _, task := range tasks { @@ -39,76 +69,38 @@ func clusterSize(tasks []Task) float64 { return size } -// information about a cluster of tasks -type TaskCluster struct { - clusterIndex int - tasks []Task - sizeScore int // how many other clusters is this one bigger than (in the current workload) -} +// Order clusters in increasing order of task heaviness +func labelAndOrder(clusters map[int][]Task, numberOfClusters int) []TaskCluster { + // Determine the position of the cluster in the ordered list of clusters + sizedClusters := []TaskCluster{} -// Sorting TaskClusters based on sizeScore -type TaskClusterSorter []TaskCluster - -func (slice TaskClusterSorter) Len() int { - return len(slice) -} - -func (slice TaskClusterSorter) Less(i, j int) bool { - return slice[i].sizeScore <= slice[j].sizeScore -} - -func (slice TaskClusterSorter) Swap(i, j int) { - slice[i], slice[j] = slice[j], slice[i] -} - -// order clusters in increasing order of task heaviness -// TODO: Make this look into task.ClassToWatts, if present. -func order(clusters map[int][]Task, numberOfClusters int) []TaskCluster { - // determine the position of the cluster in the ordered list of clusters - clusterSizeScores := []TaskCluster{} + // Initializing for i := 0; i < numberOfClusters; i++ { - // sizing the current cluster + sizedClusters = append(sizedClusters, TaskCluster{ + ClusterIndex: i, + Tasks: clusters[i], + SizeScore: 0, + }) + } + + for i := 0; i < numberOfClusters-1; i++ { + // Sizing the current cluster sizeI := clusterSize(clusters[i]) - // comparing with the other clusters - for j := 0; j != i; j++ { - if sizeI >= clusterSize(clusters[j]) { - if len(clusterSizeScores) >= i { - clusterSizeScores[i].sizeScore++ - } else { - clusterSizeScores[i] = TaskCluster{ - clusterIndex: i, - tasks: clusters[i], - sizeScore: 1, - } - } + // Comparing with the other clusters + for j := i + 1; j < numberOfClusters; j++ { + sizeJ := clusterSize(clusters[j]) + if sizeI > sizeJ { + sizedClusters[i].SizeScore++ + } else { + sizedClusters[j].SizeScore++ } } } // Sorting the clusters based on sizeScore - sort.Sort(TaskClusterSorter(clusterSizeScores)) - return clusterSizeScores -} - -// Classification of Tasks using KMeans clustering using the watts consumption observations -type TasksToClassify []Task - -func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { - clusters := make(map[int][]Task) - observations := getObservations(tc) - // TODO: Make the number of rounds configurable based on the size of the workload - if trained, centroids := gokmeans.Train(observations, numberOfClusters, 50); trained { - for i := 0; i < len(observations); i++ { - observation := observations[i] - classIndex := gokmeans.Nearest(observation, centroids) - if _, ok := clusters[classIndex]; ok { - clusters[classIndex] = append(clusters[classIndex], tc[i]) - } else { - clusters[classIndex] = []Task{tc[i]} - } - } - } - - return order(clusters, numberOfClusters) + sort.SliceStable(sizedClusters, func(i, j int) bool { + return sizedClusters[i].SizeScore <= sizedClusters[j].SizeScore + }) + return sizedClusters } From 74eb616a72cfac373c58207ef4432a4ae1b43f95 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 25 Feb 2017 19:57:01 -0500 Subject: [PATCH 3/9] the observation that is to be considered for the classification of a task can now be given as a function that takes the task and returns the observations as a slice of float64. Retrofitted the other functions in taskUtils to make use of this feature. --- def/taskUtils.go | 41 +++++++++++++---------------------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/def/taskUtils.go b/def/taskUtils.go index 04c90b9..6b9a722 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -15,10 +15,10 @@ type TaskCluster struct { // Classification of Tasks using KMeans clustering using the watts consumption observations type TasksToClassify []Task -func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { +func (tc TasksToClassify) ClassifyTasks(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster { clusters := make(map[int][]Task) - observations := getObservations(tc) - // TODO: Make the number of rounds configurable based on the size of the workload + observations := getObservations(tc, taskObservation) + // TODO: Make the number rounds configurable based on the size of the workload if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained { for i := 0; i < len(observations); i++ { observation := observations[i] @@ -30,47 +30,32 @@ func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { } } } - return labelAndOrder(clusters, numberOfClusters) + return labelAndOrder(clusters, numberOfClusters, taskObservation) } -// The watts consumption observations are taken into consideration. -func getObservations(tasks []Task) []gokmeans.Node { +// record observations +func getObservations(tasks []Task, taskObservation func(task Task) []float64) []gokmeans.Node { observations := []gokmeans.Node{} for i := 0; i < len(tasks); i++ { - task := tasks[i] - // If observations present for the power-classes, then using it - if task.ClassToWatts != nil { - observation := gokmeans.Node{} - for _, watts := range task.ClassToWatts { - observation = append(observation, watts) - } - observations = append(observations, observation) - } else { - // Using the watts attribute alone - observations = append(observations, gokmeans.Node{task.Watts}) - } + observations = append(observations, taskObservation(tasks[i])) } return observations } // Size tasks based on the power consumption // TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement. -func clusterSize(tasks []Task) float64 { +func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 { size := 0.0 for _, task := range tasks { - if task.ClassToWatts != nil { - for _, powerClassWatts := range task.ClassToWatts { - size += powerClassWatts - } - } else { - size += task.Watts + for _, observation := range taskObservation(task) { + size += observation } } return size } // Order clusters in increasing order of task heaviness -func labelAndOrder(clusters map[int][]Task, numberOfClusters int) []TaskCluster { +func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster { // Determine the position of the cluster in the ordered list of clusters sizedClusters := []TaskCluster{} @@ -85,11 +70,11 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int) []TaskCluster for i := 0; i < numberOfClusters-1; i++ { // Sizing the current cluster - sizeI := clusterSize(clusters[i]) + sizeI := clusterSize(clusters[i], taskObservation) // Comparing with the other clusters for j := i + 1; j < numberOfClusters; j++ { - sizeJ := clusterSize(clusters[j]) + sizeJ := clusterSize(clusters[j], taskObservation) if sizeI > sizeJ { sizedClusters[i].SizeScore++ } else { From 34ab753491ce474f4ea133a04217823eb9fb058e Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 13 Mar 2017 16:38:53 -0400 Subject: [PATCH 4/9] Used the KMeans classification to classify the tasks. --- schedulers/bottomHeavy.go | 23 +++++++++++++++++++---- schedulers/topHeavy.go | 23 +++++++++++++++++++---- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index e1b5567..2112fe2 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -11,7 +11,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "math" "os" "sort" "time" @@ -68,7 +67,23 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. - mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + tasksToClassify := def.TasksToClassify(tasks) + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + if task.ClassToWatts != nil { + // taking the aggregate + observation := 0.0 + for _, watts := range task.ClassToWatts { + observation += watts + } + return observation + } else if task.Watts != 0.0 { + return task.Watts + } else { + log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") + return 0.0 // won't reach here + } + }) + s := &BottomHeavy{ base: base{ wattsAsAResource: wattsAsAResource, @@ -80,8 +95,8 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], + smallTasks: classifiedTasks[0].Tasks, + largeTasks: classifiedTasks[1].Tasks, } return s } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 63c11e0..44b8292 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -11,7 +11,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "math" "os" "sort" "time" @@ -67,7 +66,23 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. - mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + tasksToClassify := def.TasksToClassify(tasks) + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + if task.ClassToWatts != nil { + // taking the aggregate + observation := 0.0 + for _, watts := range task.ClassToWatts { + observation += watts + } + return observation + } else if task.Watts != 0.0 { + return task.Watts + } else { + log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") + return 0.0 + } + }) + s := &TopHeavy{ base: base{ wattsAsAResource: wattsAsAResource, @@ -79,8 +94,8 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], + smallTasks: classifiedTasks[0].Tasks, + largeTasks: classifiedTasks[1].Tasks, } return s } From 190b395bc36fed53693890f92f74c0571387d24b Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Mon, 13 Mar 2017 16:44:52 -0400 Subject: [PATCH 5/9] fixed bug to return []float64{...} as observations rather than one value. --- schedulers/bottomHeavy.go | 12 ++++++------ schedulers/topHeavy.go | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index 2112fe2..f938095 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -68,19 +68,19 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. tasksToClassify := def.TasksToClassify(tasks) - classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 { if task.ClassToWatts != nil { // taking the aggregate - observation := 0.0 + observations := []float64{} for _, watts := range task.ClassToWatts { - observation += watts + observations = append(observations, watts) } - return observation + return observations } else if task.Watts != 0.0 { - return task.Watts + return []float64{task.Watts} } else { log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") - return 0.0 // won't reach here + return []float64{0.0} // won't reach here } }) diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 44b8292..c4f0d44 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -67,19 +67,19 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. tasksToClassify := def.TasksToClassify(tasks) - classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 { if task.ClassToWatts != nil { // taking the aggregate - observation := 0.0 + observations := []float64{} for _, watts := range task.ClassToWatts { - observation += watts + observations = append(observations, watts) } - return observation + return observations } else if task.Watts != 0.0 { - return task.Watts + return []float64{task.Watts} } else { log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") - return 0.0 + return []float64{0.0} // won't be here } }) From e3569e95ce4143170ec72083bbdb2b802e9fed3a Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sun, 30 Apr 2017 16:48:38 -0400 Subject: [PATCH 6/9] fixed comments to be a little more meaningful. --- def/taskUtils.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/def/taskUtils.go b/def/taskUtils.go index 6b9a722..bbdbb4c 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -18,7 +18,9 @@ type TasksToClassify []Task func (tc TasksToClassify) ClassifyTasks(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster { clusters := make(map[int][]Task) observations := getObservations(tc, taskObservation) - // TODO: Make the number rounds configurable based on the size of the workload + // TODO: Make the max number of rounds configurable based on the size of the workload + // The max number of rounds (currently defaulted to 100) is the number of iterations performed to obtain + // distinct clusters. When the data size becomes very large, we would need more iterations for clustering. if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained { for i := 0; i < len(observations); i++ { observation := observations[i] @@ -43,7 +45,7 @@ func getObservations(tasks []Task, taskObservation func(task Task) []float64) [] } // Size tasks based on the power consumption -// TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement. +// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement. func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 { size := 0.0 for _, task := range tasks { From 41ef269c62844cb047717e76939202b2791f553f Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 22 Aug 2017 12:56:36 -0400 Subject: [PATCH 7/9] fixed indentation --- scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler.go b/scheduler.go index 2128309..1eca3b4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -120,4 +120,4 @@ func main() { log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error()) } log.Println("Exiting...") -} \ No newline at end of file +} From f4459c8cbf5c375906776ccc89140b4990c3bae0 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 22 Aug 2017 13:00:40 -0400 Subject: [PATCH 8/9] Consolidated the ClassifyTasks(...) functions from topHeavy and bottomHeavy and added the function to def/taskUtils.go. Added TODOs for refining the means by which the kmeans classified clusters were sorted. --- def/taskUtils.go | 25 ++++++++++++++++++++++++- schedulers/README.md | 1 + schedulers/bottomHeavy.go | 21 +++------------------ schedulers/topHeavy.go | 21 +++------------------ 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/def/taskUtils.go b/def/taskUtils.go index 6b9a722..f02342b 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -3,6 +3,7 @@ package def import ( "github.com/mdesenfants/gokmeans" "sort" + "log" ) // Information about a cluster of tasks @@ -15,7 +16,29 @@ type TaskCluster struct { // Classification of Tasks using KMeans clustering using the watts consumption observations type TasksToClassify []Task -func (tc TasksToClassify) ClassifyTasks(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster { +// Basic taskObservation calculator. This returns an array consisting of the MMPU requirements of a task. +func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 { + if task.ClassToWatts != nil { + // taking the aggregate + observations := []float64{} + for _, watts := range task.ClassToWatts { + observations = append(observations, watts) + } + return observations + } else if task.Watts != 0.0 { + return []float64{task.Watts} + } else { + log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") + return []float64{0.0} // won't reach here + } +} + +func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster { + tc := TasksToClassify(tasks) + return tc.classify(numberOfClusters, tc.taskObservationCalculator) +} + +func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster { clusters := make(map[int][]Task) observations := getObservations(tc, taskObservation) // TODO: Make the number rounds configurable based on the size of the workload diff --git a/schedulers/README.md b/schedulers/README.md index 476e114..9173d01 100644 --- a/schedulers/README.md +++ b/schedulers/README.md @@ -7,6 +7,7 @@ To Do: * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go * **Critical**: Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler. * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that. + * Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same. Scheduling Algorithms: diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index 7f8d623..d677f00 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -65,24 +65,8 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st log.Fatal(err) } - // Separating small tasks from large tasks. - // Classification done based on MMPU watts requirements. - tasksToClassify := def.TasksToClassify(tasks) - classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 { - if task.ClassToWatts != nil { - // taking the aggregate - observations := []float64{} - for _, watts := range task.ClassToWatts { - observations = append(observations, watts) - } - return observations - } else if task.Watts != 0.0 { - return []float64{task.Watts} - } else { - log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") - return []float64{0.0} // won't reach here - } - }) + // Classification done based on MMPU watts requirements, into 2 clusters. + classifiedTasks := def.ClassifyTasks(tasks, 2) s := &BottomHeavy{ base: base{ @@ -95,6 +79,7 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, + // Separating small tasks from large tasks. smallTasks: classifiedTasks[0].Tasks, largeTasks: classifiedTasks[1].Tasks, } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 35aa06c..0f7f405 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -64,24 +64,8 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin log.Fatal(err) } - // Separating small tasks from large tasks. - // Classification done based on MMPU watts requirements. - tasksToClassify := def.TasksToClassify(tasks) - classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 { - if task.ClassToWatts != nil { - // taking the aggregate - observations := []float64{} - for _, watts := range task.ClassToWatts { - observations = append(observations, watts) - } - return observations - } else if task.Watts != 0.0 { - return []float64{task.Watts} - } else { - log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") - return []float64{0.0} // won't be here - } - }) + // Classification done based on MMPU watts requirements, into 2 clusters. + classifiedTasks := def.ClassifyTasks(tasks, 2) s := &TopHeavy{ base: base{ @@ -94,6 +78,7 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, + // Separating small tasks from large tasks. smallTasks: classifiedTasks[0].Tasks, largeTasks: classifiedTasks[1].Tasks, } From 235ed189d59869eaf9f74b64584f6650a6bde134 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 22 Aug 2017 13:09:05 -0400 Subject: [PATCH 9/9] Moved the TODO, for the refinement of the cluster sorting algorithm, to the main README file. --- README.md | 1 + schedulers/README.md | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 152f0eb..d93a97c 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ To Do: * **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...) * **Critical** -- Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference. * Handle powerclass not configured on a node condition. As of now, an assumption is made that the powerclass is configured + * Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same. for all the nodes. **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the diff --git a/schedulers/README.md b/schedulers/README.md index 9173d01..476e114 100644 --- a/schedulers/README.md +++ b/schedulers/README.md @@ -7,7 +7,6 @@ To Do: * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go * **Critical**: Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler. * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that. - * Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same. Scheduling Algorithms: