diff --git a/scheduler.go b/scheduler.go
index 14eef78..af3791a 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -29,6 +29,8 @@ var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of
 var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.")
 var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.")
 var fixFirstSchedPol = flag.String("fixFirstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled.")
+var fixSchedWindow = flag.Bool("fixSchedWindow", false, "Fix the size of the scheduling window, i.e. the number of tasks that every deployed scheduling policy should schedule, provided switching is enabled.")
+var schedWindowSize = flag.Int("schedWindowSize", 200, "Size of the scheduling window if fixSchedWindow is set.")
 
 // Short hand args
 func init() {
@@ -43,7 +45,9 @@ func init() {
 	flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)")
 	flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.")
 	flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).")
-	flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the schedulin gpolicy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).")
+	flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).")
+	flag.BoolVar(fixSchedWindow, "fixSw", false, "Fix the size of the scheduling window, i.e. the number of tasks that every deployed scheduling policy should schedule, provided switching is enabled (shorthand).")
+	flag.IntVar(schedWindowSize, "swSize", 200, "Size of the scheduling window if fixSchedWindow is set (shorthand).")
 }
 
 func listAllSchedulingPolicies() {
@@ -138,7 +142,8 @@ func main() {
 		schedulers.WithPCPLog(pcpLog),
 		schedulers.WithLoggingChannels(logMType, logMsg),
 		schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch),
-		schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol))
+		schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol),
+		schedulers.WithFixedSchedulingWindow(*fixSchedWindow, *schedWindowSize))
 	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
 		Master: *master,
 		Framework: &mesos.FrameworkInfo{
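A note on the shorthand registrations above: `flag.BoolVar` and `flag.IntVar` bind a second flag name to the same variable that `flag.Bool`/`flag.Int` already returned, which is why `-fixSw`/`-fixSchedWindow` and `-swSize`/`-schedWindowSize` are interchangeable. A minimal, self-contained sketch of this aliasing pattern (flag names taken from the diff; the program itself is only an illustration):

```go
package main

import (
	"flag"
	"fmt"
)

// The long flag owns the backing variable...
var fixWindow = flag.Bool("fixSchedWindow", false, "Fix the scheduling window size.")
var windowSize = flag.Int("schedWindowSize", 200, "Size of the scheduling window.")

// ...and the shorthand flag is registered against the same pointer,
// so whichever spelling the user passes updates the same value.
func init() {
	flag.BoolVar(fixWindow, "fixSw", false, "Fix the scheduling window size (shorthand).")
	flag.IntVar(windowSize, "swSize", 200, "Size of the scheduling window (shorthand).")
}

func main() {
	flag.Parse()
	// e.g. `go run main.go -fixSw -swSize 100` prints fix=true size=100.
	fmt.Printf("fix=%v size=%d\n", *fixWindow, *windowSize)
}
```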
TaskDistribution[%f] ==> %s", taskDist, name)) + } + if s.hasReceivedResourceOffers && (s.curSchedPolicy != nextPolicy) { + log() + } else if !s.hasReceivedResourceOffers { + log() } } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 4a5ada2..ae82eb0 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -141,6 +141,26 @@ func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions { } } +func WithFixedSchedulingWindow(toFixSchedWindow bool, fixedSchedWindowSize int) schedulerOptions { + return func(s ElectronScheduler) error { + if toFixSchedWindow { + if fixedSchedWindowSize <= 0 { + return errors.New("Invalid value of scheduling window size. Please provide a value > 0.") + } + lmt := elecLogDef.WARNING + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Fixing the size of the scheduling window to %d...", fixedSchedWindowSize) + s.(*BaseScheduler).Log(lmt, msg) + s.(*BaseScheduler).toFixSchedWindow = toFixSchedWindow + s.(*BaseScheduler).schedWindowSize = fixedSchedWindowSize + } + // There shouldn't be any error for this scheduler option. + // If toFixSchedWindow is set to false, then the fixedSchedWindowSize would be ignored. In this case, + // the size of the scheduling window would be determined at runtime. + return nil + } +} + // Launch tasks. func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) { driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter) diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index 9c25a97..e6644ac 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/electron/def" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "log" "time" ) @@ -37,88 +38,129 @@ type baseSchedPolicyState struct { VarianceCpuSharePerTask float64 `json:"varCpuShare"` } -func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) { - baseSchedRef := spc.(*BaseScheduler) - // Switching scheduling policy only if feature enabled from CLI - if baseSchedRef.schedPolSwitchEnabled { - // Name of the scheduling policy to switch to. - switchToPolicyName := "" - // Need to recompute size of the scheduling window for the next offer cycle. - // The next scheduling policy will schedule at max schedWindowSize number of tasks. - baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow = - baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks }) - if baseSchedRef.schedWindowSize > 0 { - // Record overhead to classify the tasks in the scheduling window and using the classification results - // to determine the distribution of low power consuming and high power consuming tasks. - startTime := time.Now() - // Determine the distribution of tasks in the new scheduling window. - taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks) - baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime)) - // If no resource offers have been received yet, and - // the name of the first scheduling policy to be deployed is provided, - // we switch to this policy regardless of the task distribution. 
diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go
index 9c25a97..e6644ac 100644
--- a/schedulers/schedPolicy.go
+++ b/schedulers/schedPolicy.go
@@ -4,6 +4,7 @@ import (
 	"bitbucket.org/sunybingcloud/electron/def"
 	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
 	sched "github.com/mesos/mesos-go/api/v0/scheduler"
+	"log"
 	"time"
 )
 
@@ -37,88 +38,129 @@ type baseSchedPolicyState struct {
 	VarianceCpuSharePerTask float64 `json:"varCpuShare"`
 }
 
-func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
-	baseSchedRef := spc.(*BaseScheduler)
-	// Switching scheduling policy only if feature enabled from CLI
-	if baseSchedRef.schedPolSwitchEnabled {
-		// Name of the scheduling policy to switch to.
-		switchToPolicyName := ""
-		// Need to recompute size of the scheduling window for the next offer cycle.
-		// The next scheduling policy will schedule at max schedWindowSize number of tasks.
-		baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
-			baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
-		if baseSchedRef.schedWindowSize > 0 {
-			// Record overhead to classify the tasks in the scheduling window and using the classification results
-			// to determine the distribution of low power consuming and high power consuming tasks.
-			startTime := time.Now()
-			// Determine the distribution of tasks in the new scheduling window.
-			taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
-			baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
-			// If no resource offers have been received yet, and
-			// the name of the first scheduling policy to be deployed is provided,
-			// we switch to this policy regardless of the task distribution.
-			if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") {
-				switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
-			} else if err != nil {
-				// All the tasks in the window were only classified into 1 cluster.
-				// Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation.
-				// So, we have 2 choices to make. First-Fit or Bin-Packing.
-				// If choose Bin-Packing, then there might be a performance degradation due to increase in
-				// resource contention. So, First-Fit might be a better option to cater to the worst case
-				// where all the tasks are power intensive tasks.
-				// TODO: Another possibility is to do the exact opposite and choose Bin-Packing.
-				// TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue.
-				switchToPolicyName = bp
-			} else {
-				// The tasks in the scheduling window were classified into 2 clusters, meaning that there is
-				// some variety in the kind of tasks.
-				// We now select the scheduling policy which is most appropriate for this distribution of tasks.
-				first := schedPoliciesToSwitch[0]
-				last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
-				if taskDist < first.sp.GetInfo().taskDist {
-					switchToPolicyName = first.spName
-				} else if taskDist > last.sp.GetInfo().taskDist {
-					switchToPolicyName = last.spName
-				} else {
-					low := 0
-					high := len(schedPoliciesToSwitch) - 1
-					for low <= high {
-						mid := (low + high) / 2
-						if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
-							high = mid - 1
-						} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
-							low = mid + 1
-						} else {
-							switchToPolicyName = schedPoliciesToSwitch[mid].spName
-							break
-						}
-					}
-					// We're here if low == high+1.
-					// If haven't yet found the closest match.
-					if switchToPolicyName == "" {
-						lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist
-						highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist
-						if lowDiff > highDiff {
-							switchToPolicyName = schedPoliciesToSwitch[high].spName
-						} else if highDiff > lowDiff {
-							switchToPolicyName = schedPoliciesToSwitch[low].spName
-						} else {
-							// index doens't matter as the values at high and low are equidistant
-							// from taskDist.
-							switchToPolicyName = schedPoliciesToSwitch[high].spName
-						}
-					}
-				}
-			}
-			// Switching scheduling policy.
-			baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
-			baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
-			// Resetting the number of tasks scheduled.
-			bsps.numTasksScheduled = 0
-		} else {
-			// There is no need to switch the scheduling policy as there aren't any tasks in the window.
-		}
-	}
-}
+func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (string, float64) {
+	// Name of the scheduling policy to switch to.
+	switchToPolicyName := ""
+	// Record the overhead of classifying the tasks in the scheduling window and of using the classification
+	// results to determine the distribution of low power consuming and high power consuming tasks.
+	startTime := time.Now()
+	// Determine the distribution of tasks in the new scheduling window.
+	taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
+	baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
+	if err != nil {
+		// All the tasks in the window were classified into only 1 cluster.
+		// Max-Min and Max-GreedyMins would work the same way as Bin-Packing in this situation.
+		// So, we have 2 choices to make: First-Fit or Bin-Packing.
+		// If we choose Bin-Packing, then there might be a performance degradation due to an increase in
+		// resource contention. So, First-Fit might be the better option to cater to the worst case,
+		// where all the tasks are power intensive.
+		// TODO: Another possibility is to do the exact opposite and choose Bin-Packing.
+		// TODO[2]: Determine the scheduling policy based on the distribution of tasks in the whole queue.
+		switchToPolicyName = bp
+	} else {
+		// The tasks in the scheduling window were classified into 2 clusters, meaning that there is
+		// some variety in the kind of tasks.
+		// We now select the scheduling policy that is most appropriate for this distribution of tasks.
+		first := schedPoliciesToSwitch[0]
+		last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
+		if taskDist < first.sp.GetInfo().taskDist {
+			switchToPolicyName = first.spName
+		} else if taskDist > last.sp.GetInfo().taskDist {
+			switchToPolicyName = last.spName
+		} else {
+			low := 0
+			high := len(schedPoliciesToSwitch) - 1
+			for low <= high {
+				mid := (low + high) / 2
+				if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
+					high = mid - 1
+				} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
+					low = mid + 1
+				} else {
+					switchToPolicyName = schedPoliciesToSwitch[mid].spName
+					break
+				}
+			}
+			// We're here if low == high+1.
+			// If we haven't yet found the closest match.
+			if switchToPolicyName == "" {
+				lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist
+				highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist
+				if lowDiff > highDiff {
+					switchToPolicyName = schedPoliciesToSwitch[high].spName
+				} else if highDiff > lowDiff {
+					switchToPolicyName = schedPoliciesToSwitch[low].spName
+				} else {
+					// The index doesn't matter as the values at high and low are equidistant
+					// from taskDist.
+					switchToPolicyName = schedPoliciesToSwitch[high].spName
+				}
+			}
+		}
+	}
+	return switchToPolicyName, taskDist
+}
+
+func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
+	baseSchedRef := spc.(*BaseScheduler)
+	// Switching the scheduling policy only if the feature is enabled from the CLI.
+	if baseSchedRef.schedPolSwitchEnabled {
+		// Name of the scheduling policy to switch to.
+		switchToPolicyName := ""
+		// Distribution of tasks in the scheduling window.
+		var taskDist float64
+		// If the scheduling window has not been fixed, then determine its size based on the current
+		// availability of resources on the cluster (from the Mesos perspective).
+		if !baseSchedRef.toFixSchedWindow {
+			// Need to compute the size of the scheduling window.
+			// The next scheduling policy will schedule at most schedWindowSize tasks.
+			baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
+				baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
+		}
+
+		// A switch is considered only if the scheduling window is > 0.
+		if baseSchedRef.schedWindowSize > 0 {
+			// If we haven't received any resource offers yet, then
+			// check whether the first scheduling policy to deploy has been fixed.
+			// If not, then determine the first scheduling policy based on the distribution of tasks
+			// in the scheduling window.
+			// Else,
+			// check whether the currently deployed scheduling policy has already scheduled
+			// schedWindowSize tasks.
+			// If yes, then switch to the scheduling policy that suits the distribution of tasks in
+			// the scheduling window.
+			// If not, then continue to use the currently deployed scheduling policy.
+			if !baseSchedRef.hasReceivedResourceOffers {
+				if baseSchedRef.nameOfFstSchedPolToDeploy != "" {
+					switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
+				} else {
+					switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
+				}
+			} else {
+				if bsps.numTasksScheduled >= baseSchedRef.schedWindowSize {
+					switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
+				} else {
+					// We continue working with the currently deployed scheduling policy.
+					log.Println("Continuing with the current scheduling policy...")
+					log.Printf("TasksScheduled[%d], SchedWindowSize[%d]", bsps.numTasksScheduled,
+						baseSchedRef.schedWindowSize)
+					return
+				}
+			}
+		} else {
+			// We continue working with the currently deployed scheduling policy.
+			log.Println("Continuing with the current scheduling policy...")
+			log.Printf("TasksScheduled[%d], SchedWindowSize[%d]", bsps.numTasksScheduled,
+				baseSchedRef.schedWindowSize)
+			return
+		}
+		// Switching the scheduling policy.
+		baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
+		baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
+		// Resetting the number of tasks scheduled, as a new scheduling policy has been
+		// deployed.
+		bsps.numTasksScheduled = 0
+	}
+}
 
 func (bsps *baseSchedPolicyState) GetInfo() (info struct {
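The selection in `nextPolicy` is a binary search over `schedPoliciesToSwitch`, which is assumed to be sorted by each policy's `taskDist`, falling back to the nearer neighbor when there is no exact match (ties go to the `high` index, per the comment in the diff). A self-contained sketch of just that search, with hypothetical policy names and distribution values:

```go
package main

import "fmt"

// policy pairs a name with the task-distribution value it is tuned for,
// mirroring the role of schedPoliciesToSwitch entries.
type policy struct {
	name     string
	taskDist float64
}

// closestPolicy returns the policy whose taskDist is nearest to target.
// policies must be sorted by ascending taskDist.
func closestPolicy(policies []policy, target float64) string {
	first, last := policies[0], policies[len(policies)-1]
	if target < first.taskDist {
		return first.name
	}
	if target > last.taskDist {
		return last.name
	}
	low, high := 0, len(policies)-1
	for low <= high {
		mid := (low + high) / 2
		switch {
		case target < policies[mid].taskDist:
			high = mid - 1
		case target > policies[mid].taskDist:
			low = mid + 1
		default:
			return policies[mid].name // exact match
		}
	}
	// Here low == high+1; pick whichever neighbor is closer
	// (ties go to high, matching nextPolicy's behavior).
	if policies[low].taskDist-target < target-policies[high].taskDist {
		return policies[low].name
	}
	return policies[high].name
}

func main() {
	// Hypothetical distribution values, for illustration only.
	policies := []policy{{"bin-packing", 0.5}, {"max-greedymins", 1.0}, {"max-min", 2.0}}
	fmt.Println(closestPolicy(policies, 1.4)) // max-greedymins: 1.0 is nearer than 2.0
}
```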
diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go
index 889141a..4e59e9f 100644
--- a/utilities/schedUtils/schedUtils.go
+++ b/utilities/schedUtils/schedUtils.go
@@ -86,14 +86,14 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) (int, int) {
 	// Setting window as the length of the entire queue.
 	// Also setting numberOfTasksTraversed to the number of tasks in the entire queue.
 	// TODO: Create another resizing strategy that sizes the window to the length of the entire pending queue.
-	flattenedLength := 0
-	numTasks := 0
-	for _, ts := range taskQueue {
-		numTasks++
-		flattenedLength += *ts.Instances
-	}
-	newSchedWindow = flattenedLength
-	numberOfTasksTraversed = numTasks
+	// flattenedLength := 0
+	// numTasks := 0
+	// for _, ts := range taskQueue {
+	// 	numTasks++
+	// 	flattenedLength += *ts.Instances
+	// }
+	// newSchedWindow = flattenedLength
+	// numberOfTasksTraversed = numTasks
 	return newSchedWindow, numberOfTasksTraversed
 }
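The commented-out block above is what the TODO refers to: sizing the window to the flattened length of the whole pending queue. If that behavior is later promoted to its own resizing strategy, it might look like the following sketch (hypothetical `fillEntireQueue` type; assumes this package's `def.Task` with its pointer-valued `Instances` field, as used in the code above):

```go
package schedUtils

import "bitbucket.org/sunybingcloud/electron/def"

// fillEntireQueue is a hypothetical resizing strategy (per the TODO above)
// that sizes the scheduling window to the flattened length of the whole
// pending queue, i.e. the sum of instance counts across all tasks.
type fillEntireQueue struct{}

func (s *fillEntireQueue) apply(taskQueue []def.Task) (int, int) {
	flattenedLength := 0
	numTasks := 0
	for _, ts := range taskQueue {
		numTasks++
		flattenedLength += *ts.Instances
	}
	// Window size, and the number of tasks traversed to build it.
	return flattenedLength, numTasks
}
```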