From f1c6adb05b8cd5c7b560fc6e7bd716e97d8055db Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 17 Apr 2018 20:10:36 +0000 Subject: [PATCH] Merged in measureClassificationOverhead (pull request #12) MeasureClassificationOverhead Approved-by: Akash Kothawale --- logging/def/clsfnTaskDistOverhead.go | 11 +++ logging/def/logType.go | 17 ++-- logging/def/logger.go | 1 + logging/def/loggerFactory.go | 38 +++++---- logging/def/loggerObservers.go | 9 ++ schedulers/base.go | 5 ++ schedulers/electronScheduler.go | 3 + schedulers/first-fit.go | 1 - schedulers/schedPolicy.go | 123 ++++++++++++++------------- 9 files changed, 127 insertions(+), 81 deletions(-) create mode 100644 logging/def/clsfnTaskDistOverhead.go diff --git a/logging/def/clsfnTaskDistOverhead.go b/logging/def/clsfnTaskDistOverhead.go new file mode 100644 index 0000000..f0781ce --- /dev/null +++ b/logging/def/clsfnTaskDistOverhead.go @@ -0,0 +1,11 @@ +package logging + +type ClsfnTaskDistOverheadLogger struct { + loggerObserverImpl +} + +func (col ClsfnTaskDistOverheadLogger) Log(message string) { + // Logging the overhead of classifying tasks in the scheduling window and determining the distribution + // of light power consuming and heavy power consuming tasks. 
+ col.logObserverSpecifics[clsfnTaskDistOverheadLogger].logFile.Println(message) +} diff --git a/logging/def/logType.go b/logging/def/logType.go index eb4d80a..1caf321 100644 --- a/logging/def/logType.go +++ b/logging/def/logType.go @@ -7,14 +7,15 @@ var logMessageNames []string // Possible log message types var ( - ERROR = messageNametoMessageType("ERROR") - WARNING = messageNametoMessageType("WARNING") - GENERAL = messageNametoMessageType("GENERAL") - SUCCESS = messageNametoMessageType("SUCCESS") - SCHED_TRACE = messageNametoMessageType("SCHED_TRACE") - PCP = messageNametoMessageType("PCP") - DEG_COL = messageNametoMessageType("DEG_COL") - SPS = messageNametoMessageType("SPS") + ERROR = messageNametoMessageType("ERROR") + WARNING = messageNametoMessageType("WARNING") + GENERAL = messageNametoMessageType("GENERAL") + SUCCESS = messageNametoMessageType("SUCCESS") + SCHED_TRACE = messageNametoMessageType("SCHED_TRACE") + PCP = messageNametoMessageType("PCP") + DEG_COL = messageNametoMessageType("DEG_COL") + SPS = messageNametoMessageType("SPS") + CLSFN_TASKDIST_OVERHEAD = messageNametoMessageType("CLSFN_TASKDIST_OVERHEAD") ) // Text colors for the different types of log messages. 
diff --git a/logging/def/logger.go b/logging/def/logger.go index 351ff82..7d595a2 100644 --- a/logging/def/logger.go +++ b/logging/def/logger.go @@ -20,6 +20,7 @@ func newLogger() *LoggerDriver { PCP: true, DEG_COL: true, SPS: true, + CLSFN_TASKDIST_OVERHEAD: true, }, } return logger diff --git a/logging/def/loggerFactory.go b/logging/def/loggerFactory.go index 783a0b9..92ed922 100644 --- a/logging/def/loggerFactory.go +++ b/logging/def/loggerFactory.go @@ -8,20 +8,22 @@ import ( // Names of different loggers const ( - conLogger = "console-logger" - schedTraceLogger = "schedTrace-logger" - pcpLogger = "pcp-logger" - degColLogger = "degCol-logger" - spsLogger = "schedPolicySwitch-logger" + conLogger = "console-logger" + schedTraceLogger = "schedTrace-logger" + pcpLogger = "pcp-logger" + degColLogger = "degCol-logger" + spsLogger = "schedPolicySwitch-logger" + clsfnTaskDistOverheadLogger = "classificationOverhead-logger" ) // Logger class factory var Loggers map[string]loggerObserver = map[string]loggerObserver{ - conLogger: nil, - schedTraceLogger: nil, - pcpLogger: nil, - degColLogger: nil, - spsLogger: nil, + conLogger: nil, + schedTraceLogger: nil, + pcpLogger: nil, + degColLogger: nil, + spsLogger: nil, + clsfnTaskDistOverheadLogger: nil, } // Logger options to help initialize loggers @@ -38,11 +40,12 @@ func withLogDirectory(startTime time.Time, prefix string) loggerOption { func withLoggerSpecifics(prefix string) loggerOption { return func(l loggerObserver) error { l.(*loggerObserverImpl).logObserverSpecifics = map[string]*specifics{ - conLogger: &specifics{}, - schedTraceLogger: &specifics{}, - pcpLogger: &specifics{}, - degColLogger: &specifics{}, - spsLogger: &specifics{}, + conLogger: &specifics{}, + schedTraceLogger: &specifics{}, + pcpLogger: &specifics{}, + degColLogger: &specifics{}, + spsLogger: &specifics{}, + clsfnTaskDistOverheadLogger: &specifics{}, } l.(*loggerObserverImpl).setLogFilePrefix(prefix) l.(*loggerObserverImpl).setLogFile() @@ -70,6 
+73,9 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) { Loggers[spsLogger] = &SchedPolicySwitchLogger{ loggerObserverImpl: *loi, } + Loggers[clsfnTaskDistOverheadLogger] = &ClsfnTaskDistOverheadLogger{ + loggerObserverImpl: *loi, + } for _, lmt := range GetLogMessageTypes() { switch lmt { @@ -89,6 +95,8 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) { lg.attach(DEG_COL, Loggers[degColLogger]) case SPS.String(): lg.attach(SPS, Loggers[spsLogger]) + case CLSFN_TASKDIST_OVERHEAD.String(): + lg.attach(CLSFN_TASKDIST_OVERHEAD, Loggers[clsfnTaskDistOverheadLogger]) } } } diff --git a/logging/def/loggerObservers.go b/logging/def/loggerObservers.go index 3479b2b..465c307 100644 --- a/logging/def/loggerObservers.go +++ b/logging/def/loggerObservers.go @@ -84,6 +84,15 @@ func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) { schedPolicySwitchLogFilePrefix = loi.logDirectory + "/" + schedPolicySwitchLogFilePrefix } loi.logObserverSpecifics[spsLogger].logFilePrefix = schedPolicySwitchLogFilePrefix + + // Setting logFilePrefix for clsfnTaskDist logger. + // Execution time of every call to def.GetTaskDistribution(...) would be recorded and logged in this file. + // The overhead would be logged in microseconds. + clsfnTaskDistOverheadLogFilePrefix := prefix + "_classificationOverhead.log" + if loi.logDirectory != "" { + clsfnTaskDistOverheadLogFilePrefix = loi.logDirectory + "/" + clsfnTaskDistOverheadLogFilePrefix + } + loi.logObserverSpecifics[clsfnTaskDistOverheadLogger].logFilePrefix = clsfnTaskDistOverheadLogFilePrefix } func (loi *loggerObserverImpl) setLogDirectory(dirName string) { diff --git a/schedulers/base.go b/schedulers/base.go index ec81227..cae29e5 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -404,3 +404,8 @@ func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, next s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... 
TaskDistribution[%d] ==> %s", taskDist, name)) } } + +func (s *BaseScheduler) LogClsfnAndTaskDistOverhead(overhead time.Duration) { + // Logging the overhead in microseconds. + s.Log(elecLogDef.CLSFN_TASKDIST_OVERHEAD, fmt.Sprintf("%f", float64(overhead.Nanoseconds())/1000.0)) +} diff --git a/schedulers/electronScheduler.go b/schedulers/electronScheduler.go index 0c64bbb..0afc4cb 100644 --- a/schedulers/electronScheduler.go +++ b/schedulers/electronScheduler.go @@ -5,6 +5,7 @@ import ( elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "time" ) // Implements mesos scheduler. @@ -72,4 +73,6 @@ type ElectronScheduler interface { LogTaskStatusUpdate(status *mesos.TaskStatus) // Log Scheduling policy switches (if any) LogSchedulingPolicySwitch() + // Log the computation overhead of classifying tasks in the scheduling window. + LogClsfnAndTaskDistOverhead(overhead time.Duration) } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index 8538aa8..d24d26f 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -6,7 +6,6 @@ import ( "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" - "log" ) // Decides if to take an offer or not diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index 8f6a098..9c25a97 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/electron/def" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "time" ) type SchedPolicyContext interface { @@ -46,69 +47,77 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) { // The next scheduling policy will schedule at max schedWindowSize number of tasks. 
baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks }) - // Determine the distribution of tasks in the new scheduling window. - taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks) - // If no resource offers have been received yet, and - // the name of the first scheduling policy to be deployed is provided, - // we switch to this policy regardless of the task distribution. - if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") { - switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy - } else if err != nil { - // All the tasks in the window were only classified into 1 cluster. - // Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation. - // So, we have 2 choices to make. First-Fit or Bin-Packing. - // If choose Bin-Packing, then there might be a performance degradation due to increase in - // resource contention. So, First-Fit might be a better option to cater to the worst case - // where all the tasks are power intensive tasks. - // TODO: Another possibility is to do the exact opposite and choose Bin-Packing. - // TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue. - switchToPolicyName = bp - } else { - // The tasks in the scheduling window were classified into 2 clusters, meaning that there is - // some variety in the kind of tasks. - // We now select the scheduling policy which is most appropriate for this distribution of tasks. 
- first := schedPoliciesToSwitch[0] - last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1] - if taskDist < first.sp.GetInfo().taskDist { - switchToPolicyName = first.spName - } else if taskDist > last.sp.GetInfo().taskDist { - switchToPolicyName = last.spName + if baseSchedRef.schedWindowSize > 0 { + // Record overhead to classify the tasks in the scheduling window and using the classification results + // to determine the distribution of low power consuming and high power consuming tasks. + startTime := time.Now() + // Determine the distribution of tasks in the new scheduling window. + taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks) + baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime)) + // If no resource offers have been received yet, and + // the name of the first scheduling policy to be deployed is provided, + // we switch to this policy regardless of the task distribution. + if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") { + switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy + } else if err != nil { + // All the tasks in the window were only classified into 1 cluster. + // Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation. + // So, we have 2 choices to make. First-Fit or Bin-Packing. + // If choose Bin-Packing, then there might be a performance degradation due to increase in + // resource contention. So, First-Fit might be a better option to cater to the worst case + // where all the tasks are power intensive tasks. + // TODO: Another possibility is to do the exact opposite and choose Bin-Packing. + // TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue. 
+ switchToPolicyName = bp } else { - low := 0 - high := len(schedPoliciesToSwitch) - 1 - for low <= high { - mid := (low + high) / 2 - if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { - high = mid - 1 - } else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { - low = mid + 1 - } else { - switchToPolicyName = schedPoliciesToSwitch[mid].spName - break + // The tasks in the scheduling window were classified into 2 clusters, meaning that there is + // some variety in the kind of tasks. + // We now select the scheduling policy which is most appropriate for this distribution of tasks. + first := schedPoliciesToSwitch[0] + last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1] + if taskDist < first.sp.GetInfo().taskDist { + switchToPolicyName = first.spName + } else if taskDist > last.sp.GetInfo().taskDist { + switchToPolicyName = last.spName + } else { + low := 0 + high := len(schedPoliciesToSwitch) - 1 + for low <= high { + mid := (low + high) / 2 + if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { + high = mid - 1 + } else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { + low = mid + 1 + } else { + switchToPolicyName = schedPoliciesToSwitch[mid].spName + break + } } - } - // We're here if low == high+1. - // If haven't yet found the closest match. - if switchToPolicyName == "" { - lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist - highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist - if lowDiff > highDiff { - switchToPolicyName = schedPoliciesToSwitch[high].spName - } else if highDiff > lowDiff { - switchToPolicyName = schedPoliciesToSwitch[low].spName - } else { - // index doens't matter as the values at high and low are equidistant - // from taskDist. - switchToPolicyName = schedPoliciesToSwitch[high].spName + // We're here if low == high+1. + // If haven't yet found the closest match. 
+ if switchToPolicyName == "" { + lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist + highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist + if lowDiff > highDiff { + switchToPolicyName = schedPoliciesToSwitch[high].spName + } else if highDiff > lowDiff { + switchToPolicyName = schedPoliciesToSwitch[low].spName + } else { + // index doesn't matter as the values at high and low are equidistant + // from taskDist. + switchToPolicyName = schedPoliciesToSwitch[high].spName + } + } } } + // Switching scheduling policy. + baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName]) + baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName]) + // Resetting the number of tasks scheduled. + bsps.numTasksScheduled = 0 + } else { + // There is no need to switch the scheduling policy as there aren't any tasks in the window. } - // Switching scheduling policy. - baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName]) - baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName]) - // Resetting the number of tasks scheduled. - bsps.numTasksScheduled = 0 } }