added a baseSchedPolicyState struct in schedulers/schedPolicy.go. This struct would store information common to scheduling policies. Added member called numTasksScheduled to baseSchedPolicyState that would keep count of the number of tasks that the current scheduling policy has scheduled. Moved the logic to switch (currently performing a random switch) to baseSchedPolicyState#switchIfNecessary(...) and retrofitted all scheduling policies to call this instead of inlining the code in each of them.
This commit is contained in:
parent
5a28f8539a
commit
f041e6668b
6 changed files with 63 additions and 84 deletions
|
@ -68,6 +68,9 @@ type BaseScheduler struct {
|
|||
// Window of tasks that the current scheduling policy has to schedule.
|
||||
// Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling.
|
||||
curSchedWindow int
|
||||
|
||||
// Indicate whether the any resource offers from mesos have been received.
|
||||
hasReceivedResourceOffers bool
|
||||
}
|
||||
|
||||
func (s *BaseScheduler) init(opts ...schedPolicyOption) {
|
||||
|
@ -83,6 +86,8 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) {
|
|||
s.HostNameToSlaveID = make(map[string]string)
|
||||
s.mutex = sync.Mutex{}
|
||||
s.schedWindowResStrategy = schedUtils.SchedWindowResizingCritToStrategy["fillNextOfferCycle"]
|
||||
// Initially no resource offers would have been received.
|
||||
s.hasReceivedResourceOffers = false
|
||||
}
|
||||
|
||||
func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) {
|
||||
|
@ -187,7 +192,19 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
|
|||
s.HostNameToSlaveID[offer.GetHostname()] = offer.GetSlaveId().GoString()
|
||||
}
|
||||
}
|
||||
// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
|
||||
// then we would need to compute the scheduling window for the current scheduling policy.
|
||||
// Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster,
|
||||
// the scheduling window is determined and the scheduling policy is then applied for the corresponding number
|
||||
// of tasks.
|
||||
// Subsequently, the scheduling window is determined at the end of each offer cycle.
|
||||
if !s.hasReceivedResourceOffers && s.schedPolSwitchEnabled {
|
||||
s.curSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} {
|
||||
return s.tasks
|
||||
})
|
||||
}
|
||||
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
|
||||
s.hasReceivedResourceOffers = true
|
||||
}
|
||||
|
||||
func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
|
||||
|
|
Reference in a new issue