Merged in differentSwitchingMechanisms (pull request #14)

DifferentSwitchingMechanisms

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
Pradyumna Kaushik 2018-04-17 23:44:36 +00:00
parent 1bee742588
commit 66c19b53c9
13 changed files with 172 additions and 31 deletions

@@ -62,6 +62,8 @@ type BaseScheduler struct {
// This scheduling policy would be deployed first regardless of the distribution of tasks in the TaskQueue.
// Note: Scheduling policy switching needs to be enabled.
nameOfFstSchedPolToDeploy string
// Scheduling policy switching criteria.
schedPolSwitchCriteria string
// Size of window of tasks that can be scheduled in the next offer cycle.
// The window size can be adjusted to make the most use of every resource offer.
@@ -195,8 +197,8 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
}
// Switch just before consuming the resource offers.
s.curSchedPolicy.SwitchIfNecessary(s)
s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
s.schedWindowSize, s.numTasksInSchedWindow))
// s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
// s.schedWindowSize, s.numTasksInSchedWindow))
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
s.hasReceivedResourceOffers = true
}
@@ -211,16 +213,16 @@ func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos
}
// Add task to list of tasks running on node
s.Running[*status.SlaveId.Value][*status.TaskId.Value] = true
s.TasksRunningMutex.Unlock()
s.tasksRunning++
s.TasksRunningMutex.Unlock()
} else if IsTerminal(status.State) {
// Update resource availability.
utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE",
*status.TaskId, *status.SlaveId)
s.TasksRunningMutex.Lock()
delete(s.Running[*status.SlaveId.Value], *status.TaskId.Value)
s.TasksRunningMutex.Unlock()
s.tasksRunning--
s.TasksRunningMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
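
The hunk above moves the s.tasksRunning increment and decrement to before the corresponding TasksRunningMutex.Unlock() calls, so the counter is updated inside the same critical section as the Running map. Below is a minimal, self-contained sketch of that locking discipline; tracker, markRunning and markTerminal are hypothetical names, not the actual BaseScheduler.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical, trimmed-down version of the task bookkeeping in StatusUpdate.
type tracker struct {
	mu           sync.Mutex
	running      map[string]bool // taskID -> running
	tasksRunning int
}

// markRunning records a task and bumps the counter inside one critical
// section, mirroring the reordering in this commit.
func (t *tracker) markRunning(taskID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.running[taskID] = true
	t.tasksRunning++
}

// markTerminal removes the task and decrements the counter under the same lock.
func (t *tracker) markTerminal(taskID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.running, taskID)
	t.tasksRunning--
}

func main() {
	t := &tracker{running: make(map[string]bool)}
	t.markRunning("task-0")
	t.markTerminal("task-0")
	fmt.Println(t.tasksRunning) // 0
}
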
@@ -401,16 +403,18 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
s.Log(lmt, msg)
}
func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, nextPolicy SchedPolicyState) {
log := func() {
func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
logSPS := func() {
s.Log(elecLogDef.SPS, name)
s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%f] ==> %s", taskDist, name))
}
if s.hasReceivedResourceOffers && (s.curSchedPolicy != nextPolicy) {
log()
logSPS()
} else if !s.hasReceivedResourceOffers {
log()
logSPS()
}
// Log the size of the scheduling window and the scheduling policy
// that will schedule the tasks in that window.
s.Log(elecLogDef.SCHED_WINDOW, fmt.Sprintf("%d %s", s.schedWindowSize, name))
}
func (s *BaseScheduler) LogClsfnAndTaskDistOverhead(overhead time.Duration) {

@@ -72,7 +72,7 @@ type ElectronScheduler interface {
// Log Status update of a task
LogTaskStatusUpdate(status *mesos.TaskStatus)
// Log Scheduling policy switches (if any)
LogSchedulingPolicySwitch()
LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState)
// Log the computation overhead of classifying tasks in the scheduling window.
LogClsfnAndTaskDistOverhead(overhead time.Duration)
}

@@ -117,9 +117,14 @@ func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) sc
}
}
func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions {
func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool, switchingCriteria string) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch
// Check whether the given switching criteria is valid.
if _, ok := switchBasedOn[switchingCriteria]; !ok {
return errors.New("invalid scheduling policy switching criteria")
}
s.(*BaseScheduler).schedPolSwitchCriteria = switchingCriteria
return nil
}
}
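
WithSchedPolSwitchEnabled now also accepts the switching criteria and rejects unknown values by returning an error from the option. The following is a hedged, self-contained sketch of the same validating functional-option pattern; fakeScheduler, option, validCriteria and withSwitching are hypothetical stand-ins, and the criteria names simply mirror the keys registered in switchBasedOn.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed-down mirror of the option above.
type fakeScheduler struct {
	switchEnabled  bool
	switchCriteria string
}

type option func(*fakeScheduler) error

var validCriteria = map[string]bool{
	"taskDist": true, "round-robin": true, "rev-round-robin": true,
}

// withSwitching validates the criteria before recording it, so a bad
// configuration value fails fast when the options are applied.
func withSwitching(enabled bool, criteria string) option {
	return func(s *fakeScheduler) error {
		if !validCriteria[criteria] {
			return errors.New("invalid scheduling policy switching criteria")
		}
		s.switchEnabled = enabled
		s.switchCriteria = criteria
		return nil
	}
}

func main() {
	s := &fakeScheduler{}
	if err := withSwitching(true, "round-robin")(s); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("switching enabled=%v, criteria=%s\n", s.switchEnabled, s.switchCriteria)
}
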

@@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/elektron/def"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"fmt"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
@@ -18,8 +20,15 @@ type SchedPolicyState interface {
ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer)
// Get information about the scheduling policy.
GetInfo() (info struct {
taskDist float64
varCpuShare float64
taskDist float64
varCpuShare float64
nextPolicyName string
prevPolicyName string
})
// Update links to next and previous scheduling policy.
UpdateLinks(info struct {
nextPolicyName string
prevPolicyName string
})
// Switch scheduling policy if necessary.
SwitchIfNecessary(SchedPolicyContext)
@@ -36,9 +45,23 @@ type baseSchedPolicyState struct {
// The average variance in cpu-share per task that this scheduling policy can cause.
// Note: This number corresponds to a given workload.
VarianceCpuSharePerTask float64 `json:"varCpuShare"`
// Names of the next and previous scheduling policies in the round-robin order.
// This order is determined by the sorted order (non-decreasing or non-increasing) of taskDistribution.
nextPolicyName string
prevPolicyName string
}
func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (string, float64) {
// A scheduling policy switching criterion takes a pointer to the BaseScheduler
// and returns the name of the scheduling policy to switch to.
type switchBy func(*BaseScheduler) string
var switchBasedOn = map[string]switchBy{
"taskDist": switchTaskDistBased,
"round-robin": switchRoundRobinBased,
"rev-round-robin": switchRevRoundRobinBased,
}
func switchTaskDistBased(baseSchedRef *BaseScheduler) string {
// Name of the scheduling policy to switch to.
switchToPolicyName := ""
// Record overhead to classify the tasks in the scheduling window and using the classification results
@@ -47,6 +70,7 @@ func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (strin
// Determine the distribution of tasks in the new scheduling window.
taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
baseSchedRef.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%f]", taskDist))
if err != nil {
// All the tasks in the window were only classified into 1 cluster.
// Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation.
@@ -98,7 +122,27 @@
}
}
}
return switchToPolicyName, taskDist
return switchToPolicyName
}
// Switch scheduling policies in a round-robin fashion.
// This does not take into account the state of the TaskQueue or the state of the cluster.
func switchRoundRobinBased(baseSchedRef *BaseScheduler) string {
// If no resource offers have been received yet, start with the first policy in the round-robin order.
if !baseSchedRef.hasReceivedResourceOffers {
return schedPoliciesToSwitch[0].spName
}
return baseSchedRef.curSchedPolicy.GetInfo().nextPolicyName
}
// Switch scheduling policies in a round-robin fashion, but in the reverse order.
// This does not take into account the state of the TaskQueue or the state of the cluster.
func switchRevRoundRobinBased(baseSchedRef *BaseScheduler) string {
// If no resource offers have been received yet, start with the last policy in the round-robin order.
if !baseSchedRef.hasReceivedResourceOffers {
return schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1].spName
}
return baseSchedRef.curSchedPolicy.GetInfo().prevPolicyName
}
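
Each criteria function above is looked up by name in switchBasedOn and invoked on the BaseScheduler. The following is a minimal sketch of that map-of-functions dispatch with hypothetical names (fakeSched, switchFn, pickNext); unlike the real code, which validates the criteria name up front in WithSchedPolSwitchEnabled, this sketch also falls back to the current policy for an unknown name.

package main

import "fmt"

// Hypothetical stand-ins for the scheduler and the criteria functions.
type fakeSched struct{ current string }

type switchFn func(*fakeSched) string

var criteria = map[string]switchFn{
	"round-robin":     func(s *fakeSched) string { return "next-of-" + s.current },
	"rev-round-robin": func(s *fakeSched) string { return "prev-of-" + s.current },
}

// pickNext dispatches on the configured criteria name; an unknown name keeps
// the current policy.
func pickNext(s *fakeSched, name string) string {
	if fn, ok := criteria[name]; ok {
		return fn(s)
	}
	return s.current
}

func main() {
	s := &fakeSched{current: "bin-packing"}
	fmt.Println(pickNext(s, "round-robin"))    // next-of-bin-packing
	fmt.Println(pickNext(s, "does-not-exist")) // bin-packing
}
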
func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
@@ -107,19 +151,20 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
if baseSchedRef.schedPolSwitchEnabled {
// Name of scheduling policy to switch to.
switchToPolicyName := ""
// Distribution of tasks in the scheduling window
var taskDist float64
// Size of the new scheduling window.
newSchedWindowSize := 0
// If scheduling window has not been fixed, then determine the scheduling window based on the current
// availability of resources on the cluster (Mesos perspective).
if !baseSchedRef.toFixSchedWindow {
// Need to compute the size of the scheduling window.
// The next scheduling policy will schedule at most schedWindowSize tasks.
baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
newSchedWindowSize, baseSchedRef.numTasksInSchedWindow =
baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
}
// Now, we need to switch if the scheduling window is > 0.
if baseSchedRef.schedWindowSize > 0 {
// Now, we need to switch if the new scheduling window is > 0.
if (!baseSchedRef.toFixSchedWindow && (newSchedWindowSize > 0)) ||
(baseSchedRef.toFixSchedWindow && (baseSchedRef.schedWindowSize > 0)) {
// If we haven't received any resource offers, then
// check whether we need to fix the first scheduling policy to deploy.
// If not, then determine the first scheduling policy based on the distribution of tasks
@@ -133,12 +178,24 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
if !baseSchedRef.hasReceivedResourceOffers {
if baseSchedRef.nameOfFstSchedPolToDeploy != "" {
switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
if !baseSchedRef.toFixSchedWindow {
baseSchedRef.schedWindowSize = newSchedWindowSize
}
} else {
switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
// Decided to switch, so updating the window size.
if !baseSchedRef.toFixSchedWindow {
baseSchedRef.schedWindowSize = newSchedWindowSize
}
switchToPolicyName = switchBasedOn[baseSchedRef.schedPolSwitchCriteria](baseSchedRef)
}
} else {
// Checking if the currently deployed scheduling policy has scheduled all the tasks in the window.
if bsps.numTasksScheduled >= baseSchedRef.schedWindowSize {
switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
// Decided to switch, so updating the window size.
if !baseSchedRef.toFixSchedWindow {
baseSchedRef.schedWindowSize = newSchedWindowSize
}
switchToPolicyName = switchBasedOn[baseSchedRef.schedPolSwitchCriteria](baseSchedRef)
} else {
// We continue working with the currently deployed scheduling policy.
log.Println("Continuing with the current scheduling policy...")
@@ -147,6 +204,12 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
return
}
}
// Switching scheduling policy.
baseSchedRef.LogSchedPolicySwitch(switchToPolicyName, SchedPolicies[switchToPolicyName])
baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
// Reset the number of tasks scheduled, since a new scheduling policy has now been deployed.
bsps.numTasksScheduled = 0
} else {
// We continue working with the currently deployed scheduling policy.
log.Println("Continuing with the current scheduling policy...")
@@ -154,20 +217,26 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
baseSchedRef.schedWindowSize)
return
}
// Switching scheduling policy.
baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
// Resetting the number of tasks scheduled as this is a new scheduling policy that has been
// deployed.
bsps.numTasksScheduled = 0
}
}
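
The switch is only attempted when the relevant window size is positive: the freshly computed newSchedWindowSize when the window is dynamic, or the configured schedWindowSize when it is fixed. A paraphrase of that check as a small, purely illustrative helper (shouldAttemptSwitch is not part of the codebase):

// Illustrative paraphrase of the switch-eligibility condition in SwitchIfNecessary.
func shouldAttemptSwitch(toFixSchedWindow bool, fixedWindowSize, newWindowSize int) bool {
	if toFixSchedWindow {
		// Fixed window: the configured size decides.
		return fixedWindowSize > 0
	}
	// Dynamic window: the freshly computed size decides.
	return newWindowSize > 0
}
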
func (bsps *baseSchedPolicyState) GetInfo() (info struct {
taskDist float64
varCpuShare float64
taskDist float64
varCpuShare float64
nextPolicyName string
prevPolicyName string
}) {
info.taskDist = bsps.TaskDistribution
info.varCpuShare = bsps.VarianceCpuSharePerTask
info.nextPolicyName = bsps.nextPolicyName
info.prevPolicyName = bsps.prevPolicyName
return info
}
func (bsps *baseSchedPolicyState) UpdateLinks(info struct {
nextPolicyName string
prevPolicyName string
}) {
bsps.nextPolicyName = info.nextPolicyName
bsps.prevPolicyName = info.prevPolicyName
}
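
GetInfo and UpdateLinks exchange the links through anonymous struct types whose fields are unexported; since non-exported field names from different packages are never identical in Go, only code in package schedulers (such as the initialization loop shown below) can construct a compatible literal. A small illustrative snippet, assumed to run inside a function in package schedulers, with made-up policy names:

bsps := &baseSchedPolicyState{}
bsps.UpdateLinks(struct {
	nextPolicyName string
	prevPolicyName string
}{
	nextPolicyName: "max-min",     // illustrative policy name
	prevPolicyName: "bin-packing", // illustrative policy name
})
info := bsps.GetInfo()
fmt.Println(info.nextPolicyName, info.prevPolicyName) // max-min bin-packing
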

@@ -88,6 +88,22 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
index++
}
}
// Initialize the next and previous policy links based on the round-robin ordering.
// The next policy for the policy at index N is the entry at index (N+1) % len(schedPoliciesToSwitch).
for curPolicyIndex := 0; curPolicyIndex < len(schedPoliciesToSwitch); curPolicyIndex++ {
info := struct {
nextPolicyName string
prevPolicyName string
}{}
if curPolicyIndex == 0 {
info.prevPolicyName = schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1].spName
} else {
info.prevPolicyName = schedPoliciesToSwitch[curPolicyIndex-1].spName
}
info.nextPolicyName = schedPoliciesToSwitch[(curPolicyIndex+1)%len(schedPoliciesToSwitch)].spName
schedPoliciesToSwitch[curPolicyIndex].sp.UpdateLinks(info)
}
}
return nil
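
The loop above wires the policies into a circular round-robin order: the previous link of the first policy wraps around to the last, and the next link always uses (curPolicyIndex+1) % len(schedPoliciesToSwitch). A standalone sketch of the same wrap-around indexing over a hypothetical list of policy names:

package main

import "fmt"

func main() {
	// Hypothetical order; the real order comes from schedPoliciesToSwitch.
	policies := []string{"bin-packing", "max-greedy-mins", "max-min", "first-fit"}
	n := len(policies)
	for i, name := range policies {
		next := policies[(i+1)%n]
		prev := policies[(i-1+n)%n] // equivalent to the explicit special case for index 0 above
		fmt.Printf("%-16s prev=%-16s next=%s\n", name, prev, next)
	}
}
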