Merged in measureClassificationOverhead (pull request #12)

MeasureClassificationOverhead

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
This commit is contained in:
Pradyumna Kaushik 2018-04-17 20:10:36 +00:00
parent ae81125110
commit f1c6adb05b
9 changed files with 127 additions and 81 deletions

View file

@ -0,0 +1,11 @@
package logging
type ClsfnTaskDistOverheadLogger struct {
loggerObserverImpl
}
func (col ClsfnTaskDistOverheadLogger) Log(message string) {
// Logging the overhead of classifying tasks in the scheduling window and determining the distribution
// of light power consuming and heavy power consuming tasks.
col.logObserverSpecifics[clsfnTaskDistOverheadLogger].logFile.Println(message)
}

View file

@ -7,14 +7,15 @@ var logMessageNames []string
// Possible log message types // Possible log message types
var ( var (
ERROR = messageNametoMessageType("ERROR") ERROR = messageNametoMessageType("ERROR")
WARNING = messageNametoMessageType("WARNING") WARNING = messageNametoMessageType("WARNING")
GENERAL = messageNametoMessageType("GENERAL") GENERAL = messageNametoMessageType("GENERAL")
SUCCESS = messageNametoMessageType("SUCCESS") SUCCESS = messageNametoMessageType("SUCCESS")
SCHED_TRACE = messageNametoMessageType("SCHED_TRACE") SCHED_TRACE = messageNametoMessageType("SCHED_TRACE")
PCP = messageNametoMessageType("PCP") PCP = messageNametoMessageType("PCP")
DEG_COL = messageNametoMessageType("DEG_COL") DEG_COL = messageNametoMessageType("DEG_COL")
SPS = messageNametoMessageType("SPS") SPS = messageNametoMessageType("SPS")
CLSFN_TASKDIST_OVERHEAD = messageNametoMessageType("CLSFN_TASKDIST_OVERHEAD")
) )
// Text colors for the different types of log messages. // Text colors for the different types of log messages.

View file

@ -20,6 +20,7 @@ func newLogger() *LoggerDriver {
PCP: true, PCP: true,
DEG_COL: true, DEG_COL: true,
SPS: true, SPS: true,
CLSFN_TASKDIST_OVERHEAD: true,
}, },
} }
return logger return logger

View file

