Merged in mapTaskDistrToSchedPolWhenSwitching (pull request #11)

MapTaskDistrToSchedPolWhenSwitching

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
Pradyumna Kaushik 2018-04-17 20:09:35 +00:00
parent 0f305ab796
commit ae81125110
11 changed files with 255 additions and 65 deletions

View file

@@ -77,7 +77,6 @@ func (s *MaxGreedyMins) CheckFit(
}
func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Println("Max-GreedyMins scheduling...")
baseSchedRef := spc.(*BaseScheduler)
if baseSchedRef.schedPolSwitchEnabled {
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -112,8 +111,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
// If scheduling policy switching enabled, then
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
s.numTasksScheduled)
break // Offers will automatically get declined.
}
task := baseSchedRef.tasks[i]
@@ -141,11 +138,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
// Pack the rest of the offer with the smallest tasks
for i := 0; i < len(baseSchedRef.tasks); i++ {
// If scheduling policy switching enabled, then
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
break // Offers will automatically get declined.
}
task := baseSchedRef.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
@@ -159,6 +151,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
}
for *task.Instances > 0 {
// If scheduling policy switching enabled, then
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
break // Offers will automatically get declined.
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
@@ -183,6 +180,4 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
s.switchIfNecessary(spc)
}
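The guard that stops scheduling after `schedWindowSize` tasks recurs in every policy touched by this commit, and it now also sits inside the per-instance loop (`for *task.Instances > 0`). A minimal, self-contained sketch of the pattern — the `task` type and counters are simplified stand-ins, not the repo's structs:

```go
package main

import "fmt"

// Simplified stand-in for the repo's task definition.
type task struct {
	name      string
	instances int
}

// scheduleWindow mirrors the guard this commit moves into the per-instance
// loop: scheduling stops as soon as windowSize instances have been placed,
// even partway through a task.
func scheduleWindow(tasks []task, windowSize int) int {
	scheduled := 0
	for _, t := range tasks {
		for t.instances > 0 {
			// Checking inside the inner loop matters: a task with many
			// instances could otherwise overshoot the window before the
			// outer per-task check runs again.
			if scheduled >= windowSize {
				return scheduled
			}
			t.instances--
			scheduled++
		}
	}
	return scheduled
}

func main() {
	tasks := []task{{"taskA", 3}, {"taskB", 4}}
	fmt.Println(scheduleWindow(tasks, 5)) // prints 5, not 7
}
```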

View file

@@ -76,7 +76,6 @@ func (s *MaxMin) CheckFit(
}
func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Println("Max-Min scheduling...")
baseSchedRef := spc.(*BaseScheduler)
if baseSchedRef.schedPolSwitchEnabled {
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -116,8 +115,6 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled &&
(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
s.numTasksScheduled)
break // Offers will automatically get declined.
}
// We need to pick a min task or a max task
@@ -173,6 +170,4 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
s.switchIfNecessary(spc)
}

View file

@@ -58,6 +58,10 @@ type BaseScheduler struct {
// Whether switching of scheduling policies at runtime has been enabled
schedPolSwitchEnabled bool
// Name of the first scheduling policy to be deployed, if provided.
// This scheduling policy would be deployed first regardless of the distribution of tasks in the TaskQueue.
// Note: Scheduling policy switching needs to be enabled.
nameOfFstSchedPolToDeploy string
// Size of window of tasks that can be scheduled in the next offer cycle.
// The window size can be adjusted to make the most use of every resource offer.
@@ -186,19 +190,10 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
s.HostNameToSlaveID[offer.GetHostname()] = *offer.SlaveId.Value
}
}
// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
// then we would need to compute the size of the scheduling window for the current scheduling policy.
// Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster,
// the size of the window is determined and the scheduling policy is then applied for the corresponding number
// of tasks.
// Subsequently, the size of the scheduling window is determined at the end of each offer cycle.
if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers {
s.schedWindowSize, s.numTasksInSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} {
return s.tasks
})
}
log.Printf("SchedWindowSize: %d, NumberOfTasksInWindow: %d", s.schedWindowSize, s.numTasksInSchedWindow)
// Switch just before consuming the resource offers.
s.curSchedPolicy.SwitchIfNecessary(s)
s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
s.schedWindowSize, s.numTasksInSchedWindow))
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
s.hasReceivedResourceOffers = true
}
@@ -403,8 +398,9 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
s.Log(lmt, msg)
}
func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, nextPolicy SchedPolicyState) {
if s.curSchedPolicy != nextPolicy {
s.Log(elecLogDef.SPS, name)
s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%f] ==> %s", taskDist, name))
}
}
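Taken together, the base.go changes reorder the offer cycle: the scheduling window is sized only before the first offer cycle, and the switch decision now happens just before the offers are consumed. A condensed, self-contained sketch of that flow, with simplified stand-ins for the scheduler and policy types:

```go
package main

import "fmt"

// Stand-ins for the repo's scheduler and policy types.
type policy interface {
	SwitchIfNecessary(s *scheduler)
	ConsumeOffers(s *scheduler)
}

type scheduler struct {
	cur                       policy
	schedPolSwitchEnabled     bool
	hasReceivedResourceOffers bool
	schedWindowSize           int
}

// resourceOffers condenses the new ordering: size the window only before the
// first offer cycle, decide on a switch, then consume the offers.
func (s *scheduler) resourceOffers() {
	if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers {
		s.schedWindowSize = 10 // stand-in for schedWindowResStrategy.Apply(...)
	}
	s.cur.SwitchIfNecessary(s) // switch just before consuming the offers
	s.cur.ConsumeOffers(s)
	s.hasReceivedResourceOffers = true
}

type verbosePolicy struct{}

func (verbosePolicy) SwitchIfNecessary(s *scheduler) {
	fmt.Println("switch decision; window size =", s.schedWindowSize)
}

func (verbosePolicy) ConsumeOffers(s *scheduler) {
	fmt.Println("consuming offers")
}

func main() {
	s := &scheduler{cur: verbosePolicy{}, schedPolSwitchEnabled: true}
	s.resourceOffers()
}
```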

View file

@@ -34,7 +34,6 @@ type BinPackSortedWatts struct {
}
func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Println("BPSW scheduling...")
baseSchedRef := spc.(*BaseScheduler)
if baseSchedRef.schedPolSwitchEnabled {
SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -78,8 +77,6 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled &&
(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
s.numTasksScheduled)
break // Offers will automatically get declined.
}
// Does the task fit
@@ -107,7 +104,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
}
}
} else {
break // Continue on to next offer.
break // Continue on to next task
}
}
}
@@ -123,6 +120,4 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
s.switchIfNecessary(spc)
}

View file

@@ -34,7 +34,6 @@ type FirstFit struct {
}
func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Println("FirstFit scheduling...")
baseSchedRef := spc.(*BaseScheduler)
baseSchedRef.LogOffersReceived(offers)
@@ -57,8 +56,6 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
// If scheduling policy switching enabled, then
// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
s.numTasksScheduled)
break // Offers will automatically get declined.
}
task := baseSchedRef.tasks[i]
@@ -104,6 +101,4 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
s.switchIfNecessary(spc)
}

View file

@@ -124,6 +124,23 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions {
}
}
func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions {
return func(s ElectronScheduler) error {
if nameOfFirstSchedPol == "" {
lmt := elecLogDef.WARNING
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Name of the first scheduling policy to deploy not provided. It will now be determined at runtime.")
s.(*BaseScheduler).Log(lmt, msg)
return nil
}
if _, ok := SchedPolicies[nameOfFirstSchedPol]; !ok {
return errors.New("Invalid name of scheduling policy.")
}
s.(*BaseScheduler).nameOfFstSchedPolToDeploy = nameOfFirstSchedPol
return nil
}
}
// Launch tasks.
func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) {
driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter)

View file

@@ -1,9 +1,9 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"math/rand"
)
type SchedPolicyContext interface {
@@ -14,6 +14,13 @@ type SchedPolicyContext interface {
type SchedPolicyState interface {
// Define the particular scheduling policy's methodology of resource offer consumption.
ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer)
// Get information about the scheduling policy.
GetInfo() (info struct {
taskDist float64
varCpuShare float64
})
// Switch scheduling policy if necessary.
SwitchIfNecessary(SchedPolicyContext)
}
type baseSchedPolicyState struct {
@@ -29,26 +36,87 @@ type baseSchedPolicyState struct {
VarianceCpuSharePerTask float64 `json:"varCpuShare"`
}
func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) {
func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
baseSchedRef := spc.(*BaseScheduler)
// Switch scheduling policy only if feature enabled from CLI
// Switching scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Name of the scheduling policy to switch to.
switchToPolicyName := ""
// Need to recompute the size of the scheduling window for the next offer cycle.
// The next scheduling policy will schedule at most schedWindowSize tasks.
baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for k, v := range SchedPolicies {
if index == 0 {
baseSchedRef.LogSchedPolicySwitch(k, v)
spc.SwitchSchedPol(v)
// Resetting the number of tasks scheduled.
bsps.numTasksScheduled = 0
break
// Determine the distribution of tasks in the new scheduling window.
taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
// If no resource offers have been received yet, and
// the name of the first scheduling policy to be deployed is provided,
// we switch to this policy regardless of the task distribution.
if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") {
switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
} else if err != nil {
// All the tasks in the window were classified into only 1 cluster.
// Max-Min and Max-GreedyMins would work the same way as Bin-Packing in this situation.
// So, we have 2 choices to make: First-Fit or Bin-Packing.
// If we choose Bin-Packing, there might be performance degradation due to an increase in
// resource contention. So, First-Fit might be a better option to cater to the worst case
// where all the tasks are power-intensive.
// TODO: Another possibility is to do the exact opposite and choose Bin-Packing.
// TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue.
switchToPolicyName = bp
} else {
// The tasks in the scheduling window were classified into 2 clusters, meaning that there is
// some variety in the kind of tasks.
// We now select the scheduling policy which is most appropriate for this distribution of tasks.
first := schedPoliciesToSwitch[0]
last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
if taskDist < first.sp.GetInfo().taskDist {
switchToPolicyName = first.spName
} else if taskDist > last.sp.GetInfo().taskDist {
switchToPolicyName = last.spName
} else {
low := 0
high := len(schedPoliciesToSwitch) - 1
for low <= high {
mid := (low + high) / 2
if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
high = mid - 1
} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
low = mid + 1
} else {
switchToPolicyName = schedPoliciesToSwitch[mid].spName
break
}
}
// We're here only when low == high+1.
// If we haven't yet found the closest match.
if switchToPolicyName == "" {
lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist
highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist
if lowDiff > highDiff {
switchToPolicyName = schedPoliciesToSwitch[high].spName
} else if highDiff > lowDiff {
switchToPolicyName = schedPoliciesToSwitch[low].spName
} else {
// The index doesn't matter, as the values at high and low are equidistant
// from taskDist.
switchToPolicyName = schedPoliciesToSwitch[high].spName
}
}
}
index--
}
// Switching scheduling policy.
baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
// Resetting the number of tasks scheduled.
bsps.numTasksScheduled = 0
}
}
func (bsps *baseSchedPolicyState) GetInfo() (info struct {
taskDist float64
varCpuShare float64
}) {
info.taskDist = bsps.TaskDistribution
info.varCpuShare = bsps.VarianceCpuSharePerTask
return info
}
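SwitchIfNecessary binary-searches schedPoliciesToSwitch (sorted by taskDist) and, failing an exact match, falls back to whichever neighbour is closest to the measured task distribution. A self-contained sketch of that lookup, with invented policy names and taskDist values:

```go
package main

import "fmt"

// policyEntry stands in for one schedPoliciesToSwitch entry; the names and
// taskDist values below are made up for illustration.
type policyEntry struct {
	name     string
	taskDist float64
}

// closestPolicy assumes entries are sorted in non-decreasing order of
// taskDist and returns the policy whose taskDist is closest to the given
// value, breaking exact ties toward the lower-taskDist neighbour (as the
// diff's high/low comparison does).
func closestPolicy(entries []policyEntry, taskDist float64) string {
	if taskDist < entries[0].taskDist {
		return entries[0].name
	}
	if taskDist > entries[len(entries)-1].taskDist {
		return entries[len(entries)-1].name
	}
	low, high := 0, len(entries)-1
	for low <= high {
		mid := (low + high) / 2
		switch {
		case taskDist < entries[mid].taskDist:
			high = mid - 1
		case taskDist > entries[mid].taskDist:
			low = mid + 1
		default:
			return entries[mid].name // exact match
		}
	}
	// Here low == high+1: taskDist lies between entries[high] and entries[low].
	if entries[low].taskDist-taskDist < taskDist-entries[high].taskDist {
		return entries[low].name
	}
	return entries[high].name
}

func main() {
	entries := []policyEntry{
		{"first-fit", 0.5}, {"bin-packing", 1.0},
		{"max-greedymins", 2.5}, {"max-min", 4.0},
	}
	// 2.0 is closer to 2.5 than to 1.0.
	fmt.Println(closestPolicy(entries, 2.0)) // max-greedymins
}
```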

View file

@@ -1,10 +1,12 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/utilities"
"encoding/json"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"github.com/pkg/errors"
"os"
"sort"
)
// Names of different scheduling policies.
@@ -23,6 +25,15 @@ var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{
mm: &MaxMin{},
}
// Scheduling policies to choose when switching
var schedPoliciesToSwitch map[int]struct {
spName string
sp SchedPolicyState
} = make(map[int]struct {
spName string
sp SchedPolicyState
})
// Initialize scheduling policy characteristics using the provided config file.
func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
var schedPolConfig map[string]baseSchedPolicyState
@@ -52,6 +63,31 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
}
}
// Initialize schedPoliciesToSwitch to allow binary searching for scheduling policy switching.
spInformation := map[string]float64{}
for spName, sp := range SchedPolicies {
spInformation[spName] = sp.GetInfo().taskDist
}
spInformationPairList := utilities.GetPairList(spInformation)
// Sorting spInformationPairList in non-decreasing order of taskDist.
sort.SliceStable(spInformationPairList, func(i, j int) bool {
return spInformationPairList[i].Value < spInformationPairList[j].Value
})
// Initializing the scheduling policies that are set up for switching.
index := 0
for _, spInformationPair := range spInformationPairList {
if spInformationPair.Value != 0 {
schedPoliciesToSwitch[index] = struct {
spName string
sp SchedPolicyState
}{
spName: spInformationPair.Key,
sp: SchedPolicies[spInformationPair.Key],
}
index++
}
}
}
return nil
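
A self-contained sketch of this initialization — collect each policy's taskDist, sort in non-decreasing order, and keep only non-zero entries as switch candidates. Pair is a stand-in for the result type of utilities.GetPairList, whose exact shape isn't shown in this diff:

```go
package main

import (
	"fmt"
	"sort"
)

// Pair is an assumed stand-in for the repo's utilities.GetPairList result.
type Pair struct {
	Key   string
	Value float64
}

// buildSwitchTable sketches the initialization above: gather each policy's
// taskDist, sort in non-decreasing order, and drop policies whose taskDist
// is unset (zero) so they are never chosen as switch candidates.
func buildSwitchTable(taskDistByPolicy map[string]float64) []Pair {
	pairs := make([]Pair, 0, len(taskDistByPolicy))
	for name, dist := range taskDistByPolicy {
		pairs = append(pairs, Pair{Key: name, Value: dist})
	}
	sort.SliceStable(pairs, func(i, j int) bool { return pairs[i].Value < pairs[j].Value })
	// In-place filter: the write index never passes the read index.
	table := pairs[:0]
	for _, p := range pairs {
		if p.Value != 0 {
			table = append(table, p)
		}
	}
	return table
}

func main() {
	// taskDist values here are invented for the example.
	table := buildSwitchTable(map[string]float64{
		"first-fit": 0.5, "bin-packing": 1.0, "max-min": 0,
	})
	fmt.Println(table) // [{first-fit 0.5} {bin-packing 1}]
}
```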