From 435c4ca1bc3a561cf0aa8879beb8327f422ffa4b Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Fri, 2 Feb 2018 17:06:59 -0500 Subject: [PATCH] fixed bug in schedWindow resizing strategy, where earlier it was stopping after checking if the first task in the queue can be scheduled in the next offer cycle. Changed baseScheduler to store the schedWindowResizingStrategy instead of the criteria. Retrofitted the scheduling policies to use the scheduling window resizing strategy directly from baseSchedRef. --- schedulers/MaxGreedyMins.go | 4 +--- schedulers/MaxMin.go | 4 +--- schedulers/base.go | 10 +++++----- schedulers/bin-packing.go | 4 +--- schedulers/first-fit.go | 4 +--- utilities/schedUtils/schedUtils.go | 15 +++++++++++---- 6 files changed, 20 insertions(+), 21 deletions(-) diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 4ac046a..5d9ae08 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -177,8 +176,7 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index ee1fda8..6cfa808 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -171,8 +170,7 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/base.go b/schedulers/base.go index db2c849..3602f71 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -60,13 +60,13 @@ type BaseScheduler struct { // Size of window of tasks that can be scheduled in the next offer cycle. // The window size can be adjusted to make the most use of every resource offer. // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. - schedulingWindow int + schedulingWindow int - // Criteria to resize the schedulingWindow. - schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria + // Strategy to resize the schedulingWindow. + schedWindowResStrategy schedUtils.SchedWindowResizingStrategy // Window of tasks that the current scheduling policy has to schedule. // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. - schedWindow int + curSchedWindow int } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -78,7 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { } s.running = make(map[string]map[string]bool) s.mutex = sync.Mutex{} - s.schedWindowResizeCrit = "fillNextOfferCycle" + s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"] } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index f589f82..8ecbc98 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -121,8 +120,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index c0d1235..ab736ee 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -106,8 +105,7 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go index 6c57020..496c4f7 100644 --- a/utilities/schedUtils/schedUtils.go +++ b/utilities/schedUtils/schedUtils.go @@ -3,24 +3,25 @@ package schedUtils import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities" + "log" ) // Criteria for resizing the scheduling window. type SchedulingWindowResizingCriteria string -var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{ +var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]SchedWindowResizingStrategy{ "fillNextOfferCycle": &fillNextOfferCycle{}, } // Interface for a scheduling window resizing strategy. -type schedWindowResizingStrategy interface { +type SchedWindowResizingStrategy interface { // Apply the window resizing strategy and return the news scheduling window size. Apply(func() interface{}) int } // Scheduling window resizing strategy that attempts to resize the scheduling window // to include as many tasks as possible so as to make the most use of the next offer cycle. -type fillNextOfferCycle struct{} +type fillNextOfferCycle struct {} func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { return s.apply(getArgs().([]def.Task)) @@ -47,17 +48,23 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { return false } + done := false for _, task := range taskQueue { for i := *task.Instances; i > 0; i-- { + log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+ + "during the next offer cycle...", i, task.Name) if canSchedule(task) { filledCPU += task.CPU filledRAM += task.RAM newSchedWindow++ } else { + done = true break } } - break + if done { + break + } } return newSchedWindow }