fixed bug in schedWindow resizing strategy, where earlier it was stopping after checking if the first task in the queue can be scheduled in the next offer cycle. Changed baseScheduler to store the schedWindowResizingStrategy instead of the criteria. Retrofitted the scheduling policies to use the scheduling window resizing strategy directly from baseSchedRef.

This commit is contained in:
Pradyumna Kaushik 2018-02-02 17:06:59 -05:00
parent 3ebd7b0c7e
commit 435c4ca1bc
6 changed files with 20 additions and 21 deletions

View file

@ -4,7 +4,6 @@ import (
"bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
"fmt" "fmt"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
@ -177,8 +176,7 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
if baseSchedRef.schedPolSwitchEnabled { if baseSchedRef.schedPolSwitchEnabled {
// Need to recompute the schedWindow for the next offer cycle. // Need to recompute the schedWindow for the next offer cycle.
// The next scheduling policy will schedule at max schedWindow number of tasks. // The next scheduling policy will schedule at max schedWindow number of tasks.
baseSchedRef.schedWindow = schedUtils. baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
func() interface{} { return baseSchedRef.tasks }) func() interface{} { return baseSchedRef.tasks })
// Switching to a random scheduling policy. // Switching to a random scheduling policy.
// TODO: Switch based on some criteria. // TODO: Switch based on some criteria.

View file

@ -4,7 +4,6 @@ import (
"bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
"fmt" "fmt"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
@ -171,8 +170,7 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
if baseSchedRef.schedPolSwitchEnabled { if baseSchedRef.schedPolSwitchEnabled {
// Need to recompute the schedWindow for the next offer cycle. // Need to recompute the schedWindow for the next offer cycle.
// The next scheduling policy will schedule at max schedWindow number of tasks. // The next scheduling policy will schedule at max schedWindow number of tasks.
baseSchedRef.schedWindow = schedUtils. baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
func() interface{} { return baseSchedRef.tasks }) func() interface{} { return baseSchedRef.tasks })
// Switching to a random scheduling policy. // Switching to a random scheduling policy.
// TODO: Switch based on some criteria. // TODO: Switch based on some criteria.

View file

@ -60,13 +60,13 @@ type BaseScheduler struct {
// Size of window of tasks that can be scheduled in the next offer cycle. // Size of window of tasks that can be scheduled in the next offer cycle.
// The window size can be adjusted to make the most use of every resource offer. // The window size can be adjusted to make the most use of every resource offer.
// By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled.
schedulingWindow int schedulingWindow int
// Criteria to resize the schedulingWindow. // Strategy to resize the schedulingWindow.
schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria schedWindowResStrategy schedUtils.SchedWindowResizingStrategy
// Window of tasks that the current scheduling policy has to schedule. // Window of tasks that the current scheduling policy has to schedule.
// Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling.
schedWindow int curSchedWindow int
} }
func (s *BaseScheduler) init(opts ...schedPolicyOption) { func (s *BaseScheduler) init(opts ...schedPolicyOption) {
@ -78,7 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) {
} }
s.running = make(map[string]map[string]bool) s.running = make(map[string]map[string]bool)
s.mutex = sync.Mutex{} s.mutex = sync.Mutex{}
s.schedWindowResizeCrit = "fillNextOfferCycle" s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"]
} }
func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) {

View file

@ -4,7 +4,6 @@ import (
"bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
"fmt" "fmt"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
@ -121,8 +120,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
if baseSchedRef.schedPolSwitchEnabled { if baseSchedRef.schedPolSwitchEnabled {
// Need to recompute the schedWindow for the next offer cycle. // Need to recompute the schedWindow for the next offer cycle.
// The next scheduling policy will schedule at max schedWindow number of tasks. // The next scheduling policy will schedule at max schedWindow number of tasks.
baseSchedRef.schedWindow = schedUtils. baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
func() interface{} { return baseSchedRef.tasks }) func() interface{} { return baseSchedRef.tasks })
// Switching to a random scheduling policy. // Switching to a random scheduling policy.
// TODO: Switch based on some criteria. // TODO: Switch based on some criteria.

View file

@ -4,7 +4,6 @@ import (
"bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
"fmt" "fmt"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
@ -106,8 +105,7 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
if baseSchedRef.schedPolSwitchEnabled { if baseSchedRef.schedPolSwitchEnabled {
// Need to recompute the schedWindow for the next offer cycle. // Need to recompute the schedWindow for the next offer cycle.
// The next scheduling policy will schedule at max schedWindow number of tasks. // The next scheduling policy will schedule at max schedWindow number of tasks.
baseSchedRef.schedWindow = schedUtils. baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
func() interface{} { return baseSchedRef.tasks }) func() interface{} { return baseSchedRef.tasks })
// Switching to a random scheduling policy. // Switching to a random scheduling policy.
// TODO: Switch based on some criteria. // TODO: Switch based on some criteria.

View file

@ -3,24 +3,25 @@ package schedUtils
import ( import (
"bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities" "bitbucket.org/sunybingcloud/elektron/utilities"
"log"
) )
// Criteria for resizing the scheduling window. // Criteria for resizing the scheduling window.
type SchedulingWindowResizingCriteria string type SchedulingWindowResizingCriteria string
var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{ var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]SchedWindowResizingStrategy{
"fillNextOfferCycle": &fillNextOfferCycle{}, "fillNextOfferCycle": &fillNextOfferCycle{},
} }
// Interface for a scheduling window resizing strategy. // Interface for a scheduling window resizing strategy.
type schedWindowResizingStrategy interface { type SchedWindowResizingStrategy interface {
// Apply the window resizing strategy and return the news scheduling window size. // Apply the window resizing strategy and return the news scheduling window size.
Apply(func() interface{}) int Apply(func() interface{}) int
} }
// Scheduling window resizing strategy that attempts to resize the scheduling window // Scheduling window resizing strategy that attempts to resize the scheduling window
// to include as many tasks as possible so as to make the most use of the next offer cycle. // to include as many tasks as possible so as to make the most use of the next offer cycle.
type fillNextOfferCycle struct{} type fillNextOfferCycle struct {}
func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int {
return s.apply(getArgs().([]def.Task)) return s.apply(getArgs().([]def.Task))
@ -47,17 +48,23 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
return false return false
} }
done := false
for _, task := range taskQueue { for _, task := range taskQueue {
for i := *task.Instances; i > 0; i-- { for i := *task.Instances; i > 0; i-- {
log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+
"during the next offer cycle...", i, task.Name)
if canSchedule(task) { if canSchedule(task) {
filledCPU += task.CPU filledCPU += task.CPU
filledRAM += task.RAM filledRAM += task.RAM
newSchedWindow++ newSchedWindow++
} else { } else {
done = true
break break
} }
} }
break if done {
break
}
} }
return newSchedWindow return newSchedWindow
} }