diff --git a/def/sortingCriteria.go b/def/sortingCriteria.go index d34090e..2d0426b 100644 --- a/def/sortingCriteria.go +++ b/def/sortingCriteria.go @@ -1,7 +1,7 @@ package def -// The sortBy function that takes a task reference and returns the resource to consider when sorting. -type sortBy func(task *Task) float64 +// the sortBy function that takes a task reference and returns the resource to consider when sorting. +type SortBy func(task *Task) float64 // Possible Sorting Criteria. // Each holds a closure that fetches the required resource from the diff --git a/def/taskUtils.go b/def/taskUtils.go index 8704ef5..71ae80b 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -119,7 +119,7 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio // Generic Task Sorter. // Be able to sort an array of tasks based on any of the tasks' resources. -func SortTasks(ts []Task, sb sortBy) { +func SortTasks(ts []Task, sb SortBy) { sort.SliceStable(ts, func(i, j int) bool { return sb(&ts[i]) <= sb(&ts[j]) }) diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index b7c03c6..aeef812 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -7,7 +7,6 @@ import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -59,6 +58,7 @@ func (s *MaxGreedyMins) CheckFit( baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- + s.numTasksScheduled++ if *task.Instances <= 0 { // All instances of task have been scheduled, remove it @@ -79,7 +79,11 @@ func (s *MaxGreedyMins) CheckFit( func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Println("Max-GreedyMins scheduling...") baseSchedRef := spc.(*BaseScheduler) - def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + if baseSchedRef.schedPolSwitchEnabled { + SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) + } else { + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + } baseSchedRef.LogOffersReceived(offers) for _, offer := range offers { @@ -105,7 +109,13 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // Attempt to schedule a single instance of the heaviest workload available first // Start from the back until one fits for i := len(baseSchedRef.tasks) - 1; i >= 0; i-- { - + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + log.Printf("Stopped scheduling... Completed scheduling %d tasks.", + s.numTasksScheduled) + break // Offers will automatically get declined. + } task := baseSchedRef.tasks[i] wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { @@ -131,6 +141,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // Pack the rest of the offer with the smallest tasks for i := 0; i < len(baseSchedRef.tasks); i++ { + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + break // Offers will automatically get declined. + } task := baseSchedRef.tasks[i] wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { @@ -169,22 +184,5 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for k, v := range SchedPolicies { - if index == 0 { - baseSchedRef.LogSchedPolicySwitch(k, v) - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 90d533d..841c96d 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -7,7 +7,6 @@ import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -59,6 +58,7 @@ func (s *MaxMin) CheckFit( baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- + s.numTasksScheduled++ if *task.Instances <= 0 { // All instances of task have been scheduled, remove it. @@ -78,7 +78,11 @@ func (s *MaxMin) CheckFit( func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Println("Max-Min scheduling...") baseSchedRef := spc.(*BaseScheduler) - def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + if baseSchedRef.schedPolSwitchEnabled { + SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) + } else { + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + } baseSchedRef.LogOffersReceived(offers) for _, offer := range offers { @@ -108,6 +112,14 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri var index int start := true // If false then index has changed and need to keep it that way for i := 0; i < len(baseSchedRef.tasks); i++ { + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && + (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + log.Printf("Stopped scheduling... Completed scheduling %d tasks.", + s.numTasksScheduled) + break // Offers will automatically get declined. + } // We need to pick a min task or a max task // depending on the value of direction. if direction && start { @@ -128,7 +140,6 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri continue } - // TODO: Fix this so index doesn't need to be passed. taken, taskToSchedule := s.CheckFit(spc, index, task, wattsConsideration, offer, &totalCPU, &totalRAM, &totalWatts) @@ -163,22 +174,5 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for k, v := range SchedPolicies { - if index == 0 { - baseSchedRef.LogSchedPolicySwitch(k, v) - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/base.go b/schedulers/base.go index 8a59092..cc74215 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -61,14 +61,14 @@ type BaseScheduler struct { // Size of window of tasks that can be scheduled in the next offer cycle. // The window size can be adjusted to make the most use of every resource offer. - // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. - schedulingWindow int + // By default, the schedWindowSize would correspond to the number of remaining tasks that haven't yet + // been scheduled. + schedWindowSize int + // Number of tasks in the window + numTasksInSchedWindow int // Strategy to resize the schedulingWindow. schedWindowResStrategy schedUtils.SchedWindowResizingStrategy - // Window of tasks that the current scheduling policy has to schedule. - // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. - curSchedWindow int // Indicate whether the any resource offers from mesos have been received. hasReceivedResourceOffers bool @@ -188,6 +188,7 @@ func (s *BaseScheduler) Disconnected(sched.SchedulerDriver) { } func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + // Recording the total amount of resources available across the cluster. utilities.RecordTotalResourceAvailability(offers) for _, offer := range offers { if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok { @@ -195,16 +196,18 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m } } // If no resource offers have been received yet, and if scheduling policy switching has been enabled, - // then we would need to compute the scheduling window for the current scheduling policy. + // then we would need to compute the size of the scheduling window for the current scheduling policy. // Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster, - // the scheduling window is determined and the scheduling policy is then applied for the corresponding number + // the size of the window is determined and the scheduling policy is then applied for the corresponding number // of tasks. - // Subsequently, the scheduling window is determined at the end of each offer cycle. - if !s.hasReceivedResourceOffers && s.schedPolSwitchEnabled { - s.curSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} { + // Subsequently, the size of the scheduling window is determined at the end of each offer cycle. + if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers { + s.schedWindowSize, s.numTasksInSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} { return s.tasks }) } + log.Printf("SchedWindowSize: %d, NumberOfTasksInWindow: %d", s.schedWindowSize, s.numTasksInSchedWindow) + s.curSchedPolicy.ConsumeOffers(s, driver, offers) s.hasReceivedResourceOffers = true } diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 02e0067..dd6f9be 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -7,7 +7,6 @@ import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -37,7 +36,11 @@ type BinPackSortedWatts struct { func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Println("BPSW scheduling...") baseSchedRef := spc.(*BaseScheduler) - def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + if baseSchedRef.schedPolSwitchEnabled { + SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) + } else { + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + } baseSchedRef.LogOffersReceived(offers) for _, offer := range offers { @@ -71,6 +74,14 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. } for *task.Instances > 0 { + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && + (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + log.Printf("Stopped scheduling... Completed scheduling %d tasks.", + s.numTasksScheduled) + break // Offers will automatically get declined. + } // Does the task fit if s.takeOffer(spc, offer, task, totalCPU, totalRAM, totalWatts) { @@ -84,11 +95,11 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- + s.numTasksScheduled++ if *task.Instances <= 0 { // All instances of task have been scheduled, remove it - baseSchedRef.tasks = append(baseSchedRef.tasks[:i], - baseSchedRef.tasks[i+1:]...) + baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...) if len(baseSchedRef.tasks) <= 0 { baseSchedRef.LogTerminateScheduler() @@ -113,22 +124,5 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for k, v := range SchedPolicies { - if index == 0 { - baseSchedRef.LogSchedPolicySwitch(k, v) - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index 3c21a39..0a93858 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -7,7 +7,6 @@ import ( mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" "log" - "math/rand" ) // Decides if to take an offer or not @@ -55,6 +54,13 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD // First fit strategy offerTaken := false for i := 0; i < len(baseSchedRef.tasks); i++ { + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + log.Printf("Stopped scheduling... Completed scheduling %d tasks.", + s.numTasksScheduled) + break // Offers will automatically get declined. + } task := baseSchedRef.tasks[i] // Don't take offer if it doesn't match our task's host requirement. @@ -76,11 +82,11 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- + s.numTasksScheduled++ if *task.Instances <= 0 { // All instances of task have been scheduled, remove it - baseSchedRef.tasks[i] = baseSchedRef.tasks[len(baseSchedRef.tasks)-1] - baseSchedRef.tasks = baseSchedRef.tasks[:len(baseSchedRef.tasks)-1] + baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...) if len(baseSchedRef.tasks) <= 0 { baseSchedRef.LogTerminateScheduler() @@ -99,22 +105,5 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD } } - // Switch scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute the schedWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for k, v := range SchedPolicies { - if index == 0 { - baseSchedRef.LogSchedPolicySwitch(k, v) - spc.SwitchSchedPol(v) - break - } - index-- - } - } + s.switchIfNecessary(spc) } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 2caa26f..a820837 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -132,3 +132,8 @@ func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, dri utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", *task.TaskId, *task.SlaveId) } } + +// Sort N tasks in the TaskQueue +func SortNTasks(tasks []def.Task, n int, sb def.SortBy) { + def.SortTasks(tasks[:n], sb) +} diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index 50ada80..8a64c78 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -26,21 +26,18 @@ func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) { baseSchedRef := spc.(*BaseScheduler) // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { - // Need to recompute schedulWindow for the next offer cycle. - // The next scheduling policy will schedule at max schedWindow number of tasks. - baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply( - func() interface{} { return baseSchedRef.tasks }) + // Need to recompute size of the scheduling window for the next offer cycle. + // The next scheduling policy will schedule at max schedWindowSize number of tasks. + baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow = + baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) for _, v := range SchedPolicies { if index == 0 { spc.SwitchSchedPol(v) - // If switched to a different scheduling policy, - // then setting the numberTasksScheduled to 0. - if v != bsps { - bsps.numTasksScheduled = 0 - } + // Resetting the number of tasks scheduled. + bsps.numTasksScheduled = 0 break } index-- diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go index ba70b85..6a2b0b7 100644 --- a/utilities/schedUtils/schedUtils.go +++ b/utilities/schedUtils/schedUtils.go @@ -15,15 +15,19 @@ var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]Sch // Interface for a scheduling window resizing strategy. type SchedWindowResizingStrategy interface { - // Apply the window resizing strategy and return the news scheduling window size. - Apply(func() interface{}) int + // Apply the window resizing strategy and return the size of the scheduling window and the number tasks that + // were traversed in the process. + // The size of the scheduling window would correspond to the total number of + // instances (flattened) that can be scheduled in the next offer cycle. + // The number of tasks would correspond to number of different tasks (instances not included). + Apply(func() interface{}) (int, int) } // Scheduling window resizing strategy that attempts to resize the scheduling window // to include as many tasks as possible so as to make the most use of the next offer cycle. type fillNextOfferCycle struct{} -func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { +func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) (int, int) { return s.apply(getArgs().([]def.Task)) } @@ -33,7 +37,7 @@ func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { // // Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search // which is computationally expensive. -func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { +func (s *fillNextOfferCycle) apply(taskQueue []def.Task) (int, int) { clusterwideResourceCount := utilities.GetClusterwideResourceAvailability() newSchedWindow := 0 filledCPU := 0.0 @@ -49,7 +53,10 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { } done := false + // Track of number of tasks traversed. + numberOfTasksTraversed := 0 for _, task := range taskQueue { + numberOfTasksTraversed++ for i := *task.Instances; i > 0; i-- { log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+ "during the next offer cycle...", i, task.Name) @@ -59,6 +66,10 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { newSchedWindow++ } else { done = true + if i == *task.Instances { + // We don't count this task if none of the instances could be scheduled. + numberOfTasksTraversed-- + } break } } @@ -66,5 +77,5 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { break } } - return newSchedWindow + return newSchedWindow, numberOfTasksTraversed }