diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index e1b5567..2112fe2 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -11,7 +11,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "math" "os" "sort" "time" @@ -68,7 +67,23 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. - mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + tasksToClassify := def.TasksToClassify(tasks) + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + if task.ClassToWatts != nil { + // taking the aggregate + observation := 0.0 + for _, watts := range task.ClassToWatts { + observation += watts + } + return observation + } else if task.Watts != 0.0 { + return task.Watts + } else { + log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") + return 0.0 // won't reach here + } + }) + s := &BottomHeavy{ base: base{ wattsAsAResource: wattsAsAResource, @@ -80,8 +95,8 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], + smallTasks: classifiedTasks[0].Tasks, + largeTasks: classifiedTasks[1].Tasks, } return s } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 63c11e0..44b8292 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -11,7 +11,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "math" "os" "sort" "time" @@ -67,7 +66,23 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin // Separating small tasks from large tasks. // Classification done based on MMPU watts requirements. - mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + tasksToClassify := def.TasksToClassify(tasks) + classifiedTasks := tasksToClassify.ClassifyTasks(2, func(task def.Task) float64 { + if task.ClassToWatts != nil { + // taking the aggregate + observation := 0.0 + for _, watts := range task.ClassToWatts { + observation += watts + } + return observation + } else if task.Watts != 0.0 { + return task.Watts + } else { + log.Fatal("Unable to classify tasks. Missing Watts or ClassToWatts attribute in workload.") + return 0.0 + } + }) + s := &TopHeavy{ base: base{ wattsAsAResource: wattsAsAResource, @@ -79,8 +94,8 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin RecordPCP: false, schedTrace: log.New(logFile, "", log.LstdFlags), }, - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], + smallTasks: classifiedTasks[0].Tasks, + largeTasks: classifiedTasks[1].Tasks, } return s }