@ -8,20 +8,22 @@ import (
// Names of different loggers // Names of different loggers
const ( const (
conLogger = "console-logger" conLogger = "console-logger"
schedTraceLogger = "schedTrace-logger" schedTraceLogger = "schedTrace-logger"
pcpLogger = "pcp-logger" pcpLogger = "pcp-logger"
degColLogger = "degCol-logger" degColLogger = "degCol-logger"
spsLogger = "schedPolicySwitch-logger" spsLogger = "schedPolicySwitch-logger"
clsfnTaskDistOverheadLogger = "classificationOverhead-logger"
) )
// Logger class factory // Logger class factory
var Loggers map[string]loggerObserver = map[string]loggerObserver{ var Loggers map[string]loggerObserver = map[string]loggerObserver{
conLogger: nil, conLogger: nil,
schedTraceLogger: nil, schedTraceLogger: nil,
pcpLogger: nil, pcpLogger: nil,
degColLogger: nil, degColLogger: nil,
spsLogger: nil, spsLogger: nil,
clsfnTaskDistOverheadLogger: nil,
} }
// Logger options to help initialize loggers // Logger options to help initialize loggers
@ -38,11 +40,12 @@ func withLogDirectory(startTime time.Time, prefix string) loggerOption {
func withLoggerSpecifics(prefix string) loggerOption { func withLoggerSpecifics(prefix string) loggerOption {
return func(l loggerObserver) error { return func(l loggerObserver) error {
l.(*loggerObserverImpl).logObserverSpecifics = map[string]*specifics{ l.(*loggerObserverImpl).logObserverSpecifics = map[string]*specifics{
conLogger: &specifics{}, conLogger: &specifics{},
schedTraceLogger: &specifics{}, schedTraceLogger: &specifics{},
pcpLogger: &specifics{}, pcpLogger: &specifics{},
degColLogger: &specifics{}, degColLogger: &specifics{},
spsLogger: &specifics{}, spsLogger: &specifics{},
clsfnTaskDistOverheadLogger: &specifics{},
} }
l.(*loggerObserverImpl).setLogFilePrefix(prefix) l.(*loggerObserverImpl).setLogFilePrefix(prefix)
l.(*loggerObserverImpl).setLogFile() l.(*loggerObserverImpl).setLogFile()
@ -70,6 +73,9 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
Loggers[spsLogger] = &SchedPolicySwitchLogger{ Loggers[spsLogger] = &SchedPolicySwitchLogger{
loggerObserverImpl: *loi, loggerObserverImpl: *loi,
} }
Loggers[clsfnTaskDistOverheadLogger] = &ClsfnTaskDistOverheadLogger{
loggerObserverImpl: *loi,
}
for _, lmt := range GetLogMessageTypes() { for _, lmt := range GetLogMessageTypes() {
switch lmt { switch lmt {
@ -89,6 +95,8 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
lg.attach(DEG_COL, Loggers[degColLogger]) lg.attach(DEG_COL, Loggers[degColLogger])
case SPS.String(): case SPS.String():
lg.attach(SPS, Loggers[spsLogger]) lg.attach(SPS, Loggers[spsLogger])
case CLSFN_TASKDIST_OVERHEAD.String():
lg.attach(CLSFN_TASKDIST_OVERHEAD, Loggers[clsfnTaskDistOverheadLogger])
} }
} }
} }

View file

@ -84,6 +84,15 @@ func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) {
schedPolicySwitchLogFilePrefix = loi.logDirectory + "/" + schedPolicySwitchLogFilePrefix schedPolicySwitchLogFilePrefix = loi.logDirectory + "/" + schedPolicySwitchLogFilePrefix
} }
loi.logObserverSpecifics[spsLogger].logFilePrefix = schedPolicySwitchLogFilePrefix loi.logObserverSpecifics[spsLogger].logFilePrefix = schedPolicySwitchLogFilePrefix
// Setting logFilePrefix for clsfnTaskDist logger.
// Execution time of every call to def.GetTaskDistribution(...) would be recorded and logged in this file.
// The overhead would be logged in microseconds.
clsfnTaskDistOverheadLogFilePrefix := prefix + "_classificationOverhead.log"
if loi.logDirectory != "" {
clsfnTaskDistOverheadLogFilePrefix = loi.logDirectory + "/" + clsfnTaskDistOverheadLogFilePrefix
}
loi.logObserverSpecifics[clsfnTaskDistOverheadLogger].logFilePrefix = clsfnTaskDistOverheadLogFilePrefix
} }
func (loi *loggerObserverImpl) setLogDirectory(dirName string) { func (loi *loggerObserverImpl) setLogDirectory(dirName string) {

View file

@ -404,3 +404,8 @@ func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, next
s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%d] ==> %s", taskDist, name)) s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%d] ==> %s", taskDist, name))
} }
} }
func (s *BaseScheduler) LogClsfnAndTaskDistOverhead(overhead time.Duration) {
// Logging the overhead in microseconds.
s.Log(elecLogDef.CLSFN_TASKDIST_OVERHEAD, fmt.Sprintf("%f", float64(overhead.Nanoseconds())/1000.0))
}

View file

@ -5,6 +5,7 @@ import (
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
"time"
) )
// Implements mesos scheduler. // Implements mesos scheduler.
@ -72,4 +73,6 @@ type ElectronScheduler interface {
LogTaskStatusUpdate(status *mesos.TaskStatus) LogTaskStatusUpdate(status *mesos.TaskStatus)
// Log Scheduling policy switches (if any) // Log Scheduling policy switches (if any)
LogSchedulingPolicySwitch() LogSchedulingPolicySwitch()
// Log the computation overhead of classifying tasks in the scheduling window.
LogClsfnAndTaskDistOverhead(overhead time.Duration)
} }

View file

@ -6,7 +6,6 @@ import (
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
) )
// Decides if to take an offer or not // Decides if to take an offer or not

View file

