diff --git a/README.md b/README.md index 2e494da..ca2e9af 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,9 @@ To Do: * Make def.Task an interface for further modularization and flexibility. * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). * Have a generic sorter for task resources instead of having one for each kind of resource. + * **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...) + * Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference. + * Retrofit TopHeavy and BottomHeavy schedulers to use the clustering utility for tasks. **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is launched for logging to work and PCP collector agents installed diff --git a/def/taskUtils.go b/def/taskUtils.go index 2c003ac..04c90b9 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -5,6 +5,34 @@ import ( "sort" ) +// Information about a cluster of tasks +type TaskCluster struct { + ClusterIndex int + Tasks []Task + SizeScore int // How many other clusters is this cluster bigger than +} + +// Classification of Tasks using KMeans clustering using the watts consumption observations +type TasksToClassify []Task + +func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { + clusters := make(map[int][]Task) + observations := getObservations(tc) + // TODO: Make the number of rounds configurable based on the size of the workload + if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained { + for i := 0; i < len(observations); i++ { + observation := observations[i] + classIndex := gokmeans.Nearest(observation, centroids) + if _, ok := clusters[classIndex]; ok { + clusters[classIndex] = append(clusters[classIndex], tc[i]) + } else { + clusters[classIndex] = []Task{tc[i]} + } + } + } + return labelAndOrder(clusters, numberOfClusters) +} + // The watts consumption observations are taken into consideration. func getObservations(tasks []Task) []gokmeans.Node { observations := []gokmeans.Node{} @@ -25,6 +53,8 @@ func getObservations(tasks []Task) []gokmeans.Node { return observations } +// Size tasks based on the power consumption +// TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement. func clusterSize(tasks []Task) float64 { size := 0.0 for _, task := range tasks { @@ -39,76 +69,38 @@ func clusterSize(tasks []Task) float64 { return size } -// information about a cluster of tasks -type TaskCluster struct { - clusterIndex int - tasks []Task - sizeScore int // how many other clusters is this one bigger than (in the current workload) -} +// Order clusters in increasing order of task heaviness +func labelAndOrder(clusters map[int][]Task, numberOfClusters int) []TaskCluster { + // Determine the position of the cluster in the ordered list of clusters + sizedClusters := []TaskCluster{} -// Sorting TaskClusters based on sizeScore -type TaskClusterSorter []TaskCluster - -func (slice TaskClusterSorter) Len() int { - return len(slice) -} - -func (slice TaskClusterSorter) Less(i, j int) bool { - return slice[i].sizeScore <= slice[j].sizeScore -} - -func (slice TaskClusterSorter) Swap(i, j int) { - slice[i], slice[j] = slice[j], slice[i] -} - -// order clusters in increasing order of task heaviness -// TODO: Make this look into task.ClassToWatts, if present. -func order(clusters map[int][]Task, numberOfClusters int) []TaskCluster { - // determine the position of the cluster in the ordered list of clusters - clusterSizeScores := []TaskCluster{} + // Initializing for i := 0; i < numberOfClusters; i++ { - // sizing the current cluster + sizedClusters = append(sizedClusters, TaskCluster{ + ClusterIndex: i, + Tasks: clusters[i], + SizeScore: 0, + }) + } + + for i := 0; i < numberOfClusters-1; i++ { + // Sizing the current cluster sizeI := clusterSize(clusters[i]) - // comparing with the other clusters - for j := 0; j != i; j++ { - if sizeI >= clusterSize(clusters[j]) { - if len(clusterSizeScores) >= i { - clusterSizeScores[i].sizeScore++ - } else { - clusterSizeScores[i] = TaskCluster{ - clusterIndex: i, - tasks: clusters[i], - sizeScore: 1, - } - } + // Comparing with the other clusters + for j := i + 1; j < numberOfClusters; j++ { + sizeJ := clusterSize(clusters[j]) + if sizeI > sizeJ { + sizedClusters[i].SizeScore++ + } else { + sizedClusters[j].SizeScore++ } } } // Sorting the clusters based on sizeScore - sort.Sort(TaskClusterSorter(clusterSizeScores)) - return clusterSizeScores -} - -// Classification of Tasks using KMeans clustering using the watts consumption observations -type TasksToClassify []Task - -func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster { - clusters := make(map[int][]Task) - observations := getObservations(tc) - // TODO: Make the number of rounds configurable based on the size of the workload - if trained, centroids := gokmeans.Train(observations, numberOfClusters, 50); trained { - for i := 0; i < len(observations); i++ { - observation := observations[i] - classIndex := gokmeans.Nearest(observation, centroids) - if _, ok := clusters[classIndex]; ok { - clusters[classIndex] = append(clusters[classIndex], tc[i]) - } else { - clusters[classIndex] = []Task{tc[i]} - } - } - } - - return order(clusters, numberOfClusters) + sort.SliceStable(sizedClusters, func(i, j int) bool { + return sizedClusters[i].SizeScore <= sizedClusters[j].SizeScore + }) + return sizedClusters }