diff --git a/README.md b/README.md
index 16a1114..d93a97c 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,10 @@ To Do:
 * Make def.Task an interface for further modularization and flexibility.
 * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
 * Have a generic sorter for task resources instead of having one for each kind of resource.
+ * **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...)
+ * **Critical** -- Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference.
 * Handle powerclass not configured on a node condition. As of now, an assumption is made that the powerclass is configured
   for all the nodes.
+ * Refine the sorting algorithm that orders the clusters of tasks produced by the kmeans algorithm. This also involves reducing its time complexity.
 
 **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the
diff --git a/def/taskUtils.go b/def/taskUtils.go
new file mode 100644
index 0000000..2b56be3
--- /dev/null
+++ b/def/taskUtils.go
@@ -0,0 +1,116 @@
+package def
+
+import (
+	"log"
+	"sort"
+
+	"github.com/mdesenfants/gokmeans"
+)
+
+// TaskCluster holds information about a cluster of tasks.
+type TaskCluster struct {
+	ClusterIndex int
+	Tasks        []Task
+	SizeScore    int // How many other clusters this cluster is bigger than.
+}
+
+// TasksToClassify supports classification of tasks via kmeans clustering,
+// based on their watts consumption observations.
+type TasksToClassify []Task
+
+// Basic taskObservation calculator. This returns a slice consisting of the MMPU watts requirements of a task.
+func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
+	if task.ClassToWatts != nil {
+		// Collecting the watts requirement observed for each powerclass.
+		observations := []float64{}
+		for _, watts := range task.ClassToWatts {
+			observations = append(observations, watts)
+		}
+		return observations
+	} else if task.Watts != 0.0 {
+		return []float64{task.Watts}
+	} else {
+		log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
+		return []float64{0.0} // Unreachable; log.Fatal exits.
+	}
+}
+
+// ClassifyTasks groups the given tasks into numberOfClusters clusters using kmeans.
+func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster {
+	tc := TasksToClassify(tasks)
+	return tc.classify(numberOfClusters, tc.taskObservationCalculator)
+}
+
+func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
+	clusters := make(map[int][]Task)
+	observations := getObservations(tc, taskObservation)
+	// TODO: Make the max number of rounds configurable based on the size of the workload.
+	// The max number of rounds (currently defaulted to 100) caps the iterations performed to obtain
+	// distinct clusters. As the data size grows, more iterations may be needed for the clustering to converge.
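+	// A note on the gokmeans calls below: Train runs kmeans on the observations
+	// for at most the given number of rounds and reports whether training
+	// succeeded, and Nearest returns the index of the centroid closest to a
+	// given observation.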
+	if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained {
+		for i := 0; i < len(observations); i++ {
+			classIndex := gokmeans.Nearest(observations[i], centroids)
+			// append on a nil slice allocates it, so new clusters need no special casing.
+			clusters[classIndex] = append(clusters[classIndex], tc[i])
+		}
+	} else {
+		log.Fatal("Unable to classify tasks. KMeans training did not converge within the round limit.")
+	}
+	return labelAndOrder(clusters, numberOfClusters, taskObservation)
+}
+
+// Record one observation per task using the given taskObservation function.
+func getObservations(tasks []Task, taskObservation func(task Task) []float64) []gokmeans.Node {
+	observations := []gokmeans.Node{}
+	for _, task := range tasks {
+		observations = append(observations, taskObservation(task))
+	}
+	return observations
+}
+
+// Size a cluster of tasks based on their power consumption.
+// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement.
+func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 {
+	size := 0.0
+	for _, task := range tasks {
+		for _, observation := range taskObservation(task) {
+			size += observation
+		}
+	}
+	return size
+}
+
+// Order clusters in increasing order of task heaviness.
+func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
+	// Determine the position of each cluster in the ordered list of clusters.
+	sizedClusters := []TaskCluster{}
+
+	// Initializing.
+	for i := 0; i < numberOfClusters; i++ {
+		sizedClusters = append(sizedClusters, TaskCluster{
+			ClusterIndex: i,
+			Tasks:        clusters[i],
+			SizeScore:    0,
+		})
+	}
+
+	for i := 0; i < numberOfClusters-1; i++ {
+		// Sizing the current cluster.
+		sizeI := clusterSize(clusters[i], taskObservation)
+
+		// Comparing with the remaining clusters; the larger of each pair scores a point.
+		for j := i + 1; j < numberOfClusters; j++ {
+			sizeJ := clusterSize(clusters[j], taskObservation)
+			if sizeI > sizeJ {
+				sizedClusters[i].SizeScore++
+			} else {
+				sizedClusters[j].SizeScore++
+			}
+		}
+	}
+
+	// Sorting the clusters in increasing order of SizeScore.
+	// The less function must be a strict ordering, hence < rather than <=.
+	sort.SliceStable(sizedClusters, func(i, j int) bool {
+		return sizedClusters[i].SizeScore < sizedClusters[j].SizeScore
+	})
+	return sizedClusters
+}
diff --git a/scheduler.go b/scheduler.go
index 2128309..1eca3b4 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -120,4 +120,4 @@ func main() {
 		log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error())
 	}
 	log.Println("Exiting...")
-}
\ No newline at end of file
+}
diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go
index 3d8251e..d677f00 100644
--- a/schedulers/bottomHeavy.go
+++ b/schedulers/bottomHeavy.go
@@ -11,7 +11,6 @@ import (
 	"github.com/mesos/mesos-go/mesosutil"
 	sched "github.com/mesos/mesos-go/scheduler"
 	"log"
-	"math"
 	"os"
 	"sort"
 	"time"
@@ -66,9 +65,9 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
 		log.Fatal(err)
 	}
 
-	// Separating small tasks from large tasks.
-	// Classification done based on MMPU watts requirements.
-	mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
+	// Classification done based on MMPU watts requirements, into 2 clusters.
+	classifiedTasks := def.ClassifyTasks(tasks, 2)
+
 	s := &BottomHeavy{
 		base: base{
 			wattsAsAResource: wattsAsAResource,
@@ -80,8 +79,9 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
 			RecordPCP:  false,
 			schedTrace: log.New(logFile, "", log.LstdFlags),
 		},
-		smallTasks: tasks[:mid],
-		largeTasks: tasks[mid+1:],
+		// Separating small tasks from large tasks.
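+		// The clusters returned by def.ClassifyTasks are ordered by increasing
+		// SizeScore, so index 0 holds the lighter tasks and index 1 the heavier ones.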
+		smallTasks: classifiedTasks[0].Tasks,
+		largeTasks: classifiedTasks[1].Tasks,
 	}
 	return s
 }
diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go
index 1b62336..0f7f405 100644
--- a/schedulers/topHeavy.go
+++ b/schedulers/topHeavy.go
@@ -11,7 +11,6 @@ import (
 	"github.com/mesos/mesos-go/mesosutil"
 	sched "github.com/mesos/mesos-go/scheduler"
 	"log"
-	"math"
 	"os"
 	"sort"
 	"time"
@@ -65,9 +64,9 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
 		log.Fatal(err)
 	}
 
-	// Separating small tasks from large tasks.
-	// Classification done based on MMPU watts requirements.
-	mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
+	// Classification done based on MMPU watts requirements, into 2 clusters.
+	classifiedTasks := def.ClassifyTasks(tasks, 2)
+
 	s := &TopHeavy{
 		base: base{
 			wattsAsAResource: wattsAsAResource,
@@ -79,8 +78,9 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
 			RecordPCP:  false,
 			schedTrace: log.New(logFile, "", log.LstdFlags),
 		},
-		smallTasks: tasks[:mid],
-		largeTasks: tasks[mid+1:],
+		// Separating small tasks from large tasks. The clusters are ordered by
+		// increasing size, so index 0 holds the small tasks and index 1 the large ones.
+		smallTasks: classifiedTasks[0].Tasks,
+		largeTasks: classifiedTasks[1].Tasks,
 	}
 	return s
 }
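
A minimal sketch of how a scheduler consumes the new classifier, mirroring the
bottomHeavy/topHeavy changes above; the tasks slice here is an assumption,
standing in for the workload already parsed by the framework:

	// tasks is assumed to be a []def.Task parsed from the workload file.
	classifiedTasks := def.ClassifyTasks(tasks, 2)
	// labelAndOrder sorts the clusters by increasing SizeScore, so the first
	// cluster holds the lightest tasks and the last one the heaviest.
	smallTasks := classifiedTasks[0].Tasks
	largeTasks := classifiedTasks[len(classifiedTasks)-1].Tasks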