diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 4ac046a..5d9ae08 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -177,8 +176,7 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index ee1fda8..6cfa808 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -171,8 +170,7 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/base.go b/schedulers/base.go index db2c849..3602f71 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -60,13 +60,13 @@ type BaseScheduler struct { // Size of window of tasks that can be scheduled in the next offer cycle. // The window size can be adjusted to make the most use of every resource offer. // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. - schedulingWindow int + schedulingWindow int - // Criteria to resize the schedulingWindow. - schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria + // Strategy to resize the schedulingWindow. + schedWindowResStrategy schedUtils.SchedWindowResizingStrategy // Window of tasks that the current scheduling policy has to schedule. // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. - schedWindow int + curSchedWindow int } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -78,7 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { } s.running = make(map[string]map[string]bool) s.mutex = sync.Mutex{} - s.schedWindowResizeCrit = "fillNextOfferCycle" + s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"] } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index f589f82..8ecbc98 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -121,8 +120,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index c0d1235..ab736ee 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,7 +4,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -106,8 +105,7 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD if baseSchedRef.schedPolSwitchEnabled { // Need to recompute the schedWindow for the next offer cycle. // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.schedWindow = schedUtils. - SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go index 6c57020..496c4f7 100644 --- a/utilities/schedUtils/schedUtils.go +++ b/utilities/schedUtils/schedUtils.go @@ -3,24 +3,25 @@ package schedUtils import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities" + "log" ) // Criteria for resizing the scheduling window. type SchedulingWindowResizingCriteria string -var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{ +var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]SchedWindowResizingStrategy{ "fillNextOfferCycle": &fillNextOfferCycle{}, } // Interface for a scheduling window resizing strategy. -type schedWindowResizingStrategy interface { +type SchedWindowResizingStrategy interface { // Apply the window resizing strategy and return the news scheduling window size. Apply(func() interface{}) int } // Scheduling window resizing strategy that attempts to resize the scheduling window // to include as many tasks as possible so as to make the most use of the next offer cycle. -type fillNextOfferCycle struct{} +type fillNextOfferCycle struct {} func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { return s.apply(getArgs().([]def.Task)) @@ -47,17 +48,23 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { return false } + done := false for _, task := range taskQueue { for i := *task.Instances; i > 0; i-- { + log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+ + "during the next offer cycle...", i, task.Name) if canSchedule(task) { filledCPU += task.CPU filledRAM += task.RAM newSchedWindow++ } else { + done = true break } } - break + if done { + break + } } return newSchedWindow }