Merged in kmeansTaskClassification (pull request #17)

KmeansTaskClassification

Approved-by: ajain13 <ajain13@binghamton.edu>
Approved-by: Renan DelValle <rdelval1@binghamton.edu>
This commit is contained in:
Pradyumna Kaushik 2017-08-23 02:57:33 +00:00 committed by Renan DelValle
commit 9b9dc73269
5 changed files with 132 additions and 13 deletions

View file

@ -17,7 +17,10 @@ To Do:
* Make def.Task an interface for further modularization and flexibility. * Make def.Task an interface for further modularization and flexibility.
* Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...). * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
* Have a generic sorter for task resources instead of having one for each kind of resource. * Have a generic sorter for task resources instead of having one for each kind of resource.
* **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...)
* **Critical** -- Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference.
* Handle powerclass not configured on a node condition. As of now, an assumption is made that the powerclass is configured * Handle powerclass not configured on a node condition. As of now, an assumption is made that the powerclass is configured
* Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same.
for all the nodes. for all the nodes.
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the

116
def/taskUtils.go Normal file
View file

@ -0,0 +1,116 @@
package def
import (
"github.com/mdesenfants/gokmeans"
"sort"
"log"
)
// TaskCluster describes one cluster of tasks produced by the kmeans
// classification, along with its relative ordering information.
type TaskCluster struct {
	// ClusterIndex is the kmeans class index this cluster was assigned.
	ClusterIndex int
	// Tasks holds the tasks that were grouped into this cluster.
	Tasks []Task
	// SizeScore counts how many other clusters this cluster is bigger than
	// (see labelAndOrder); used to order clusters by task heaviness.
	SizeScore int // How many other clusters is this cluster bigger than
}
// TasksToClassify is a slice of tasks to be classified into clusters via
// KMeans, using each task's watts consumption values as the observations.
type TasksToClassify []Task
// taskObservationCalculator builds the observation vector for a single task.
// If per-class watts are available (ClassToWatts), every class's watts value
// becomes one observation; otherwise the single Watts attribute is used.
// A task with neither attribute is a fatal configuration error.
func (tc TasksToClassify) taskObservationCalculator(task Task) []float64 {
	switch {
	case task.ClassToWatts != nil:
		// Aggregate the watts value of every power class into one vector.
		// NOTE: map iteration order is random, so the vector's element
		// order varies between runs (matches the original behavior).
		obs := []float64{}
		for _, classWatts := range task.ClassToWatts {
			obs = append(obs, classWatts)
		}
		return obs
	case task.Watts != 0.0:
		return []float64{task.Watts}
	default:
		log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.")
		return []float64{0.0} // won't reach here
	}
}
// ClassifyTasks partitions tasks into numberOfClusters clusters with the
// kmeans algorithm, using the default watts-based observation calculator,
// and returns the clusters ordered by increasing task heaviness.
func ClassifyTasks(tasks []Task, numberOfClusters int) []TaskCluster {
	classifiable := TasksToClassify(tasks)
	return classifiable.classify(numberOfClusters, classifiable.taskObservationCalculator)
}
// classify runs kmeans over the observation vectors of the receiver's tasks
// and groups the tasks by their nearest centroid. taskObservation converts a
// task into its observation vector. The resulting clusters are labeled and
// ordered by labelAndOrder before being returned.
func (tc TasksToClassify) classify(numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
	observations := getObservations(tc, taskObservation)
	clusters := make(map[int][]Task)
	// TODO: Make the max number of rounds configurable based on the size of the workload
	// The max number of rounds (currently defaulted to 100) is the number of iterations performed to obtain
	// distinct clusters. When the data size becomes very large, we would need more iterations for clustering.
	trained, centroids := gokmeans.Train(observations, numberOfClusters, 100)
	if trained {
		// Assign each task to the cluster of its nearest centroid.
		// Appending to a missing map entry starts from a nil slice,
		// which is equivalent to creating the entry explicitly.
		for i, observation := range observations {
			classIndex := gokmeans.Nearest(observation, centroids)
			clusters[classIndex] = append(clusters[classIndex], tc[i])
		}
	}
	// If training failed, clusters is empty and every TaskCluster gets nil Tasks.
	return labelAndOrder(clusters, numberOfClusters, taskObservation)
}
// getObservations converts every task into its kmeans observation vector
// using the supplied taskObservation function, preserving task order.
func getObservations(tasks []Task, taskObservation func(task Task) []float64) []gokmeans.Node {
	nodes := make([]gokmeans.Node, 0, len(tasks))
	for _, task := range tasks {
		nodes = append(nodes, taskObservation(task))
	}
	return nodes
}
// clusterSize scores a cluster of tasks by summing every component of every
// task's observation vector — i.e. an aggregate of the watts requirements.
// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement.
func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 {
	total := 0.0
	for _, task := range tasks {
		for _, component := range taskObservation(task) {
			total += component
		}
	}
	return total
}
// labelAndOrder assigns each cluster a SizeScore (the number of other
// clusters it is bigger than, with ties credited to the later cluster) and
// returns the clusters sorted in increasing order of task heaviness.
//
// clusters maps a kmeans class index to its tasks; missing indices yield a
// TaskCluster with nil Tasks. taskObservation supplies the per-task
// observation vector used by clusterSize.
func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservation func(task Task) []float64) []TaskCluster {
	// Size every cluster exactly once up front. (Previously sizeJ was
	// recomputed inside the pairwise loop, costing O(k^2) sizing passes.)
	sizes := make([]float64, numberOfClusters)
	sizedClusters := make([]TaskCluster, numberOfClusters)
	for i := 0; i < numberOfClusters; i++ {
		sizes[i] = clusterSize(clusters[i], taskObservation)
		sizedClusters[i] = TaskCluster{
			ClusterIndex: i,
			Tasks:        clusters[i],
			SizeScore:    0,
		}
	}

	// Pairwise comparison: the bigger cluster of each pair earns a point;
	// on a tie the later cluster earns the point (original tie-breaking).
	for i := 0; i < numberOfClusters-1; i++ {
		for j := i + 1; j < numberOfClusters; j++ {
			if sizes[i] > sizes[j] {
				sizedClusters[i].SizeScore++
			} else {
				sizedClusters[j].SizeScore++
			}
		}
	}

	// Sort ascending by SizeScore. The comparator must be a strict "less"
	// (previously <=, which violates the sort contract and defeats
	// SliceStable's stability guarantee for equal scores).
	sort.SliceStable(sizedClusters, func(i, j int) bool {
		return sizedClusters[i].SizeScore < sizedClusters[j].SizeScore
	})
	return sizedClusters
}

