Added a task utility to be able to cluster tasks into N clusters based on their watts resource requirements. Electron now compatible with Go1.8 and no longer with 1.7. Added TODOs.

This commit is contained in:
Pradyumna Kaushik 2017-02-25 15:43:32 -05:00
parent 9dddc38cad
commit e54697b0dc
2 changed files with 58 additions and 63 deletions

View file

@ -18,6 +18,9 @@ To Do:
* Make def.Task an interface for further modularization and flexibility. * Make def.Task an interface for further modularization and flexibility.
* Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
* Have a generic sorter for task resources instead of having one for each kind of resource. * Have a generic sorter for task resources instead of having one for each kind of resource.
* **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...)
* Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference.
* Retrofit TopHeavy and BottomHeavy schedulers to use the clustering utility for tasks.
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the
machine on which electron is launched for logging to work and PCP collector agents installed machine on which electron is launched for logging to work and PCP collector agents installed

View file

@ -5,6 +5,34 @@ import (
"sort" "sort"
) )
// Information about a cluster of tasks
type TaskCluster struct {
ClusterIndex int
Tasks []Task
SizeScore int // How many other clusters is this cluster bigger than
}
// Classification of Tasks using KMeans clustering using the watts consumption observations
type TasksToClassify []Task
func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster {
clusters := make(map[int][]Task)
observations := getObservations(tc)
// TODO: Make the number of rounds configurable based on the size of the workload
if trained, centroids := gokmeans.Train(observations, numberOfClusters, 100); trained {
for i := 0; i < len(observations); i++ {
observation := observations[i]
classIndex := gokmeans.Nearest(observation, centroids)
if _, ok := clusters[classIndex]; ok {
clusters[classIndex] = append(clusters[classIndex], tc[i])
} else {
clusters[classIndex] = []Task{tc[i]}
}
}
}
return labelAndOrder(clusters, numberOfClusters)
}
// The watts consumption observations are taken into consideration. // The watts consumption observations are taken into consideration.
func getObservations(tasks []Task) []gokmeans.Node { func getObservations(tasks []Task) []gokmeans.Node {
observations := []gokmeans.Node{} observations := []gokmeans.Node{}
@ -25,6 +53,8 @@ func getObservations(tasks []Task) []gokmeans.Node {
return observations return observations
} }
// Size tasks based on the power consumption
// TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement.
func clusterSize(tasks []Task) float64 { func clusterSize(tasks []Task) float64 {
size := 0.0 size := 0.0
for _, task := range tasks { for _, task := range tasks {
@ -39,76 +69,38 @@ func clusterSize(tasks []Task) float64 {
return size return size
} }
// information about a cluster of tasks // Order clusters in increasing order of task heaviness
type TaskCluster struct { func labelAndOrder(clusters map[int][]Task, numberOfClusters int) []TaskCluster {
clusterIndex int // Determine the position of the cluster in the ordered list of clusters
tasks []Task sizedClusters := []TaskCluster{}
sizeScore int // how many other clusters is this one bigger than (in the current workload)
}
// Sorting TaskClusters based on sizeScore // Initializing
type TaskClusterSorter []TaskCluster
func (slice TaskClusterSorter) Len() int {
return len(slice)
}
func (slice TaskClusterSorter) Less(i, j int) bool {
return slice[i].sizeScore <= slice[j].sizeScore
}
func (slice TaskClusterSorter) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
// order clusters in increasing order of task heaviness
// TODO: Make this look into task.ClassToWatts, if present.
func order(clusters map[int][]Task, numberOfClusters int) []TaskCluster {
// determine the position of the cluster in the ordered list of clusters
clusterSizeScores := []TaskCluster{}
for i := 0; i < numberOfClusters; i++ { for i := 0; i < numberOfClusters; i++ {
// sizing the current cluster sizedClusters = append(sizedClusters, TaskCluster{
ClusterIndex: i,
Tasks: clusters[i],
SizeScore: 0,
})
}
for i := 0; i < numberOfClusters-1; i++ {
// Sizing the current cluster
sizeI := clusterSize(clusters[i]) sizeI := clusterSize(clusters[i])
// comparing with the other clusters // Comparing with the other clusters
for j := 0; j != i; j++ { for j := i + 1; j < numberOfClusters; j++ {
if sizeI >= clusterSize(clusters[j]) { sizeJ := clusterSize(clusters[j])
if len(clusterSizeScores) >= i { if sizeI > sizeJ {
clusterSizeScores[i].sizeScore++ sizedClusters[i].SizeScore++
} else { } else {
clusterSizeScores[i] = TaskCluster{ sizedClusters[j].SizeScore++
clusterIndex: i,
tasks: clusters[i],
sizeScore: 1,
}
}
} }
} }
} }
// Sorting the clusters based on sizeScore // Sorting the clusters based on sizeScore
sort.Sort(TaskClusterSorter(clusterSizeScores)) sort.SliceStable(sizedClusters, func(i, j int) bool {
return clusterSizeScores return sizedClusters[i].SizeScore <= sizedClusters[j].SizeScore
} })
return sizedClusters
// Classification of Tasks using KMeans clustering using the watts consumption observations
type TasksToClassify []Task
func (tc TasksToClassify) ClassifyTasks(numberOfClusters int) []TaskCluster {
clusters := make(map[int][]Task)
observations := getObservations(tc)
// TODO: Make the number of rounds configurable based on the size of the workload
if trained, centroids := gokmeans.Train(observations, numberOfClusters, 50); trained {
for i := 0; i < len(observations); i++ {
observation := observations[i]
classIndex := gokmeans.Nearest(observation, centroids)
if _, ok := clusters[classIndex]; ok {
clusters[classIndex] = append(clusters[classIndex], tc[i])
} else {
clusters[classIndex] = []Task{tc[i]}
}
}
}
return order(clusters, numberOfClusters)
} }