Merged in scheduleOnlySchedWindowTasks (pull request #6)
ScheduleOnlySchedWindowTasks Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
This commit is contained in:
parent
3d51efc679
commit
85383da550
10 changed files with 103 additions and 112 deletions
|
@ -1,7 +1,7 @@
|
||||||
package def
|
package def
|
||||||
|
|
||||||
// The sortBy function that takes a task reference and returns the resource to consider when sorting.
|
// the sortBy function that takes a task reference and returns the resource to consider when sorting.
|
||||||
type sortBy func(task *Task) float64
|
type SortBy func(task *Task) float64
|
||||||
|
|
||||||
// Possible Sorting Criteria.
|
// Possible Sorting Criteria.
|
||||||
// Each holds a closure that fetches the required resource from the
|
// Each holds a closure that fetches the required resource from the
|
||||||
|
|
|
@ -119,7 +119,7 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio
|
||||||
|
|
||||||
// Generic Task Sorter.
|
// Generic Task Sorter.
|
||||||
// Be able to sort an array of tasks based on any of the tasks' resources.
|
// Be able to sort an array of tasks based on any of the tasks' resources.
|
||||||
func SortTasks(ts []Task, sb sortBy) {
|
func SortTasks(ts []Task, sb SortBy) {
|
||||||
sort.SliceStable(ts, func(i, j int) bool {
|
sort.SliceStable(ts, func(i, j int) bool {
|
||||||
return sb(&ts[i]) <= sb(&ts[j])
|
return sb(&ts[i]) <= sb(&ts[j])
|
||||||
})
|
})
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decides if to take an offer or not
|
// Decides if to take an offer or not
|
||||||
|
@ -59,6 +58,7 @@ func (s *MaxGreedyMins) CheckFit(
|
||||||
|
|
||||||
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
||||||
*task.Instances--
|
*task.Instances--
|
||||||
|
s.numTasksScheduled++
|
||||||
|
|
||||||
if *task.Instances <= 0 {
|
if *task.Instances <= 0 {
|
||||||
// All instances of task have been scheduled, remove it
|
// All instances of task have been scheduled, remove it
|
||||||
|
@ -79,7 +79,11 @@ func (s *MaxGreedyMins) CheckFit(
|
||||||
func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||||
log.Println("Max-GreedyMins scheduling...")
|
log.Println("Max-GreedyMins scheduling...")
|
||||||
baseSchedRef := spc.(*BaseScheduler)
|
baseSchedRef := spc.(*BaseScheduler)
|
||||||
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
if baseSchedRef.schedPolSwitchEnabled {
|
||||||
|
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
|
||||||
|
} else {
|
||||||
|
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
||||||
|
}
|
||||||
baseSchedRef.LogOffersReceived(offers)
|
baseSchedRef.LogOffersReceived(offers)
|
||||||
|
|
||||||
for _, offer := range offers {
|
for _, offer := range offers {
|
||||||
|
@ -105,7 +109,13 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
|
||||||
// Attempt to schedule a single instance of the heaviest workload available first
|
// Attempt to schedule a single instance of the heaviest workload available first
|
||||||
// Start from the back until one fits
|
// Start from the back until one fits
|
||||||
for i := len(baseSchedRef.tasks) - 1; i >= 0; i-- {
|
for i := len(baseSchedRef.tasks) - 1; i >= 0; i-- {
|
||||||
|
// If scheduling policy switching enabled, then
|
||||||
|
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
|
||||||
|
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
|
||||||
|
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
|
||||||
|
s.numTasksScheduled)
|
||||||
|
break // Offers will automatically get declined.
|
||||||
|
}
|
||||||
task := baseSchedRef.tasks[i]
|
task := baseSchedRef.tasks[i]
|
||||||
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
|
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -131,6 +141,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
|
||||||
|
|
||||||
// Pack the rest of the offer with the smallest tasks
|
// Pack the rest of the offer with the smallest tasks
|
||||||
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
||||||
|
// If scheduling policy switching enabled, then
|
||||||
|
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
|
||||||
|
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
|
||||||
|
break // Offers will automatically get declined.
|
||||||
|
}
|
||||||
task := baseSchedRef.tasks[i]
|
task := baseSchedRef.tasks[i]
|
||||||
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
|
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -169,22 +184,5 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch scheduling policy only if feature enabled from CLI
|
s.switchIfNecessary(spc)
|
||||||
if baseSchedRef.schedPolSwitchEnabled {
|
|
||||||
// Need to recompute the schedWindow for the next offer cycle.
|
|
||||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
|
||||||
baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
|
|
||||||
func() interface{} { return baseSchedRef.tasks })
|
|
||||||
// Switching to a random scheduling policy.
|
|
||||||
// TODO: Switch based on some criteria.
|
|
||||||
index := rand.Intn(len(SchedPolicies))
|
|
||||||
for k, v := range SchedPolicies {
|
|
||||||
if index == 0 {
|
|
||||||
baseSchedRef.LogSchedPolicySwitch(k, v)
|
|
||||||
spc.SwitchSchedPol(v)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
index--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decides if to take an offer or not
|
// Decides if to take an offer or not
|
||||||
|
@ -59,6 +58,7 @@ func (s *MaxMin) CheckFit(
|
||||||
|
|
||||||
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
||||||
*task.Instances--
|
*task.Instances--
|
||||||
|
s.numTasksScheduled++
|
||||||
|
|
||||||
if *task.Instances <= 0 {
|
if *task.Instances <= 0 {
|
||||||
// All instances of task have been scheduled, remove it.
|
// All instances of task have been scheduled, remove it.
|
||||||
|
@ -78,7 +78,11 @@ func (s *MaxMin) CheckFit(
|
||||||
func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||||
log.Println("Max-Min scheduling...")
|
log.Println("Max-Min scheduling...")
|
||||||
baseSchedRef := spc.(*BaseScheduler)
|
baseSchedRef := spc.(*BaseScheduler)
|
||||||
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
if baseSchedRef.schedPolSwitchEnabled {
|
||||||
|
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
|
||||||
|
} else {
|
||||||
|
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
||||||
|
}
|
||||||
baseSchedRef.LogOffersReceived(offers)
|
baseSchedRef.LogOffersReceived(offers)
|
||||||
|
|
||||||
for _, offer := range offers {
|
for _, offer := range offers {
|
||||||
|
@ -108,6 +112,14 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
|
||||||
var index int
|
var index int
|
||||||
start := true // If false then index has changed and need to keep it that way
|
start := true // If false then index has changed and need to keep it that way
|
||||||
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
||||||
|
// If scheduling policy switching enabled, then
|
||||||
|
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
|
||||||
|
if baseSchedRef.schedPolSwitchEnabled &&
|
||||||
|
(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
|
||||||
|
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
|
||||||
|
s.numTasksScheduled)
|
||||||
|
break // Offers will automatically get declined.
|
||||||
|
}
|
||||||
// We need to pick a min task or a max task
|
// We need to pick a min task or a max task
|
||||||
// depending on the value of direction.
|
// depending on the value of direction.
|
||||||
if direction && start {
|
if direction && start {
|
||||||
|
@ -128,7 +140,6 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Fix this so index doesn't need to be passed.
|
|
||||||
taken, taskToSchedule := s.CheckFit(spc, index, task, wattsConsideration, offer,
|
taken, taskToSchedule := s.CheckFit(spc, index, task, wattsConsideration, offer,
|
||||||
&totalCPU, &totalRAM, &totalWatts)
|
&totalCPU, &totalRAM, &totalWatts)
|
||||||
|
|
||||||
|
@ -163,22 +174,5 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch scheduling policy only if feature enabled from CLI
|
s.switchIfNecessary(spc)
|
||||||
if baseSchedRef.schedPolSwitchEnabled {
|
|
||||||
// Need to recompute the schedWindow for the next offer cycle.
|
|
||||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
|
||||||
baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
|
|
||||||
func() interface{} { return baseSchedRef.tasks })
|
|
||||||
// Switching to a random scheduling policy.
|
|
||||||
// TODO: Switch based on some criteria.
|
|
||||||
index := rand.Intn(len(SchedPolicies))
|
|
||||||
for k, v := range SchedPolicies {
|
|
||||||
if index == 0 {
|
|
||||||
baseSchedRef.LogSchedPolicySwitch(k, v)
|
|
||||||
spc.SwitchSchedPol(v)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
index--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,14 +61,14 @@ type BaseScheduler struct {
|
||||||
|
|
||||||
// Size of window of tasks that can be scheduled in the next offer cycle.
|
// Size of window of tasks that can be scheduled in the next offer cycle.
|
||||||
// The window size can be adjusted to make the most use of every resource offer.
|
// The window size can be adjusted to make the most use of every resource offer.
|
||||||
// By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled.
|
// By default, the schedWindowSize would correspond to the number of remaining tasks that haven't yet
|
||||||
schedulingWindow int
|
// been scheduled.
|
||||||
|
schedWindowSize int
|
||||||
|
// Number of tasks in the window
|
||||||
|
numTasksInSchedWindow int
|
||||||
|
|
||||||
// Strategy to resize the schedulingWindow.
|
// Strategy to resize the schedulingWindow.
|
||||||
schedWindowResStrategy schedUtils.SchedWindowResizingStrategy
|
schedWindowResStrategy schedUtils.SchedWindowResizingStrategy
|
||||||
// Window of tasks that the current scheduling policy has to schedule.
|
|
||||||
// Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling.
|
|
||||||
curSchedWindow int
|
|
||||||
|
|
||||||
// Indicate whether the any resource offers from mesos have been received.
|
// Indicate whether the any resource offers from mesos have been received.
|
||||||
hasReceivedResourceOffers bool
|
hasReceivedResourceOffers bool
|
||||||
|
@ -188,6 +188,7 @@ func (s *BaseScheduler) Disconnected(sched.SchedulerDriver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||||
|
// Recording the total amount of resources available across the cluster.
|
||||||
utilities.RecordTotalResourceAvailability(offers)
|
utilities.RecordTotalResourceAvailability(offers)
|
||||||
for _, offer := range offers {
|
for _, offer := range offers {
|
||||||
if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok {
|
if _, ok := s.HostNameToSlaveID[offer.GetHostname()]; !ok {
|
||||||
|
@ -195,16 +196,18 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
|
// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
|
||||||
// then we would need to compute the scheduling window for the current scheduling policy.
|
// then we would need to compute the size of the scheduling window for the current scheduling policy.
|
||||||
// Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster,
|
// Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster,
|
||||||
// the scheduling window is determined and the scheduling policy is then applied for the corresponding number
|
// the size of the window is determined and the scheduling policy is then applied for the corresponding number
|
||||||
// of tasks.
|
// of tasks.
|
||||||
// Subsequently, the scheduling window is determined at the end of each offer cycle.
|
// Subsequently, the size of the scheduling window is determined at the end of each offer cycle.
|
||||||
if !s.hasReceivedResourceOffers && s.schedPolSwitchEnabled {
|
if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers {
|
||||||
s.curSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} {
|
s.schedWindowSize, s.numTasksInSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} {
|
||||||
return s.tasks
|
return s.tasks
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
log.Printf("SchedWindowSize: %d, NumberOfTasksInWindow: %d", s.schedWindowSize, s.numTasksInSchedWindow)
|
||||||
|
|
||||||
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
|
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
|
||||||
s.hasReceivedResourceOffers = true
|
s.hasReceivedResourceOffers = true
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decides if to take an offer or not
|
// Decides if to take an offer or not
|
||||||
|
@ -37,7 +36,11 @@ type BinPackSortedWatts struct {
|
||||||
func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||||
log.Println("BPSW scheduling...")
|
log.Println("BPSW scheduling...")
|
||||||
baseSchedRef := spc.(*BaseScheduler)
|
baseSchedRef := spc.(*BaseScheduler)
|
||||||
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
if baseSchedRef.schedPolSwitchEnabled {
|
||||||
|
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
|
||||||
|
} else {
|
||||||
|
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
|
||||||
|
}
|
||||||
baseSchedRef.LogOffersReceived(offers)
|
baseSchedRef.LogOffersReceived(offers)
|
||||||
|
|
||||||
for _, offer := range offers {
|
for _, offer := range offers {
|
||||||
|
@ -71,6 +74,14 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
|
||||||
}
|
}
|
||||||
|
|
||||||
for *task.Instances > 0 {
|
for *task.Instances > 0 {
|
||||||
|
// If scheduling policy switching enabled, then
|
||||||
|
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
|
||||||
|
if baseSchedRef.schedPolSwitchEnabled &&
|
||||||
|
(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
|
||||||
|
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
|
||||||
|
s.numTasksScheduled)
|
||||||
|
break // Offers will automatically get declined.
|
||||||
|
}
|
||||||
// Does the task fit
|
// Does the task fit
|
||||||
if s.takeOffer(spc, offer, task, totalCPU, totalRAM, totalWatts) {
|
if s.takeOffer(spc, offer, task, totalCPU, totalRAM, totalWatts) {
|
||||||
|
|
||||||
|
@ -84,11 +95,11 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
|
||||||
|
|
||||||
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
||||||
*task.Instances--
|
*task.Instances--
|
||||||
|
s.numTasksScheduled++
|
||||||
|
|
||||||
if *task.Instances <= 0 {
|
if *task.Instances <= 0 {
|
||||||
// All instances of task have been scheduled, remove it
|
// All instances of task have been scheduled, remove it
|
||||||
baseSchedRef.tasks = append(baseSchedRef.tasks[:i],
|
baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...)
|
||||||
baseSchedRef.tasks[i+1:]...)
|
|
||||||
|
|
||||||
if len(baseSchedRef.tasks) <= 0 {
|
if len(baseSchedRef.tasks) <= 0 {
|
||||||
baseSchedRef.LogTerminateScheduler()
|
baseSchedRef.LogTerminateScheduler()
|
||||||
|
@ -113,22 +124,5 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch scheduling policy only if feature enabled from CLI
|
s.switchIfNecessary(spc)
|
||||||
if baseSchedRef.schedPolSwitchEnabled {
|
|
||||||
// Need to recompute the schedWindow for the next offer cycle.
|
|
||||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
|
||||||
baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
|
|
||||||
func() interface{} { return baseSchedRef.tasks })
|
|
||||||
// Switching to a random scheduling policy.
|
|
||||||
// TODO: Switch based on some criteria.
|
|
||||||
index := rand.Intn(len(SchedPolicies))
|
|
||||||
for k, v := range SchedPolicies {
|
|
||||||
if index == 0 {
|
|
||||||
baseSchedRef.LogSchedPolicySwitch(k, v)
|
|
||||||
spc.SwitchSchedPol(v)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
index--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decides if to take an offer or not
|
// Decides if to take an offer or not
|
||||||
|
@ -55,6 +54,13 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
|
||||||
// First fit strategy
|
// First fit strategy
|
||||||
offerTaken := false
|
offerTaken := false
|
||||||
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
for i := 0; i < len(baseSchedRef.tasks); i++ {
|
||||||
|
// If scheduling policy switching enabled, then
|
||||||
|
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
|
||||||
|
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
|
||||||
|
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
|
||||||
|
s.numTasksScheduled)
|
||||||
|
break // Offers will automatically get declined.
|
||||||
|
}
|
||||||
task := baseSchedRef.tasks[i]
|
task := baseSchedRef.tasks[i]
|
||||||
|
|
||||||
// Don't take offer if it doesn't match our task's host requirement.
|
// Don't take offer if it doesn't match our task's host requirement.
|
||||||
|
@ -76,11 +82,11 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
|
||||||
|
|
||||||
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
|
||||||
*task.Instances--
|
*task.Instances--
|
||||||
|
s.numTasksScheduled++
|
||||||
|
|
||||||
if *task.Instances <= 0 {
|
if *task.Instances <= 0 {
|
||||||
// All instances of task have been scheduled, remove it
|
// All instances of task have been scheduled, remove it
|
||||||
baseSchedRef.tasks[i] = baseSchedRef.tasks[len(baseSchedRef.tasks)-1]
|
baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...)
|
||||||
baseSchedRef.tasks = baseSchedRef.tasks[:len(baseSchedRef.tasks)-1]
|
|
||||||
|
|
||||||
if len(baseSchedRef.tasks) <= 0 {
|
if len(baseSchedRef.tasks) <= 0 {
|
||||||
baseSchedRef.LogTerminateScheduler()
|
baseSchedRef.LogTerminateScheduler()
|
||||||
|
@ -99,22 +105,5 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch scheduling policy only if feature enabled from CLI
|
s.switchIfNecessary(spc)
|
||||||
if baseSchedRef.schedPolSwitchEnabled {
|
|
||||||
// Need to recompute the schedWindow for the next offer cycle.
|
|
||||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
|
||||||
baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
|
|
||||||
func() interface{} { return baseSchedRef.tasks })
|
|
||||||
// Switching to a random scheduling policy.
|
|
||||||
// TODO: Switch based on some criteria.
|
|
||||||
index := rand.Intn(len(SchedPolicies))
|
|
||||||
for k, v := range SchedPolicies {
|
|
||||||
if index == 0 {
|
|
||||||
baseSchedRef.LogSchedPolicySwitch(k, v)
|
|
||||||
spc.SwitchSchedPol(v)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
index--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,3 +132,8 @@ func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, dri
|
||||||
utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", *task.TaskId, *task.SlaveId)
|
utilities.ResourceAvailabilityUpdate("ON_TASK_ACTIVE_STATE", *task.TaskId, *task.SlaveId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sort N tasks in the TaskQueue
|
||||||
|
func SortNTasks(tasks []def.Task, n int, sb def.SortBy) {
|
||||||
|
def.SortTasks(tasks[:n], sb)
|
||||||
|
}
|
||||||
|
|
|
@ -26,21 +26,18 @@ func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) {
|
||||||
baseSchedRef := spc.(*BaseScheduler)
|
baseSchedRef := spc.(*BaseScheduler)
|
||||||
// Switch scheduling policy only if feature enabled from CLI
|
// Switch scheduling policy only if feature enabled from CLI
|
||||||
if baseSchedRef.schedPolSwitchEnabled {
|
if baseSchedRef.schedPolSwitchEnabled {
|
||||||
// Need to recompute schedulWindow for the next offer cycle.
|
// Need to recompute size of the scheduling window for the next offer cycle.
|
||||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
// The next scheduling policy will schedule at max schedWindowSize number of tasks.
|
||||||
baseSchedRef.curSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(
|
baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
|
||||||
func() interface{} { return baseSchedRef.tasks })
|
baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
|
||||||
// Switching to a random scheduling policy.
|
// Switching to a random scheduling policy.
|
||||||
// TODO: Switch based on some criteria.
|
// TODO: Switch based on some criteria.
|
||||||
index := rand.Intn(len(SchedPolicies))
|
index := rand.Intn(len(SchedPolicies))
|
||||||
for _, v := range SchedPolicies {
|
for _, v := range SchedPolicies {
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
spc.SwitchSchedPol(v)
|
spc.SwitchSchedPol(v)
|
||||||
// If switched to a different scheduling policy,
|
// Resetting the number of tasks scheduled.
|
||||||
// then setting the numberTasksScheduled to 0.
|
bsps.numTasksScheduled = 0
|
||||||
if v != bsps {
|
|
||||||
bsps.numTasksScheduled = 0
|
|
||||||
}
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
index--
|
index--
|
||||||
|
|
|
@ -15,15 +15,19 @@ var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]Sch
|
||||||
|
|
||||||
// Interface for a scheduling window resizing strategy.
|
// Interface for a scheduling window resizing strategy.
|
||||||
type SchedWindowResizingStrategy interface {
|
type SchedWindowResizingStrategy interface {
|
||||||
// Apply the window resizing strategy and return the news scheduling window size.
|
// Apply the window resizing strategy and return the size of the scheduling window and the number tasks that
|
||||||
Apply(func() interface{}) int
|
// were traversed in the process.
|
||||||
|
// The size of the scheduling window would correspond to the total number of
|
||||||
|
// instances (flattened) that can be scheduled in the next offer cycle.
|
||||||
|
// The number of tasks would correspond to number of different tasks (instances not included).
|
||||||
|
Apply(func() interface{}) (int, int)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scheduling window resizing strategy that attempts to resize the scheduling window
|
// Scheduling window resizing strategy that attempts to resize the scheduling window
|
||||||
// to include as many tasks as possible so as to make the most use of the next offer cycle.
|
// to include as many tasks as possible so as to make the most use of the next offer cycle.
|
||||||
type fillNextOfferCycle struct{}
|
type fillNextOfferCycle struct{}
|
||||||
|
|
||||||
func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int {
|
func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) (int, int) {
|
||||||
return s.apply(getArgs().([]def.Task))
|
return s.apply(getArgs().([]def.Task))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +37,7 @@ func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int {
|
||||||
//
|
//
|
||||||
// Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search
|
// Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search
|
||||||
// which is computationally expensive.
|
// which is computationally expensive.
|
||||||
func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
|
func (s *fillNextOfferCycle) apply(taskQueue []def.Task) (int, int) {
|
||||||
clusterwideResourceCount := utilities.GetClusterwideResourceAvailability()
|
clusterwideResourceCount := utilities.GetClusterwideResourceAvailability()
|
||||||
newSchedWindow := 0
|
newSchedWindow := 0
|
||||||
filledCPU := 0.0
|
filledCPU := 0.0
|
||||||
|
@ -49,7 +53,10 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
|
||||||
}
|
}
|
||||||
|
|
||||||
done := false
|
done := false
|
||||||
|
// Track of number of tasks traversed.
|
||||||
|
numberOfTasksTraversed := 0
|
||||||
for _, task := range taskQueue {
|
for _, task := range taskQueue {
|
||||||
|
numberOfTasksTraversed++
|
||||||
for i := *task.Instances; i > 0; i-- {
|
for i := *task.Instances; i > 0; i-- {
|
||||||
log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+
|
log.Printf("Checking if Instance #%d of Task[%s] can be scheduled "+
|
||||||
"during the next offer cycle...", i, task.Name)
|
"during the next offer cycle...", i, task.Name)
|
||||||
|
@ -59,6 +66,10 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
|
||||||
newSchedWindow++
|
newSchedWindow++
|
||||||
} else {
|
} else {
|
||||||
done = true
|
done = true
|
||||||
|
if i == *task.Instances {
|
||||||
|
// We don't count this task if none of the instances could be scheduled.
|
||||||
|
numberOfTasksTraversed--
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,5 +77,5 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return newSchedWindow
|
return newSchedWindow, numberOfTasksTraversed
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue