2017-02-24 20:52:59 -05:00
package def
import (
"github.com/mdesenfants/gokmeans"
"sort"
)
2017-02-25 15:43:32 -05:00
// Information about a cluster of tasks
type TaskCluster struct {
ClusterIndex int
Tasks [ ] Task
SizeScore int // How many other clusters is this cluster bigger than
}
// Classification of Tasks using KMeans clustering using the watts consumption observations
type TasksToClassify [ ] Task
2017-02-25 19:57:01 -05:00
func ( tc TasksToClassify ) ClassifyTasks ( numberOfClusters int , taskObservation func ( task Task ) [ ] float64 ) [ ] TaskCluster {
2017-02-25 15:43:32 -05:00
clusters := make ( map [ int ] [ ] Task )
2017-02-25 19:57:01 -05:00
observations := getObservations ( tc , taskObservation )
// TODO: Make the number rounds configurable based on the size of the workload
2017-02-25 15:43:32 -05:00
if trained , centroids := gokmeans . Train ( observations , numberOfClusters , 100 ) ; trained {
for i := 0 ; i < len ( observations ) ; i ++ {
observation := observations [ i ]
classIndex := gokmeans . Nearest ( observation , centroids )
if _ , ok := clusters [ classIndex ] ; ok {
clusters [ classIndex ] = append ( clusters [ classIndex ] , tc [ i ] )
} else {
clusters [ classIndex ] = [ ] Task { tc [ i ] }
}
}
}
2017-02-25 19:57:01 -05:00
return labelAndOrder ( clusters , numberOfClusters , taskObservation )
2017-02-25 15:43:32 -05:00
}
2017-02-25 19:57:01 -05:00
// record observations
func getObservations ( tasks [ ] Task , taskObservation func ( task Task ) [ ] float64 ) [ ] gokmeans . Node {
2017-02-24 20:52:59 -05:00
observations := [ ] gokmeans . Node { }
for i := 0 ; i < len ( tasks ) ; i ++ {
2017-02-25 19:57:01 -05:00
observations = append ( observations , taskObservation ( tasks [ i ] ) )
2017-02-24 20:52:59 -05:00
}
return observations
}
2017-02-25 15:43:32 -05:00
// Size tasks based on the power consumption
// TODO: Size the cluster in a better way just taking an aggregate of the watts resource requirement.
2017-02-25 19:57:01 -05:00
func clusterSize ( tasks [ ] Task , taskObservation func ( task Task ) [ ] float64 ) float64 {
2017-02-24 20:52:59 -05:00
size := 0.0
for _ , task := range tasks {
2017-02-25 19:57:01 -05:00
for _ , observation := range taskObservation ( task ) {
size += observation
2017-02-24 20:52:59 -05:00
}
}
return size
}
2017-02-25 15:43:32 -05:00
// Order clusters in increasing order of task heaviness
2017-02-25 19:57:01 -05:00
func labelAndOrder ( clusters map [ int ] [ ] Task , numberOfClusters int , taskObservation func ( task Task ) [ ] float64 ) [ ] TaskCluster {
2017-02-25 15:43:32 -05:00
// Determine the position of the cluster in the ordered list of clusters
sizedClusters := [ ] TaskCluster { }
2017-02-24 20:52:59 -05:00
2017-02-25 15:43:32 -05:00
// Initializing
2017-02-24 20:52:59 -05:00
for i := 0 ; i < numberOfClusters ; i ++ {
2017-02-25 15:43:32 -05:00
sizedClusters = append ( sizedClusters , TaskCluster {
ClusterIndex : i ,
Tasks : clusters [ i ] ,
SizeScore : 0 ,
} )
2017-02-24 20:52:59 -05:00
}
2017-02-25 15:43:32 -05:00
for i := 0 ; i < numberOfClusters - 1 ; i ++ {
// Sizing the current cluster
2017-02-25 19:57:01 -05:00
sizeI := clusterSize ( clusters [ i ] , taskObservation )
2017-02-24 20:52:59 -05:00
2017-02-25 15:43:32 -05:00
// Comparing with the other clusters
for j := i + 1 ; j < numberOfClusters ; j ++ {
2017-02-25 19:57:01 -05:00
sizeJ := clusterSize ( clusters [ j ] , taskObservation )
2017-02-25 15:43:32 -05:00
if sizeI > sizeJ {
sizedClusters [ i ] . SizeScore ++
2017-02-24 20:52:59 -05:00
} else {
2017-02-25 15:43:32 -05:00
sizedClusters [ j ] . SizeScore ++
2017-02-24 20:52:59 -05:00
}
}
}
2017-02-25 15:43:32 -05:00
// Sorting the clusters based on sizeScore
sort . SliceStable ( sizedClusters , func ( i , j int ) bool {
return sizedClusters [ i ] . SizeScore <= sizedClusters [ j ] . SizeScore
} )
return sizedClusters
2017-02-24 20:52:59 -05:00
}