From f041e6668bf0cc4c5d9826a6d719752c65449bda Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 6 Feb 2018 16:50:16 -0500 Subject: [PATCH] added a baseSchedPolicyState struct in schedulers/schedPolicy.go. This struct would store information common to scheduling policies. Added member called numTasksScheduled to baseSchedPolicyState that would keep count of the number of tasks that the current scheduling policy has scheduled. Moved the logic to switch (currently performing a random switch) to baseSchedPolicyState#switchIfNecessary(...) and retrofitted all scheduling policies to call this instead of inlining the code in each of them. --- schedulers/MaxGreedyMins.go | 24 +++--------------------- schedulers/MaxMin.go | 24 +++--------------------- schedulers/base.go | 17 +++++++++++++++++ schedulers/bin-packing.go | 24 +++--------------------- schedulers/first-fit.go | 25 ++++--------------------- schedulers/schedPolicy.go | 33 +++++++++++++++++++++++++++++++++ 6 files changed, 63 insertions(+), 84 deletions(-) diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 5d9ae08..5f4cb6b 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,7 +30,7 @@ func (s *MaxGreedyMins) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, ta } type MaxGreedyMins struct { - SchedPolicyState + baseSchedPolicyState } // Determine if the remaining space inside of the offer is enough for this @@ -78,7 +76,7 @@ func (s *MaxGreedyMins) CheckFit( } func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("Max-GreedyMins scheduling...") + log.Println("Max-GreedyMins scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -172,21 +170,5 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 6cfa808..aac25f5 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,7 +30,7 @@ func (s *MaxMin) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def. } type MaxMin struct { - SchedPolicyState + baseSchedPolicyState } // Determine if the remaining space inside of the offer is enough for this @@ -77,7 +75,7 @@ func (s *MaxMin) CheckFit( } func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("Max-Min scheduling...") + log.Println("Max-Min scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -166,21 +164,5 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/base.go b/schedulers/base.go index f7ab082..4284fb4 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -68,6 +68,9 @@ type BaseScheduler struct { // Window of tasks that the current scheduling policy has to schedule. // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. curSchedWindow int + + // Indicate whether the any resource offers from mesos have been received. + hasReceivedResourceOffers bool } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -83,6 +86,8 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { s.HostNameToSlaveID = make(map[string]string) s.mutex = sync.Mutex{} s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"] + // Initially no resource offers would have been received. + s.hasReceivedResourceOffers = false } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { @@ -187,7 +192,19 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString() } } + // If no resource offers have been received yet, and if scheduling policy switching has been enabled, + // then we would need to compute the scheduling window for the current scheduling policy. + // Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster, + // the scheduling window is determined and the scheduling policy is then applied for the corresponding number + // of tasks. + // Subsequently, the scheduling window is determined at the end of each offer cycle. + if !s.hasReceivedResourceOffers && s.schedPolSwitchEnabled { + s.curSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} { + return s.tasks + }) + } s.curSchedPolicy.ConsumeOffers(s, driver, offers) + s.hasReceivedResourceOffers = true } func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 8ecbc98..fbaed3f 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,11 +30,11 @@ func (s *BinPackSortedWatts) takeOffer(spc SchedPolicyContext, offer *mesos.Offe } type BinPackSortedWatts struct { - SchedPolicyState + baseSchedPolicyState } func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("BPSW scheduling...") + log.Println("BPSW scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -116,21 +114,5 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index ab736ee..c1ac810 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,10 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" - "math/rand" + "log" ) // Decides if to take an offer or not @@ -31,11 +30,11 @@ func (s *FirstFit) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task de // Elektron scheduler implements the Scheduler interface. type FirstFit struct { - SchedPolicyState + baseSchedPolicyState } func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("FirstFit scheduling...") + log.Println("FirstFit scheduling...") baseSchedRef := spc.(*BaseScheduler) baseSchedRef.LogOffersReceived(offers) @@ -101,21 +100,5 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index dfb4863..50ada80 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -3,6 +3,7 @@ package schedulers import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "math/rand" ) type SchedPolicyContext interface { @@ -14,3 +15,35 @@ type SchedPolicyState interface { // Define the particular scheduling policy's methodology of resource offer consumption. ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer) } + +type baseSchedPolicyState struct { + SchedPolicyState + // Keep track of the number of tasks that have been scheduled. + numTasksScheduled int +} + +func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) { + baseSchedRef := spc.(*BaseScheduler) + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute schedulWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( + func() interface{} { return baseSchedRef.tasks }) + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. + index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + // If switched to a different scheduling policy, + // then setting the numberTasksScheduled to 0. + if v != bsps { + bsps.numTasksScheduled = 0 + } + break + } + index-- + } + } +}