package def
import (
	"log"
	"reflect"
	"sort"

	"github.com/mdesenfants/gokmeans"
)
2017-02-25 15:43:32 -05:00
// Information about a cluster of tasks
type TaskCluster struct {
ClusterIndex int
Tasks [ ] Task
SizeScore int // How many other clusters is this cluster bigger than
}
// Classification of Tasks using KMeans clustering using the watts consumption observations
type TasksToClassify [ ] Task
2017-08-22 13:00:40 -04:00
// taskObservationCalculator is the basic observation calculator. It returns an
// array consisting of the watts requirements of a task: one value per workload
// class when ClassToWatts is present, otherwise the single Watts attribute.
// If neither attribute is set the workload cannot be classified and the
// process exits via log.Fatal.
func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
	if task.ClassToWatts != nil {
		// Taking the aggregate: one observation per workload class.
		// NOTE(review): map iteration order is random, so the order of these
		// observations varies between runs — confirm callers don't rely on it.
		observations := []float64{}
		for _, watts := range task.ClassToWatts {
			observations = append(observations, watts)
		}
		return observations
	}
	if task.Watts != 0.0 {
		return []float64{task.Watts}
	}
	log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
	return []float64{0.0} // won't reach here
}
// ClassifyTasks clusters tasks into numberOfClusters groups using KMeans over
// each task's watts-consumption observations, returning the clusters ordered
// in increasing order of size score.
func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster {
	tc := TasksToClassify(tasks)
	return tc.classify(numberOfClusters, tc.taskObservationCalculator)
}
func ( tc TasksToClassify ) classify ( numberOfClusters int , taskObservation func ( task Task ) [ ] float64 ) [ ] TaskCluster {
2017-02-25 15:43:32 -05:00
clusters := make ( map [ int ] [ ] Task )
2017-02-25 19:57:01 -05:00
observations := getObservations ( tc , taskObservation )
2017-04-30 16:48:38 -04:00
// TODO: Make the max number of rounds configurable based on the size of the workload
// The max number of rounds (currently defaulted to 100) is the number of iterations performed to obtain
// distinct clusters. When the data size becomes very large, we would need more iterations for clustering.
2017-02-25 15:43:32 -05:00
if trained , centroids := gokmeans . Train ( observations , numberOfClusters , 100 ) ; trained {
for i := 0 ; i < len ( observations ) ; i ++ {
observation := observations [ i ]
classIndex := gokmeans . Nearest ( observation , centroids )
if _ , ok := clusters [ classIndex ] ; ok {
clusters [ classIndex ] = append ( clusters [ classIndex ] , tc [ i ] )
} else {
clusters [ classIndex ] = [ ] Task { tc [ i ] }
}
}
}
2017-02-25 19:57:01 -05:00
return labelAndOrder ( clusters , numberOfClusters , taskObservation )
2017-02-25 15:43:32 -05:00
}
2017-02-25 19:57:01 -05:00
// record observations
func getObservations ( tasks [ ] Task , taskObservation func ( task Task ) [ ] float64 ) [ ] gokmeans . Node {
2017-02-24 20:52:59 -05:00
observations := [ ] gokmeans . Node { }
for i := 0 ; i < len ( tasks ) ; i ++ {
2017-02-25 19:57:01 -05:00
observations = append ( observations , taskObservation ( tasks [ i ] ) )
2017-02-24 20:52:59 -05:00
}
return observations
}
2017-02-25 15:43:32 -05:00
// Size tasks based on the power consumption
2017-04-30 16:48:38 -04:00
// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement.
2017-02-25 19:57:01 -05:00
func clusterSize ( tasks [ ] Task , taskObservation func ( task Task ) [ ] float64 ) float64 {
2017-02-24 20:52:59 -05:00
size := 0.0
for _ , task := range tasks {
2017-02-25 19:57:01 -05:00
for _ , observation := range taskObservation ( task ) {
size += observation
2017-02-24 20:52:59 -05:00
}
}
return size
}
2017-02-25 15:43:32 -05:00
// Order clusters in increasing order of task heaviness
2017-02-25 19:57:01 -05:00
func labelAndOrder ( clusters map [ int ] [ ] Task , numberOfClusters int , taskObservation func ( task Task ) [ ] float64 ) [ ] TaskCluster {
2017-02-25 15:43:32 -05:00
// Determine the position of the cluster in the ordered list of clusters
sizedClusters := [ ] TaskCluster { }
2017-02-24 20:52:59 -05:00
2017-02-25 15:43:32 -05:00
// Initializing
2017-02-24 20:52:59 -05:00
for i := 0 ; i < numberOfClusters ; i ++ {
2017-02-25 15:43:32 -05:00
sizedClusters = append ( sizedClusters , TaskCluster {
ClusterIndex : i ,
Tasks : clusters [ i ] ,
SizeScore : 0 ,
} )
2017-02-24 20:52:59 -05:00
}
2017-02-25 15:43:32 -05:00
for i := 0 ; i < numberOfClusters - 1 ; i ++ {
// Sizing the current cluster
2017-02-25 19:57:01 -05:00
sizeI := clusterSize ( clusters [ i ] , taskObservation )
2017-02-24 20:52:59 -05:00
2017-02-25 15:43:32 -05:00
// Comparing with the other clusters
for j := i + 1 ; j < numberOfClusters ; j ++ {
2017-02-25 19:57:01 -05:00
sizeJ := clusterSize ( clusters [ j ] , taskObservation )
2017-02-25 15:43:32 -05:00
if sizeI > sizeJ {
sizedClusters [ i ] . SizeScore ++
2017-02-24 20:52:59 -05:00
} else {
2017-02-25 15:43:32 -05:00
sizedClusters [ j ] . SizeScore ++
2017-02-24 20:52:59 -05:00
}
}
}
2017-02-25 15:43:32 -05:00
// Sorting the clusters based on sizeScore
sort . SliceStable ( sizedClusters , func ( i , j int ) bool {
return sizedClusters [ i ] . SizeScore <= sizedClusters [ j ] . SizeScore
} )
return sizedClusters
2017-02-24 20:52:59 -05:00
}
2017-08-23 19:35:19 -04:00
// Generic Task Sorter.
// Be able to sort an array of tasks based on any of the tasks' resources.
// Retrieve a sorter (same signature as 'Less' function in sort.Interface) for the given sorting criteria.
2017-08-23 20:10:33 -04:00
func TaskSorter ( sc sortCriteria , tasks [ ] Task ) func ( i , j int ) bool {
2017-08-23 19:35:19 -04:00
return func ( i , j int ) bool {
taskIFields := reflect . Indirect ( reflect . ValueOf ( tasks [ i ] ) )
tasksJFields := reflect . Indirect ( reflect . ValueOf ( tasks [ j ] ) )
resourceI := taskIFields . FieldByName ( sc . String ( ) ) . Float ( )
resourceJ := tasksJFields . FieldByName ( sc . String ( ) ) . Float ( )
return resourceI <= resourceJ
}
}