diff --git a/def/taskUtils.go b/def/taskUtils.go
index f239bc6..f753c26 100644
--- a/def/taskUtils.go
+++ b/def/taskUtils.go
@@ -228,6 +228,8 @@ func GetTaskDistributionInWindow(windowSize int, tasks []Task) (float64, error)
 	// The first cluster would corresponding to the light power consuming tasks.
 	// The second cluster would corresponding to the high power consuming tasks.
 	lpcTasksTotalInst := getTotalInstances(taskClusters[0].Tasks, taskExceedingWindow)
+	fmt.Printf("lpc:%d\n", lpcTasksTotalInst)
 	hpcTasksTotalInst := getTotalInstances(taskClusters[1].Tasks, taskExceedingWindow)
+	fmt.Printf("hpc:%d\n", hpcTasksTotalInst)
 	return float64(lpcTasksTotalInst) / float64(hpcTasksTotalInst), nil
 }
diff --git a/logging/def/logType.go b/logging/def/logType.go
index 1caf321..891b06e 100644
--- a/logging/def/logType.go
+++ b/logging/def/logType.go
@@ -16,6 +16,7 @@ var (
 	DEG_COL = messageNametoMessageType("DEG_COL")
 	SPS = messageNametoMessageType("SPS")
 	CLSFN_TASKDIST_OVERHEAD = messageNametoMessageType("CLSFN_TASKDIST_OVERHEAD")
+	SCHED_WINDOW = messageNametoMessageType("SCHED_WINDOW")
 )
 
 // Text colors for the different types of log messages.
diff --git a/logging/def/logger.go b/logging/def/logger.go
index 7d595a2..42e946a 100644
--- a/logging/def/logger.go
+++ b/logging/def/logger.go
@@ -21,6 +21,7 @@ func newLogger() *LoggerDriver {
 			DEG_COL: true,
 			SPS: true,
 			CLSFN_TASKDIST_OVERHEAD: true,
+			SCHED_WINDOW: true,
 		},
 	}
 	return logger
diff --git a/logging/def/loggerFactory.go b/logging/def/loggerFactory.go
index 92ed922..5ee0d13 100644
--- a/logging/def/loggerFactory.go
+++ b/logging/def/loggerFactory.go
@@ -14,6 +14,7 @@ const (
 	degColLogger = "degCol-logger"
 	spsLogger = "schedPolicySwitch-logger"
 	clsfnTaskDistOverheadLogger = "classificationOverhead-logger"
+	schedWindowLogger = "schedWindow-logger"
 )
 
 // Logger class factory
@@ -24,6 +25,7 @@ var Loggers map[string]loggerObserver = map[string]loggerObserver{
 	degColLogger: nil,
 	spsLogger: nil,
 	clsfnTaskDistOverheadLogger: nil,
+	schedWindowLogger: nil,
 }
 
 // Logger options to help initialize loggers
@@ -46,6 +48,7 @@ func withLoggerSpecifics(prefix string) loggerOption {
 			degColLogger: &specifics{},
 			spsLogger: &specifics{},
 			clsfnTaskDistOverheadLogger: &specifics{},
+			schedWindowLogger: &specifics{},
 		}
 		l.(*loggerObserverImpl).setLogFilePrefix(prefix)
 		l.(*loggerObserverImpl).setLogFile()
@@ -76,6 +79,9 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
 	Loggers[clsfnTaskDistOverheadLogger] = &ClsfnTaskDistOverheadLogger{
 		loggerObserverImpl: *loi,
 	}
+	Loggers[schedWindowLogger] = &SchedWindowLogger{
+		loggerObserverImpl: *loi,
+	}
 
 	for _, lmt := range GetLogMessageTypes() {
 		switch lmt {
@@ -97,6 +103,8 @@
 			lg.attach(SPS, Loggers[spsLogger])
 		case CLSFN_TASKDIST_OVERHEAD.String():
 			lg.attach(CLSFN_TASKDIST_OVERHEAD, Loggers[clsfnTaskDistOverheadLogger])
+		case SCHED_WINDOW.String():
+			lg.attach(SCHED_WINDOW, Loggers[schedWindowLogger])
 		}
 	}
 }
diff --git a/logging/def/loggerObservers.go b/logging/def/loggerObservers.go
index 465c307..87f5b46 100644
--- a/logging/def/loggerObservers.go
+++ b/logging/def/loggerObservers.go
@@ -93,6 +93,15 @@ func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) {
 		clsfnTaskDistOverheadLogFilePrefix = loi.logDirectory + "/" + clsfnTaskDistOverheadLogFilePrefix
 	}
 	loi.logObserverSpecifics[clsfnTaskDistOverheadLogger].logFilePrefix = clsfnTaskDistOverheadLogFilePrefix
+
+	// Setting logFilePrefix for schedWindow logger.
+	// Going to log the timestamp when the scheduling window was determined
+	// and the size of the scheduling window.
+	schedWindowLogFilePrefix := prefix + "_schedWindow.log"
+	if loi.logDirectory != "" {
+		schedWindowLogFilePrefix = loi.logDirectory + "/" + schedWindowLogFilePrefix
+	}
+	loi.logObserverSpecifics[schedWindowLogger].logFilePrefix = schedWindowLogFilePrefix
 }
 
 func (loi *loggerObserverImpl) setLogDirectory(dirName string) {
diff --git a/logging/def/schedWindowLogger.go b/logging/def/schedWindowLogger.go
new file mode 100644
index 0000000..518b30b
--- /dev/null
+++ b/logging/def/schedWindowLogger.go
@@ -0,0 +1,10 @@
+package logging
+
+type SchedWindowLogger struct {
+	loggerObserverImpl
+}
+
+func (swl SchedWindowLogger) Log(message string) {
+	// Log the scheduling window information to the corresponding log file.
+	swl.logObserverSpecifics[schedWindowLogger].logFile.Println(message)
+}
diff --git a/schedPolConfig.json b/schedPolConfig.json
new file mode 100644
index 0000000..cf01d90
--- /dev/null
+++ b/schedPolConfig.json
@@ -0,0 +1,14 @@
+{
+  "bin-packing": {
+    "taskDist": 10.0,
+    "varCpuShare": 0.64
+  },
+  "max-min": {
+    "taskDist": 0.416,
+    "varCpuShare": 0.35
+  },
+  "max-greedymins": {
+    "taskDist": 6.667,
+    "varCpuShare": 0.89
+  }
+}
diff --git a/scheduler.go b/scheduler.go
index af3791a..9892ee6 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -31,6 +31,7 @@ var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that con
 var fixFirstSchedPol = flag.String("fixFirstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled.")
 var fixSchedWindow = flag.Bool("fixSchedWindow", false, "Fix the size of the scheduling window that every deployed scheduling policy should schedule, provided switching is enabled.")
 var schedWindowSize = flag.Int("schedWindowSize", 200, "Size of the scheduling window if fixSchedWindow is set.")
+var schedPolSwitchCriteria = flag.String("schedPolSwitchCriteria", "taskDist", "Scheduling policy switching criteria.")
 
 // Short hand args
 func init() {
@@ -48,6 +49,7 @@ func init() {
 	flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the scheduling gpolicy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).")
 	flag.BoolVar(fixSchedWindow, "fixSw", false, "Fix the size of the scheduling window that every deployed scheduling policy should schedule, provided switching is enabled (shorthand).")
 	flag.IntVar(schedWindowSize, "swSize", 200, "Size of the scheduling window if fixSchedWindow is set (shorthand).")
+	flag.StringVar(schedPolSwitchCriteria, "spsCriteria", "taskDist", "Scheduling policy switching criteria (shorthand).")
 }
 
 func listAllSchedulingPolicies() {
@@ -141,7 +143,7 @@ func main() {
 		schedulers.WithDone(done),
 		schedulers.WithPCPLog(pcpLog),
 		schedulers.WithLoggingChannels(logMType, logMsg),
-		schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch),
+		schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch, *schedPolSwitchCriteria),
 		schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol),
 		schedulers.WithFixedSchedulingWindow(*fixSchedWindow, *schedWindowSize))
 	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
diff --git a/schedulers/base.go b/schedulers/base.go
index 0995ee8..eec1281 100644
--- a/schedulers/base.go
+++ b/schedulers/base.go
@@ -62,6 +62,8 @@ type BaseScheduler struct {
 	// This scheduling policy would be deployed first regardless of the distribution of tasks in the TaskQueue.
 	// Note: Scheduling policy switching needs to be enabled.
 	nameOfFstSchedPolToDeploy string
+	// Scheduling policy switching criteria.
+	schedPolSwitchCriteria string
 
 	// Size of window of tasks that can be scheduled in the next offer cycle.
 	// The window size can be adjusted to make the most use of every resource offer.
@@ -195,8 +197,8 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
 	}
 	// Switch just before consuming the resource offers.
 	s.curSchedPolicy.SwitchIfNecessary(s)
-	s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
-		s.schedWindowSize, s.numTasksInSchedWindow))
+	// s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
+	// 	s.schedWindowSize, s.numTasksInSchedWindow))
 	s.curSchedPolicy.ConsumeOffers(s, driver, offers)
 	s.hasReceivedResourceOffers = true
 }
@@ -211,16 +213,16 @@ func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos
 		}
 		// Add task to list of tasks running on node
 		s.Running[*status.SlaveId.Value][*status.TaskId.Value] = true
-		s.TasksRunningMutex.Unlock()
 		s.tasksRunning++
+		s.TasksRunningMutex.Unlock()
 	} else if IsTerminal(status.State) {
 		// Update resource availability.
 		utilities.ResourceAvailabilityUpdate("ON_TASK_TERMINAL_STATE", *status.TaskId, *status.SlaveId)
 		s.TasksRunningMutex.Lock()
 		delete(s.Running[*status.SlaveId.Value], *status.TaskId.Value)
-		s.TasksRunningMutex.Unlock()
 		s.tasksRunning--
+		s.TasksRunningMutex.Unlock()
 		if s.tasksRunning == 0 {
 			select {
 			case <-s.Shutdown:
@@ -401,16 +403,18 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
 	s.Log(lmt, msg)
 }
 
-func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, nextPolicy SchedPolicyState) {
-	log := func() {
+func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
+	logSPS := func() {
 		s.Log(elecLogDef.SPS, name)
-		s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%f] ==> %s", taskDist, name))
 	}
 	if s.hasReceivedResourceOffers && (s.curSchedPolicy != nextPolicy) {
-		log()
+		logSPS()
 	} else if !s.hasReceivedResourceOffers {
-		log()
+		logSPS()
 	}
+	// Logging the size of the scheduling window and the scheduling policy
+	// that is going to schedule the tasks in the scheduling window.
+	s.Log(elecLogDef.SCHED_WINDOW, fmt.Sprintf("%d %s", s.schedWindowSize, name))
 }
 
 func (s *BaseScheduler) LogClsfnAndTaskDistOverhead(overhead time.Duration) {
diff --git a/schedulers/electronScheduler.go b/schedulers/electronScheduler.go
index 0afc4cb..bd91d1b 100644
--- a/schedulers/electronScheduler.go
+++ b/schedulers/electronScheduler.go
@@ -72,7 +72,7 @@ type ElectronScheduler interface {
 	// Log Status update of a task
 	LogTaskStatusUpdate(status *mesos.TaskStatus)
 	// Log Scheduling policy switches (if any)
-	LogSchedulingPolicySwitch()
+	LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState)
 	// Log the computation overhead of classifying tasks in the scheduling window.
 	LogClsfnAndTaskDistOverhead(overhead time.Duration)
 }
diff --git a/schedulers/helpers.go b/schedulers/helpers.go
index ae82eb0..5db77b8 100644
--- a/schedulers/helpers.go
+++ b/schedulers/helpers.go
@@ -117,9 +117,14 @@ func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) sc
 	}
 }
 
-func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions {
+func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool, switchingCriteria string) schedulerOptions {
 	return func(s ElectronScheduler) error {
 		s.(*BaseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch
+		// Check whether the given switching criteria is valid.
+		if _, ok := switchBasedOn[switchingCriteria]; !ok {
+			return errors.New("Invalid scheduling policy switching criteria.")
+		}
+		s.(*BaseScheduler).schedPolSwitchCriteria = switchingCriteria
 		return nil
 	}
 }
diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go
index e7bf710..5569256 100644
--- a/schedulers/schedPolicy.go
+++ b/schedulers/schedPolicy.go
@@ -2,6 +2,8 @@ package schedulers
 
 import (
 	"bitbucket.org/sunybingcloud/elektron/def"
+	elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
+	"fmt"
 	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
 	sched "github.com/mesos/mesos-go/api/v0/scheduler"
 	"log"
@@ -18,8 +20,15 @@ type SchedPolicyState interface {
 	ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer)
 	// Get information about the scheduling policy.
 	GetInfo() (info struct {
-		taskDist float64
-		varCpuShare float64
+		taskDist       float64
+		varCpuShare    float64
+		nextPolicyName string
+		prevPolicyName string
 	})
+	// Update links to next and previous scheduling policy.
+	UpdateLinks(info struct {
+		nextPolicyName string
+		prevPolicyName string
+	})
 	// Switch scheduling policy if necessary.
 	SwitchIfNecessary(SchedPolicyContext)
@@ -36,9 +45,23 @@ type baseSchedPolicyState struct {
 	// The average variance in cpu-share per task that this scheduling policy can cause.
 	// Note: This number corresponds to a given workload.
 	VarianceCpuSharePerTask float64 `json:"varCpuShare"`
+	// Next and Previous scheduling policy in round-robin order.
+	// This order is determined by the sorted order (non-decreasing or non-increasing) of taskDistribution.
+	nextPolicyName string
+	prevPolicyName string
 }
 
-func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (string, float64) {
+// Scheduling policy switching criteria.
+// Takes a pointer to the BaseScheduler and returns the name of the scheduling policy to switch to.
+type switchBy func(*BaseScheduler) string
+
+var switchBasedOn map[string]switchBy = map[string]switchBy{
+	"taskDist": switchTaskDistBased,
+	"round-robin": switchRoundRobinBased,
+	"rev-round-robin": switchRevRoundRobinBased,
+}
+
+func switchTaskDistBased(baseSchedRef *BaseScheduler) string {
 	// Name of the scheduling policy to switch to.
 	switchToPolicyName := ""
 	// Record overhead to classify the tasks in the scheduling window and using the classification results
@@ -47,6 +70,7 @@ func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (strin
 	// Determine the distribution of tasks in the new scheduling window.
 	taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
 	baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
+	baseSchedRef.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%f]", taskDist))
 	if err != nil {
 		// All the tasks in the window were only classified into 1 cluster.
 		// Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation.
@@ -98,7 +122,27 @@ func (bsps *baseSchedPolicyState) nextPolicy(baseSchedRef *BaseScheduler) (strin
 			}
 		}
 	}
-	return switchToPolicyName, taskDist
+	return switchToPolicyName
+}
+
+// Switching based on a round-robin approach.
+// This approach does not take into account the state of the TaskQueue or the state of the cluster.
+func switchRoundRobinBased(baseSchedRef *BaseScheduler) string {
+	// If no resource offers have been received yet.
+	if !baseSchedRef.hasReceivedResourceOffers {
+		return schedPoliciesToSwitch[0].spName
+	}
+	return baseSchedRef.curSchedPolicy.GetInfo().nextPolicyName
+}
+
+// Switching based on a round-robin approach, but in the reverse order.
+// This approach does not take into account the state of the TaskQueue or the state of the cluster.
+func switchRevRoundRobinBased(baseSchedRef *BaseScheduler) string {
+	// If no resource offers have been received yet.
+	if !baseSchedRef.hasReceivedResourceOffers {
+		return schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1].spName
+	}
+	return baseSchedRef.curSchedPolicy.GetInfo().prevPolicyName
 }
 
 func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
@@ -107,19 +151,20 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
 	if baseSchedRef.schedPolSwitchEnabled {
 		// Name of scheduling policy to switch to.
 		switchToPolicyName := ""
-		// Distribution of tasks in the scheduling window
-		var taskDist float64
+		// Size of the new scheduling window.
+		newSchedWindowSize := 0
 		// If scheduling window has not been fixed, then determine the scheduling window based on the current
 		// availability of resources on the cluster (Mesos perspective).
 		if !baseSchedRef.toFixSchedWindow {
 			// Need to compute the size of the scheduling window.
 			// The next scheduling policy will schedule at max schedWindowSize number of tasks.
-			baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
+			newSchedWindowSize, baseSchedRef.numTasksInSchedWindow =
 				baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
 		}
-		// Now, we need to switch if the scheduling window is > 0.
-		if baseSchedRef.schedWindowSize > 0 {
+		// Now, we need to switch if the new scheduling window is > 0.
+		if (!baseSchedRef.toFixSchedWindow && (newSchedWindowSize > 0)) ||
+			(baseSchedRef.toFixSchedWindow && (baseSchedRef.schedWindowSize > 0)) {
 			// If we haven't received any resource offers, then
 			// check whether we need to fix the first scheduling policy to deploy.
 			// If not, then determine the first scheduling policy based on the distribution of tasks
@@ -133,12 +178,24 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
 			if !baseSchedRef.hasReceivedResourceOffers {
 				if baseSchedRef.nameOfFstSchedPolToDeploy != "" {
 					switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
+					if !baseSchedRef.toFixSchedWindow {
+						baseSchedRef.schedWindowSize = newSchedWindowSize
+					}
 				} else {
-					switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
+					// Decided to switch, so updating the window size.
+					if !baseSchedRef.toFixSchedWindow {
+						baseSchedRef.schedWindowSize = newSchedWindowSize
+					}
+					switchToPolicyName = switchBasedOn[baseSchedRef.schedPolSwitchCriteria](baseSchedRef)
 				}
 			} else {
+				// Checking if the currently deployed scheduling policy has scheduled all the tasks in the window.
 				if bsps.numTasksScheduled >= baseSchedRef.schedWindowSize {
-					switchToPolicyName, taskDist = bsps.nextPolicy(baseSchedRef)
+					// Decided to switch, so updating the window size.
+					if !baseSchedRef.toFixSchedWindow {
+						baseSchedRef.schedWindowSize = newSchedWindowSize
+					}
+					switchToPolicyName = switchBasedOn[baseSchedRef.schedPolSwitchCriteria](baseSchedRef)
 				} else {
 					// We continue working with the currently deployed scheduling policy.
 					log.Println("Continuing with the current scheduling policy...")
@@ -147,6 +204,12 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
 					return
 				}
 			}
+			// Switching scheduling policy.
+			baseSchedRef.LogSchedPolicySwitch(switchToPolicyName, SchedPolicies[switchToPolicyName])
+			baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
+			// Resetting the number of tasks scheduled as this is a new scheduling policy that has been
+			// deployed.
+			bsps.numTasksScheduled = 0
 		} else {
 			// We continue working with the currently deployed scheduling policy.
 			log.Println("Continuing with the current scheduling policy...")
@@ -154,20 +217,26 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
 				baseSchedRef.schedWindowSize)
 			return
 		}
-		// Switching scheduling policy.
-		baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
-		baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
-		// Resetting the number of tasks scheduled as this is a new scheduling policy that has been
-		// deployed.
-		bsps.numTasksScheduled = 0
 	}
 }
 
 func (bsps *baseSchedPolicyState) GetInfo() (info struct {
-	taskDist float64
-	varCpuShare float64
+	taskDist       float64
+	varCpuShare    float64
+	nextPolicyName string
+	prevPolicyName string
 }) {
 	info.taskDist = bsps.TaskDistribution
 	info.varCpuShare = bsps.VarianceCpuSharePerTask
+	info.nextPolicyName = bsps.nextPolicyName
+	info.prevPolicyName = bsps.prevPolicyName
 	return info
 }
+
+func (bsps *baseSchedPolicyState) UpdateLinks(info struct {
+	nextPolicyName string
+	prevPolicyName string
+}) {
+	bsps.nextPolicyName = info.nextPolicyName
+	bsps.prevPolicyName = info.prevPolicyName
+}
diff --git a/schedulers/store.go b/schedulers/store.go
index ff3b844..6a40a52 100644
--- a/schedulers/store.go
+++ b/schedulers/store.go
@@ -88,6 +88,22 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
 				index++
 			}
 		}
+
+		// Initializing the next and previous policy based on the round-robin ordering.
+		// The next policy for policy at N would correspond to the value at index N+1 in schedPoliciesToSwitch.
+		for curPolicyIndex := 0; curPolicyIndex < len(schedPoliciesToSwitch); curPolicyIndex++ {
+			info := struct {
+				nextPolicyName string
+				prevPolicyName string
+			}{}
+			if curPolicyIndex == 0 {
+				info.prevPolicyName = schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1].spName
+			} else {
+				info.prevPolicyName = schedPoliciesToSwitch[curPolicyIndex-1].spName
+			}
+			info.nextPolicyName = schedPoliciesToSwitch[(curPolicyIndex+1)%len(schedPoliciesToSwitch)].spName
+			schedPoliciesToSwitch[curPolicyIndex].sp.UpdateLinks(info)
+		}
 	}
 	return nil
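
Supplementary note: below is a minimal standalone sketch of the round-robin linking that InitSchedPolicyCharacteristics builds over schedPoliciesToSwitch, and that switchRoundRobinBased/switchRevRoundRobinBased then follow via GetInfo(). The policy order and the linkInfo type here are hypothetical stand-ins (the real order comes from the sorted taskDist values in schedPolConfig.json, and the real code passes an anonymous struct to UpdateLinks); this illustrates the linking scheme only, not the actual code in schedulers/store.go.

package main

import "fmt"

// linkInfo is a hypothetical stand-in for the anonymous struct passed to UpdateLinks.
type linkInfo struct {
	nextPolicyName string
	prevPolicyName string
}

func main() {
	// Hypothetical round-robin order; in Elektron this order comes from
	// schedPoliciesToSwitch, sorted by the taskDist values in schedPolConfig.json.
	policies := []string{"max-min", "max-greedymins", "bin-packing"}

	links := make(map[string]linkInfo)
	for i := 0; i < len(policies); i++ {
		info := linkInfo{}
		if i == 0 {
			// The first policy's predecessor wraps around to the last policy.
			info.prevPolicyName = policies[len(policies)-1]
		} else {
			info.prevPolicyName = policies[i-1]
		}
		// The successor wraps around using modulo arithmetic, as in store.go.
		info.nextPolicyName = policies[(i+1)%len(policies)]
		links[policies[i]] = info
	}

	// "round-robin" switching follows nextPolicyName; "rev-round-robin" follows prevPolicyName.
	for _, name := range policies {
		fmt.Printf("%s: next=%s prev=%s\n", name, links[name].nextPolicyName, links[name].prevPolicyName)
	}
}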