View file

@ -120,4 +120,4 @@ func main() {
log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error()) log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error())
} }
log.Println("Exiting...") log.Println("Exiting...")
} }

View file

@ -11,7 +11,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"math"
"os" "os"
"sort" "sort"
"time" "time"
@ -66,9 +65,9 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
log.Fatal(err) log.Fatal(err)
} }
// Separating small tasks from large tasks. // Classification done based on MMPU watts requirements, into 2 clusters.
// Classification done based on MMPU watts requirements. classifiedTasks := def.ClassifyTasks(tasks, 2)
mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
s := &BottomHeavy{ s := &BottomHeavy{
base: base{ base: base{
wattsAsAResource: wattsAsAResource, wattsAsAResource: wattsAsAResource,
@ -80,8 +79,9 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st
RecordPCP: false, RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags), schedTrace: log.New(logFile, "", log.LstdFlags),
}, },
smallTasks: tasks[:mid], // Separating small tasks from large tasks.
largeTasks: tasks[mid+1:], smallTasks: classifiedTasks[0].Tasks,
largeTasks: classifiedTasks[1].Tasks,
} }
return s return s
} }

View file

@ -11,7 +11,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"math"
"os" "os"
"sort" "sort"
"time" "time"
@ -65,9 +64,9 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
log.Fatal(err) log.Fatal(err)
} }
// Separating small tasks from large tasks. // Classification done based on MMPU watts requirements, into 2 clusters.
// Classification done based on MMPU watts requirements. classifiedTasks := def.ClassifyTasks(tasks, 2)
mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
s := &TopHeavy{ s := &TopHeavy{
base: base{ base: base{
wattsAsAResource: wattsAsAResource, wattsAsAResource: wattsAsAResource,
@ -79,8 +78,9 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin
RecordPCP: false, RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags), schedTrace: log.New(logFile, "", log.LstdFlags),
}, },
smallTasks: tasks[:mid], // Separating small tasks from large tasks.
largeTasks: tasks[mid+1:], smallTasks: classifiedTasks[0].Tasks,
largeTasks: classifiedTasks[1].Tasks,
} }
return s return s
} }