diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 5d9ae08..5f4cb6b 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,7 +30,7 @@ func (s *MaxGreedyMins) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, ta } type MaxGreedyMins struct { - SchedPolicyState + baseSchedPolicyState } // Determine if the remaining space inside of the offer is enough for this @@ -78,7 +76,7 @@ func (s *MaxGreedyMins) CheckFit( } func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("Max-GreedyMins scheduling...") + log.Println("Max-GreedyMins scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -172,21 +170,5 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 6cfa808..aac25f5 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,7 +30,7 @@ func (s *MaxMin) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def. } type MaxMin struct { - SchedPolicyState + baseSchedPolicyState } // Determine if the remaining space inside of the offer is enough for this @@ -77,7 +75,7 @@ func (s *MaxMin) CheckFit( } func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("Max-Min scheduling...") + log.Println("Max-Min scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -166,21 +164,5 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/base.go b/schedulers/base.go index f7ab082..4284fb4 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -68,6 +68,9 @@ type BaseScheduler struct { // Window of tasks that the current scheduling policy has to schedule. // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. curSchedWindow int + + // Indicate whether the any resource offers from mesos have been received. + hasReceivedResourceOffers bool } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -83,6 +86,8 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { s.HostNameToSlaveID = make(map[string]string) s.mutex = sync.Mutex{} s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"] + // Initially no resource offers would have been received. + s.hasReceivedResourceOffers = false } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { @@ -187,7 +192,19 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString() } } + // If no resource offers have been received yet, and if scheduling policy switching has been enabled, + // then we would need to compute the scheduling window for the current scheduling policy. + // Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster, + // the scheduling window is determined and the scheduling policy is then applied for the corresponding number + // of tasks. + // Subsequently, the scheduling window is determined at the end of each offer cycle. + if !s.hasReceivedResourceOffers && s.schedPolSwitchEnabled { + s.curSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} { + return s.tasks + }) + } s.curSchedPolicy.ConsumeOffers(s, driver, offers) + s.hasReceivedResourceOffers = true } func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 8ecbc98..fbaed3f 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,11 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -32,11 +30,11 @@ func (s *BinPackSortedWatts) takeOffer(spc SchedPolicyContext, offer *mesos.Offe } type BinPackSortedWatts struct { - SchedPolicyState + baseSchedPolicyState } func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("BPSW scheduling...") + log.Println("BPSW scheduling...") baseSchedRef := spc.(*BaseScheduler) def.SortTasks(baseSchedRef.tasks, def.SortByWatts) baseSchedRef.LogOffersReceived(offers) @@ -116,21 +114,5 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index ab736ee..c1ac810 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,10 +4,9 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" - "math/rand" + "log" ) // Decides if to take an offer or not @@ -31,11 +30,11 @@ func (s *FirstFit) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task de // Elektron scheduler implements the Scheduler interface. type FirstFit struct { - SchedPolicyState + baseSchedPolicyState } func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - fmt.Println("FirstFit scheduling...") + log.Println("FirstFit scheduling...") baseSchedRef := spc.(*BaseScheduler) baseSchedRef.LogOffersReceived(offers) @@ -101,21 +100,5 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for _, v := range SchedPolicies { - if index == 0 { - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index dfb4863..50ada80 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -3,6 +3,7 @@ package schedulers import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "math/rand" ) type SchedPolicyContext interface { @@ -14,3 +15,35 @@ type SchedPolicyState interface { // Define the particular scheduling policy's methodology of resource offer consumption. ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer) } + +type baseSchedPolicyState struct { + SchedPolicyState + // Keep track of the number of tasks that have been scheduled. + numTasksScheduled int +} + +func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) { + baseSchedRef := spc.(*BaseScheduler) + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute schedulWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( + func() interface{} { return baseSchedRef.tasks }) + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. + index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + // If switched to a different scheduling policy, + // then setting the numberTasksScheduled to 0. + if v != bsps { + bsps.numTasksScheduled = 0 + } + break + } + index-- + } + } +}