diff --git a/power-capping/extrema.go b/power-capping/extrema.go index 23908a1..8d62b02 100644 --- a/power-capping/extrema.go +++ b/power-capping/extrema.go @@ -1,9 +1,9 @@ package pcp import ( + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" "fmt" @@ -130,7 +130,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThresh cappedHosts[victim.Host] = true orderCapped = append(orderCapped, victim.Host) logMType <- elecLogDef.GENERAL - logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts * pcp.RAPLUnits) + logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*pcp.RAPLUnits) if err := rapl.Cap(victim.Host, "rapl", 50); err != nil { logMType <- elecLogDef.ERROR logMsg <- "Error capping host" diff --git a/power-capping/progressive-extrema.go b/power-capping/progressive-extrema.go index 4905b0a..bc9350e 100644 --- a/power-capping/progressive-extrema.go +++ b/power-capping/progressive-extrema.go @@ -2,10 +2,10 @@ package pcp import ( "bitbucket.org/sunybingcloud/elektron/constants" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/utilities" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" "fmt" diff --git a/rapl/cap.go b/rapl/cap.go index a981ddc..dd09606 100644 --- a/rapl/cap.go +++ b/rapl/cap.go @@ -1,11 +1,11 @@ package rapl import ( + elekEnv "bitbucket.org/sunybingcloud/elektron/environment" "github.com/pkg/errors" "golang.org/x/crypto/ssh" - "strconv" "os" - elekEnv "bitbucket.org/sunybingcloud/elektron/environment" + "strconv" ) func Cap(host, username string, percentage float64) error { diff --git a/scheduler.go 
b/scheduler.go index 2365e76..2b7ffbb 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,9 +2,9 @@ package main import ( "bitbucket.org/sunybingcloud/elektron/def" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/schedulers" - elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "flag" "fmt" "github.com/golang/protobuf/proto" diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index 9ea9943..4ac046a 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -174,6 +175,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. 
index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 1a3df96..ee1fda8 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -168,6 +169,11 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/base.go b/schedulers/base.go index e481d57..db2c849 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bitbucket.org/sunybingcloud/elektron/utilities" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "bytes" "fmt" "github.com/golang/protobuf/proto" @@ -55,6 +56,17 @@ type BaseScheduler struct { // Whether switching of scheduling policies at runtime has been enabled schedPolSwitchEnabled bool + + // Size of window of tasks that can be scheduled in the next offer cycle. + // The window size can be adjusted to make the most use of every resource offer. 
+ // By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled. + schedulingWindow int + + // Criteria to resize the schedulingWindow. + schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria + // Window of tasks that the current scheduling policy has to schedule. + // Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling. + schedWindow int } func (s *BaseScheduler) init(opts ...schedPolicyOption) { @@ -66,6 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) { } s.running = make(map[string]map[string]bool) s.mutex = sync.Mutex{} + s.schedWindowResizeCrit = "fillNextOfferCycle" } func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 27d1580..f589f82 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -118,6 +119,11 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. 
index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index caf694b..c0d1235 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/schedUtils" "fmt" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" @@ -103,6 +104,11 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD // Switch scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Need to recompute the schedWindow for the next offer cycle. + // The next scheduling policy will schedule at max schedWindow number of tasks. + baseSchedRef.schedWindow = schedUtils. + SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply( + func() interface{} { return baseSchedRef.tasks }) // Switching to a random scheduling policy. // TODO: Switch based on some criteria. index := rand.Intn(len(SchedPolicies)) diff --git a/schedulers/helpers.go b/schedulers/helpers.go index ac020b9..60ff2e4 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -125,7 +125,7 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption { } } -// Launch tasks. +// Launch tasks and also update the resource availability for the corresponding host. func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) error { driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter) // Update resource availability diff --git a/schedulers/store.go b/schedulers/store.go index b172598..b13ad76 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -6,18 +6,18 @@ import ( // Names of different scheduling policies. 
const ( - ff = "first-fit" - bp = "bin-packing" - mgm = "max-greedymins" - mm = "max-min" + ff = "first-fit" + bp = "bin-packing" + mgm = "max-greedymins" + mm = "max-min" ) // Scheduling policy factory var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{ - ff: &FirstFit{}, - bp: &BinPackSortedWatts{}, - mgm: &MaxGreedyMins{}, - mm: &MaxMin{}, + ff: &FirstFit{}, + bp: &BinPackSortedWatts{}, + mgm: &MaxGreedyMins{}, + mm: &MaxMin{}, } // build the scheduling policy with the options being applied diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go new file mode 100644 index 0000000..6c57020 --- /dev/null +++ b/utilities/schedUtils/schedUtils.go @@ -0,0 +1,63 @@ +package schedUtils + +import ( + "bitbucket.org/sunybingcloud/elektron/def" + "bitbucket.org/sunybingcloud/elektron/utilities" +) + +// Criteria for resizing the scheduling window. +type SchedulingWindowResizingCriteria string + +var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{ + "fillNextOfferCycle": &fillNextOfferCycle{}, +} + +// Interface for a scheduling window resizing strategy. +type schedWindowResizingStrategy interface { + // Apply the window resizing strategy and return the new scheduling window size. + Apply(func() interface{}) int +} + +// Scheduling window resizing strategy that attempts to resize the scheduling window +// to include as many tasks as possible so as to make the most use of the next offer cycle. +type fillNextOfferCycle struct{} + +func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int { + return s.apply(getArgs().([]def.Task)) +} + +// Loop over the unscheduled tasks, in submission order, and determine the maximum +// number of tasks that can be scheduled in the next offer cycle. +// As the offers get smaller and smaller, this approach might lead to an increase in internal fragmentation. 
+// +// Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search +// which is computationally expensive. +func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int { + clusterwideResourceCount := utilities.GetClusterwideResourceAvailability() + newSchedWindow := 0 + filledCPU := 0.0 + filledRAM := 0.0 + + // Can we schedule another task. + canSchedule := func(t def.Task) bool { + if ((filledCPU + t.CPU) <= clusterwideResourceCount.UnusedCPU) && + ((filledRAM + t.RAM) <= clusterwideResourceCount.UnusedRAM) { + return true + } + return false + } + + for _, task := range taskQueue { + for i := *task.Instances; i > 0; i-- { + if canSchedule(task) { + filledCPU += task.CPU + filledRAM += task.RAM + newSchedWindow++ + } else { + break + } + } + break + } + return newSchedWindow +}