From f4459c8cbf5c375906776ccc89140b4990c3bae0 Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik
Date: Tue, 22 Aug 2017 13:00:40 -0400
Subject: [PATCH] Consolidated the ClassifyTasks(...) functions from topHeavy and bottomHeavy and added the function to def/taskUtils.go. Added TODOs for refining the means by which the kmeans classified clusters were sorted.

---
 def/taskUtils.go | 25 ++++++++++++++++++++++++-
 schedulers/README.md | 1 +
 schedulers/bottomHeavy.go | 21 +++------------------
 schedulers/topHeavy.go | 21 +++------------------
 4 files changed, 31 insertions(+), 37 deletions(-)

diff --git a/def/taskUtils.go b/def/taskUtils.go
index 6b9a722..f02342b 100644
--- a/def/taskUtils.go
+++ b/def/taskUtils.go
@@ -3,6 +3,7 @@ package def
 import (
 	"github.com/mdesenfants/gokmeans"
 	"sort"
+	"log"
 )
 
 // Information about a cluster of tasks
@@ -15,7 +16,29 @@ type TaskCluster struct {
 // Classification of Tasks using KMeans clustering using the watts consumption observations
 type TasksToClassify []Task
 
-func (tc TasksToClassify) ClassifyTasks(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
+// Basic taskObservation calculator. This returns an array consisting of the MMPU requirements of a task.
+func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
+	if task.ClassToWatts != nil {
+		// taking the aggregate
+		observations := []float64{}
+		for _, watts := range task.ClassToWatts {
+			observations = append(observations, watts)
+		}
+		return observations
+	} else if task.Watts != 0.0 {
+		return []float64{task.Watts}
+	} else {
+		log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
+		return []float64{0.0} // won't reach here
+	}
+}
+
+func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster {
+	tc := TasksToClassify(tasks)
+	return tc.classify(numberOfClusters, tc.taskObservationCalculator)
+}
+
+func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
 	clusters := make(map[int][]Task)
 	observations := getObservations(tc, taskObservation)
 	// TODO: Make the number rounds configurable based on the size of the workload
diff --git a/schedulers/README.md b/schedulers/README.md
index 476e114..9173d01 100644
--- a/schedulers/README.md
+++ b/schedulers/README.md
@@ -7,6 +7,7 @@ To Do:
  * Fix the race condition on 'tasksRunning' in proactiveclusterwidecappingfcfs.go and proactiveclusterwidecappingranked.go
  * **Critical**: Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler.
  * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that.
+ * Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same.
 
 Scheduling Algorithms:
 
diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go
index 7f8d623..d677f00 100644
--- a/schedulers/bottomHeavy.go
+++ b/schedulers/bottomHeavy.go
@@ -65,24 +65,8 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
 		log.Fatal(err)
 	}
 
-	// Separating small tasks from large tasks.
-	// Classification done based on MMPU watts requirements.
-	tasksToClassify := def.TasksToClassify(tasks)
-	classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 {
-		if task.ClassToWatts != nil {
-			// taking the aggregate
-			observations := []float64{}
-			for _, watts := range task.ClassToWatts {
-				observations = append(observations, watts)
-			}
-			return observations
-		} else if task.Watts != 0.0 {
-			return []float64{task.Watts}
-		} else {
-			log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
-			return []float64{0.0} // won't reach here
-		}
-	})
+	// Classification done based on MMPU watts requirements, into 2 clusters.
+	classifiedTasks := def.ClassifyTasks(tasks, 2)
 
 	s := &BottomHeavy{
 		base: base{
@@ -95,6 +79,7 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
 			RecordPCP: false,
 			schedTrace: log.New(logFile, "", log.LstdFlags),
 		},
+		// Separating small tasks from large tasks.
 		smallTasks: classifiedTasks[0].Tasks,
 		largeTasks: classifiedTasks[1].Tasks,
 	}
diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go
index 35aa06c..0f7f405 100644
--- a/schedulers/topHeavy.go
+++ b/schedulers/topHeavy.go
@@ -64,24 +64,8 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
 		log.Fatal(err)
 	}
 
-	// Separating small tasks from large tasks.
-	// Classification done based on MMPU watts requirements.
-	tasksToClassify := def.TasksToClassify(tasks)
-	classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) []float64 {
-		if task.ClassToWatts != nil {
-			// taking the aggregate
-			observations := []float64{}
-			for _, watts := range task.ClassToWatts {
-				observations = append(observations, watts)
-			}
-			return observations
-		} else if task.Watts != 0.0 {
-			return []float64{task.Watts}
-		} else {
-			log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
-			return []float64{0.0} // won't be here
-		}
-	})
+	// Classification done based on MMPU watts requirements, into 2 clusters.
+	classifiedTasks := def.ClassifyTasks(tasks, 2)
 
 	s := &TopHeavy{
 		base: base{
@@ -94,6 +78,7 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
 			RecordPCP: false,
 			schedTrace: log.New(logFile, "", log.LstdFlags),
 		},
+		// Separating small tasks from large tasks.
 		smallTasks: classifiedTasks[0].Tasks,
 		largeTasks: classifiedTasks[1].Tasks,
 	}