From 3ebd7b0c7ecfda9c23e22de7ffb9610e661cf0b5 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Wed, 31 Jan 2018 19:00:31 -0500 Subject: [PATCH] added utility to compute the scheduling window. Right now there's only criteria on which this is determined -- fillNextOfferCycle. So, the schedWindow is the max number of tasks, that aren't yet scheduled, whose aggregate resource requirement is as close as possible to the resource available in the next round of resource offers. To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search of the TaskQueue and as this is computationally expensive, a linear search is performed on the TaskQueue. Retrofitted scheduling policies to also call utilities.schedUtils#schedWindowResizingStrategy#Apply before switching to a new scheduling policy. --- power-capping/extrema.go | 4 +- power-capping/progressive-extrema.go | 2 +- rapl/cap.go | 4 +- scheduler.go | 2 +- schedulers/MaxGreedyMins.go | 6 +++ schedulers/MaxMin.go | 6 +++ schedulers/base.go | 13 ++++++ schedulers/bin-packing.go | 6 +++ schedulers/first-fit.go | 6 +++ schedulers/helpers.go | 2 +- schedulers/store.go | 16 +++---- utilities/schedUtils/schedUtils.go | 63 ++++++++++++++++++++++++++++ 12 files changed, 115 insertions(+), 15 deletions(-) create mode 100644 utilities/schedUtils/schedUtils.go diff --git a/power-capping/extrema.go b/power-capping/extrema.go index 23908a1..8d62b02 100644 --- a/power-capping/extrema.go +++ b/power-capping/extrema.go @@ -1,9 +1,9 @@ package pcp import ( + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" "fmt" @@ -130,7 +130,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThresh cappedHosts[victim.Host] = true orderCapped = append(orderCapped, victim.Host) logMType <- elecLogDef.GENERAL - logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts * pcp.RAPLUnits) + logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*pcp.RAPLUnits) if err := rapl.Cap(victim.Host, "rapl", 50); err != nil { logMType <- elecLogDef.ERROR logMsg <- "Error capping host" diff --git a/power-capping/progressive-extrema.go b/power-capping/progressive-extrema.go index 4905b0a..bc9350e 100644 --- a/power-capping/progressive-extrema.go +++ b/power-capping/progressive-extrema.go @@ -2,10 +2,10 @@ package pcp import ( "bitbucket.org/sunybingcloud/elektron/constants" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/utilities" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" "fmt" diff --git a/rapl/cap.go b/rapl/cap.go index a981ddc..dd09606 100644 --- a/rapl/cap.go +++ b/rapl/cap.go @@ -1,11 +1,11 @@ package rapl import ( + elekEnv "bitbucket.org/sunybingcloud/elektron/environment" "github.com/pkg/errors" "golang.org/x/crypto/ssh" - "strconv" "os" - elekEnv "bitbucket.org/sunybingcloud/elektron/environment" + "strconv" ) func Cap(host, username string, percentage float64) error { diff --git a/scheduler.go b/scheduler.go index 2365e76..2b7ffbb 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,9 +2,9 @@ package main import ( "bitbucket.org/sunybingcloud/elektron/def" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/schedulers" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "flag" "fmt" "github.com/golang/protobuf/proto" diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 9ea9943..4ac046a 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -174,6 +175,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 1a3df96..ee1fda8 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -168,6 +169,11 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/base.go b/schedulers/base.go index e481d57..db2c849 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/utilities" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "bytes" "fmt" "github.com/golang/protobuf/proto" @@ -55,6 +56,17 @@ type BaseScheduler struct { // Whether switching of scheduling policies at runtime has been enabled schedPolSwitchEnabled bool + + // Size of window of tasks that can be scheduled in the next offer cycle. + // The window size can be adjusted to make the most use of every resource offer. + // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. + schedulingWindow int + + // Criteria to resize the schedulingWindow. + schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria + // Window of tasks that the current scheduling policy has to schedule. + // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. + schedWindow int } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -66,6 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { } s.running = make(map[string]map[string]bool) s.mutex = sync.Mutex{} + s.schedWindowResizeCrit = "fillNextOfferCycle" } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 27d1580..f589f82 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -118,6 +119,11 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index caf694b..c0d1235 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -103,6 +104,11 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/helpers.go b/schedulers/helpers.go index ac020b9..60ff2e4 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -125,7 +125,7 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption { } } -// Launch tasks. +// Launch tasks and also update the resource availability for the corresponding host. func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) error { driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter) // Update resource availability diff --git a/schedulers/store.go b/schedulers/store.go index b172598..b13ad76 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -6,18 +6,18 @@ import ( // Names of different scheduling policies. const ( - ff = "first-fit" - bp = "bin-packing" - mgm = "max-greedymins" - mm = "max-min" + ff = "first-fit" + bp = "bin-packing" + mgm = "max-greedymins" + mm = "max-min" ) // Scheduling policy factory var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{ - ff: &FirstFit{}, - bp: &BinPackSortedWatts{}, - mgm: &MaxGreedyMins{}, - mm: &MaxMin{}, + ff: &FirstFit{}, + bp: &BinPackSortedWatts{}, + mgm: &MaxGreedyMins{}, + mm: &MaxMin{}, } // build the scheduling policy with the options being applied diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go new file mode 100644 index 0000000..6c57020 --- /dev/null +++ b/utilities/schedUtils/schedUtils.go @@ -0,0 +1,63 @@ +package schedUtils + +import ( + "bitbucket.org/sunybingcloud/elektron/def" + "bitbucket.org/sunybingcloud/elektron/utilities" +) + +// Criteria for resizing the scheduling window. +type SchedulingWindowResizingCriteria string + +var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{ + "fillNextOfferCycle": &fillNextOfferCycle{}, +} + +// Interface for a scheduling window resizing strategy. +type schedWindowResizingStrategy interface { + // Apply the window resizing strategy and return the news scheduling window size. + Apply(func() interface{}) int +} + +// Scheduling window resizing strategy that attempts to resize the scheduling window +// to include as many tasks as possible so as to make the most use of the next offer cycle. +type fillNextOfferCycle struct{} + +func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { + return s.apply(getArgs().([]def.Task)) +} + +// Loop over the unscheduled tasks, in submission order, and determine the maximum +// number of tasks that can be scheduled in the next offer cycle. +// As the offers get smaller and smaller, this approach might lead to an increase in internal fragmentation. +// +// Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search +// which is computationally expensive. +func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { + clusterwideResourceCount := utilities.GetClusterwideResourceAvailability() + newSchedWindow := 0 + filledCPU := 0.0 + filledRAM := 0.0 + + // Can we schedule another task. + canSchedule := func(t def.Task) bool { + if ((filledCPU + t.CPU) <= clusterwideResourceCount.UnusedCPU) && + ((filledRAM + t.RAM) <= clusterwideResourceCount.UnusedRAM) { + return true + } + return false + } + + for _, task := range taskQueue { + for i := *task.Instances; i > 0; i-- { + if canSchedule(task) { + filledCPU += task.CPU + filledRAM += task.RAM + newSchedWindow++ + } else { + break + } + } + break + } + return newSchedWindow +}