@ -4,6 +4,7 @@ import (
"bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/def"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto" mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler" sched "github.com/mesos/mesos-go/api/v0/scheduler"
"time"
) )
type SchedPolicyContext interface { type SchedPolicyContext interface {
@ -46,69 +47,77 @@ func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
// The next scheduling policy will schedule at max schedWindowSize number of tasks. // The next scheduling policy will schedule at max schedWindowSize number of tasks.
baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow = baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks }) baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
// Determine the distribution of tasks in the new scheduling window. if baseSchedRef.schedWindowSize > 0 {
taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks) // Record overhead to classify the tasks in the scheduling window and using the classification results
// If no resource offers have been received yet, and // to determine the distribution of low power consuming and high power consuming tasks.
// the name of the first scheduling policy to be deployed is provided, startTime := time.Now()
// we switch to this policy regardless of the task distribution. // Determine the distribution of tasks in the new scheduling window.
if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") { taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
} else if err != nil { // If no resource offers have been received yet, and
// All the tasks in the window were only classified into 1 cluster. // the name of the first scheduling policy to be deployed is provided,
// Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation. // we switch to this policy regardless of the task distribution.
// So, we have 2 choices to make. First-Fit or Bin-Packing. if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") {
// If choose Bin-Packing, then there might be a performance degradation due to increase in switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
// resource contention. So, First-Fit might be a better option to cater to the worst case } else if err != nil {
// where all the tasks are power intensive tasks. // All the tasks in the window were only classified into 1 cluster.
// TODO: Another possibility is to do the exact opposite and choose Bin-Packing. // Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation.
// TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue. // So, we have 2 choices to make. First-Fit or Bin-Packing.
switchToPolicyName = bp // If choose Bin-Packing, then there might be a performance degradation due to increase in
} else { // resource contention. So, First-Fit might be a better option to cater to the worst case
// The tasks in the scheduling window were classified into 2 clusters, meaning that there is // where all the tasks are power intensive tasks.
// some variety in the kind of tasks. // TODO: Another possibility is to do the exact opposite and choose Bin-Packing.
// We now select the scheduling policy which is most appropriate for this distribution of tasks. // TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue.
first := schedPoliciesToSwitch[0] switchToPolicyName = bp
last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
if taskDist < first.sp.GetInfo().taskDist {
switchToPolicyName = first.spName
} else if taskDist > last.sp.GetInfo().taskDist {
switchToPolicyName = last.spName
} else { } else {
low := 0 // The tasks in the scheduling window were classified into 2 clusters, meaning that there is
high := len(schedPoliciesToSwitch) - 1 // some variety in the kind of tasks.
for low <= high { // We now select the scheduling policy which is most appropriate for this distribution of tasks.
mid := (low + high) / 2 first := schedPoliciesToSwitch[0]
if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
high = mid - 1 if taskDist < first.sp.GetInfo().taskDist {
} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { switchToPolicyName = first.spName
low = mid + 1 } else if taskDist > last.sp.GetInfo().taskDist {
} else { switchToPolicyName = last.spName
switchToPolicyName = schedPoliciesToSwitch[mid].spName } else {
break low := 0
high := len(schedPoliciesToSwitch) - 1
for low <= high {
mid := (low + high) / 2
if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
high = mid - 1
} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
low = mid + 1
} else {
switchToPolicyName = schedPoliciesToSwitch[mid].spName
break
}
} }
} // We're here if low == high+1.
// We're here if low == high+1. // If haven't yet found the closest match.
// If haven't yet found the closest match. if switchToPolicyName == "" {
if switchToPolicyName == "" { lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist
lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist
highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist if lowDiff > highDiff {
if lowDiff > highDiff { switchToPolicyName = schedPoliciesToSwitch[high].spName
switchToPolicyName = schedPoliciesToSwitch[high].spName } else if highDiff > lowDiff {
} else if highDiff > lowDiff { switchToPolicyName = schedPoliciesToSwitch[low].spName
switchToPolicyName = schedPoliciesToSwitch[low].spName } else {
} else { // index doens't matter as the values at high and low are equidistant
// index doens't matter as the values at high and low are equidistant // from taskDist.
// from taskDist. switchToPolicyName = schedPoliciesToSwitch[high].spName
switchToPolicyName = schedPoliciesToSwitch[high].spName }
} }
} }
} }
// Switching scheduling policy.
baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
// Resetting the number of tasks scheduled.
bsps.numTasksScheduled = 0
} else {
// There is no need to switch the scheduling policy as there aren't any tasks in the window.
} }
// Switching scheduling policy.
baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
// Resetting the number of tasks scheduled.
bsps.numTasksScheduled = 0
} }
} }