added utility to compute the scheduling window. Right now there's only criteria on which this is determined -- fillNextOfferCycle. So, the schedWindow is the max number of tasks, that aren't yet scheduled, whose aggregate resource requirement is as close as possible to the resource available in the next round of resource offers. To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search of the TaskQueue and as this is computationally expensive, a linear search is performed on the TaskQueue. Retrofitted scheduling policies to also call utilities.schedUtils#schedWindowResizingStrategy#Apply before switching to a new scheduling policy.
This commit is contained in:
parent
6f0f3788b9
commit
3ebd7b0c7e
12 changed files with 115 additions and 15 deletions
|
@ -1,9 +1,9 @@
|
|||
package pcp
|
||||
|
||||
import (
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/pcp"
|
||||
"bitbucket.org/sunybingcloud/elektron/rapl"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bufio"
|
||||
"container/ring"
|
||||
"fmt"
|
||||
|
@ -130,7 +130,7 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThresh
|
|||
cappedHosts[victim.Host] = true
|
||||
orderCapped = append(orderCapped, victim.Host)
|
||||
logMType <- elecLogDef.GENERAL
|
||||
logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts * pcp.RAPLUnits)
|
||||
logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*pcp.RAPLUnits)
|
||||
if err := rapl.Cap(victim.Host, "rapl", 50); err != nil {
|
||||
logMType <- elecLogDef.ERROR
|
||||
logMsg <- "Error capping host"
|
||||
|
|
|
@ -2,10 +2,10 @@ package pcp
|
|||
|
||||
import (
|
||||
"bitbucket.org/sunybingcloud/elektron/constants"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/pcp"
|
||||
"bitbucket.org/sunybingcloud/elektron/rapl"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bufio"
|
||||
"container/ring"
|
||||
"fmt"
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package rapl
|
||||
|
||||
import (
|
||||
elekEnv "bitbucket.org/sunybingcloud/elektron/environment"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"strconv"
|
||||
"os"
|
||||
elekEnv "bitbucket.org/sunybingcloud/elektron/environment"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func Cap(host, username string, percentage float64) error {
|
||||
|
|
|
@ -2,9 +2,9 @@ package main
|
|||
|
||||
import (
|
||||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/pcp"
|
||||
"bitbucket.org/sunybingcloud/elektron/schedulers"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
|
||||
"fmt"
|
||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||
|
@ -174,6 +175,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
|
|||
|
||||
// Switch scheduling policy only if feature enabled from CLI
|
||||
if baseSchedRef.schedPolSwitchEnabled {
|
||||
// Need to recompute the schedWindow for the next offer cycle.
|
||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
||||
baseSchedRef.schedWindow = schedUtils.
|
||||
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
|
||||
func() interface{} { return baseSchedRef.tasks })
|
||||
// Switching to a random scheduling policy.
|
||||
// TODO: Switch based on some criteria.
|
||||
index := rand.Intn(len(SchedPolicies))
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
|
||||
"fmt"
|
||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||
|
@ -168,6 +169,11 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
|
|||
|
||||
// Switch scheduling policy only if feature enabled from CLI
|
||||
if baseSchedRef.schedPolSwitchEnabled {
|
||||
// Need to recompute the schedWindow for the next offer cycle.
|
||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
||||
baseSchedRef.schedWindow = schedUtils.
|
||||
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
|
||||
func() interface{} { return baseSchedRef.tasks })
|
||||
// Switching to a random scheduling policy.
|
||||
// TODO: Switch based on some criteria.
|
||||
index := rand.Intn(len(SchedPolicies))
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -55,6 +56,17 @@ type BaseScheduler struct {
|
|||
|
||||
// Whether switching of scheduling policies at runtime has been enabled
|
||||
schedPolSwitchEnabled bool
|
||||
|
||||
// Size of window of tasks that can be scheduled in the next offer cycle.
|
||||
// The window size can be adjusted to make the most use of every resource offer.
|
||||
// By default, the schedulingWindow would correspond to all the remaining tasks that haven't yet been scheduled.
|
||||
schedulingWindow int
|
||||
|
||||
// Criteria to resize the schedulingWindow.
|
||||
schedWindowResizeCrit schedUtils.SchedulingWindowResizingCriteria
|
||||
// Window of tasks that the current scheduling policy has to schedule.
|
||||
// Once #schedWindow tasks are scheduled, the current scheduling policy has to stop scheduling.
|
||||
schedWindow int
|
||||
}
|
||||
|
||||
func (s *BaseScheduler) init(opts ...schedPolicyOption) {
|
||||
|
@ -66,6 +78,7 @@ func (s *BaseScheduler) init(opts ...schedPolicyOption) {
|
|||
}
|
||||
s.running = make(map[string]map[string]bool)
|
||||
s.mutex = sync.Mutex{}
|
||||
s.schedWindowResizeCrit = "fillNextOfferCycle"
|
||||
}
|
||||
|
||||
func (s *BaseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
|
||||
"fmt"
|
||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||
|
@ -118,6 +119,11 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
|
|||
|
||||
// Switch scheduling policy only if feature enabled from CLI
|
||||
if baseSchedRef.schedPolSwitchEnabled {
|
||||
// Need to recompute the schedWindow for the next offer cycle.
|
||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
||||
baseSchedRef.schedWindow = schedUtils.
|
||||
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
|
||||
func() interface{} { return baseSchedRef.tasks })
|
||||
// Switching to a random scheduling policy.
|
||||
// TODO: Switch based on some criteria.
|
||||
index := rand.Intn(len(SchedPolicies))
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities/schedUtils"
|
||||
"fmt"
|
||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||
sched "github.com/mesos/mesos-go/api/v0/scheduler"
|
||||
|
@ -103,6 +104,11 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
|
|||
|
||||
// Switch scheduling policy only if feature enabled from CLI
|
||||
if baseSchedRef.schedPolSwitchEnabled {
|
||||
// Need to recompute the schedWindow for the next offer cycle.
|
||||
// The next scheduling policy will schedule at max schedWindow number of tasks.
|
||||
baseSchedRef.schedWindow = schedUtils.
|
||||
SchedWindowResizingCritToStrategy[baseSchedRef.schedWindowResizeCrit].Apply(
|
||||
func() interface{} { return baseSchedRef.tasks })
|
||||
// Switching to a random scheduling policy.
|
||||
// TODO: Switch based on some criteria.
|
||||
index := rand.Intn(len(SchedPolicies))
|
||||
|
|
|
@ -125,7 +125,7 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption {
|
|||
}
|
||||
}
|
||||
|
||||
// Launch tasks.
|
||||
// Launch tasks and also update the resource availability for the corresponding host.
|
||||
func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) error {
|
||||
driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter)
|
||||
// Update resource availability
|
||||
|
|
|
@ -6,18 +6,18 @@ import (
|
|||
|
||||
// Names of different scheduling policies.
|
||||
const (
|
||||
ff = "first-fit"
|
||||
bp = "bin-packing"
|
||||
mgm = "max-greedymins"
|
||||
mm = "max-min"
|
||||
ff = "first-fit"
|
||||
bp = "bin-packing"
|
||||
mgm = "max-greedymins"
|
||||
mm = "max-min"
|
||||
)
|
||||
|
||||
// Scheduling policy factory
|
||||
var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{
|
||||
ff: &FirstFit{},
|
||||
bp: &BinPackSortedWatts{},
|
||||
mgm: &MaxGreedyMins{},
|
||||
mm: &MaxMin{},
|
||||
ff: &FirstFit{},
|
||||
bp: &BinPackSortedWatts{},
|
||||
mgm: &MaxGreedyMins{},
|
||||
mm: &MaxMin{},
|
||||
}
|
||||
|
||||
// build the scheduling policy with the options being applied
|
||||
|
|
63
utilities/schedUtils/schedUtils.go
Normal file
63
utilities/schedUtils/schedUtils.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package schedUtils
|
||||
|
||||
import (
|
||||
"bitbucket.org/sunybingcloud/elektron/def"
|
||||
"bitbucket.org/sunybingcloud/elektron/utilities"
|
||||
)
|
||||
|
||||
// Criteria for resizing the scheduling window.
|
||||
type SchedulingWindowResizingCriteria string
|
||||
|
||||
var SchedWindowResizingCritToStrategy = map[SchedulingWindowResizingCriteria]schedWindowResizingStrategy{
|
||||
"fillNextOfferCycle": &fillNextOfferCycle{},
|
||||
}
|
||||
|
||||
// Interface for a scheduling window resizing strategy.
|
||||
type schedWindowResizingStrategy interface {
|
||||
// Apply the window resizing strategy and return the news scheduling window size.
|
||||
Apply(func() interface{}) int
|
||||
}
|
||||
|
||||
// Scheduling window resizing strategy that attempts to resize the scheduling window
|
||||
// to include as many tasks as possible so as to make the most use of the next offer cycle.
|
||||
type fillNextOfferCycle struct{}
|
||||
|
||||
func (s *fillNextOfferCycle) Apply(getArgs func() interface{}) int {
|
||||
return s.apply(getArgs().([]def.Task))
|
||||
}
|
||||
|
||||
// Loop over the unscheduled tasks, in submission order, and determine the maximum
|
||||
// number of tasks that can be scheduled in the next offer cycle.
|
||||
// As the offers get smaller and smaller, this approach might lead to an increase in internal fragmentation.
|
||||
//
|
||||
// Note: To be able to make the most use of the next offer cycle, one would need to perform a non-polynomial search
|
||||
// which is computationally expensive.
|
||||
func (s *fillNextOfferCycle) apply(taskQueue []def.Task) int {
|
||||
clusterwideResourceCount := utilities.GetClusterwideResourceAvailability()
|
||||
newSchedWindow := 0
|
||||
filledCPU := 0.0
|
||||
filledRAM := 0.0
|
||||
|
||||
// Can we schedule another task.
|
||||
canSchedule := func(t def.Task) bool {
|
||||
if ((filledCPU + t.CPU) <= clusterwideResourceCount.UnusedCPU) &&
|
||||
((filledRAM + t.RAM) <= clusterwideResourceCount.UnusedRAM) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
for _, task := range taskQueue {
|
||||
for i := *task.Instances; i > 0; i-- {
|
||||
if canSchedule(task) {
|
||||
filledCPU += task.CPU
|
||||
filledRAM += task.RAM
|
||||
newSchedWindow++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
return newSchedWindow
|
||||
}
|
Reference in a new issue