Merged in akash/LogPolicySwitch (pull request #4)

Scheduling Policy Switch Logger

Approved-by: Pradyumna Kaushik <pkaushi1@binghamton.edu>
This commit is contained in:
Akash Kothawale 2018-02-11 13:11:42 +00:00 committed by Pradyumna Kaushik
parent 3fa4a45ca7
commit b1bd21f730
11 changed files with 94 additions and 4 deletions

View file

@ -14,6 +14,7 @@ var (
SCHED_TRACE = messageNametoMessageType("SCHED_TRACE")
PCP = messageNametoMessageType("PCP")
DEG_COL = messageNametoMessageType("DEG_COL")
SPS = messageNametoMessageType("SPS")
)
// Text colors for the different types of log messages.

View file

@ -19,6 +19,7 @@ func newLogger() *LoggerDriver {
SUCCESS: true,
PCP: true,
DEG_COL: true,
SPS: true,
},
}
return logger

View file

@ -12,6 +12,7 @@ const (
schedTraceLogger = "schedTrace-logger"
pcpLogger = "pcp-logger"
degColLogger = "degCol-logger"
spsLogger = "schedPolicySwitch-logger"
)
// Logger class factory
@ -20,6 +21,7 @@ var Loggers map[string]loggerObserver = map[string]loggerObserver{
schedTraceLogger: nil,
pcpLogger: nil,
degColLogger: nil,
spsLogger: nil,
}
// Logger options to help initialize loggers
@ -40,6 +42,7 @@ func withLoggerSpecifics(prefix string) loggerOption {
schedTraceLogger: &specifics{},
pcpLogger: &specifics{},
degColLogger: &specifics{},
spsLogger: &specifics{},
}
l.(*loggerObserverImpl).setLogFilePrefix(prefix)
l.(*loggerObserverImpl).setLogFile()
@ -64,6 +67,9 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
Loggers[degColLogger] = &DegColLogger{
loggerObserverImpl: *loi,
}
Loggers[spsLogger] = &SchedPolicySwitchLogger{
loggerObserverImpl: *loi,
}
for _, lmt := range GetLogMessageTypes() {
switch lmt {
@ -81,6 +87,8 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
lg.attach(PCP, Loggers[pcpLogger])
case DEG_COL.String():
lg.attach(DEG_COL, Loggers[degColLogger])
case SPS.String():
lg.attach(SPS, Loggers[spsLogger])
}
}
}

View file

@ -77,6 +77,13 @@ func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) {
degColLogFilePrefix = loi.logDirectory + "/" + degColLogFilePrefix
}
loi.logObserverSpecifics[degColLogger].logFilePrefix = degColLogFilePrefix
// Setting logFilePrefix for schedulingPolicySwitch logger
schedPolicySwitchLogFilePrefix := prefix + "_schedPolicySwitch.log"
if loi.logDirectory != "" {
schedPolicySwitchLogFilePrefix = loi.logDirectory + "/" + schedPolicySwitchLogFilePrefix
}
loi.logObserverSpecifics[spsLogger].logFilePrefix = schedPolicySwitchLogFilePrefix
}
func (loi *loggerObserverImpl) setLogDirectory(dirName string) {

View file

@ -0,0 +1,9 @@
package logging
type SchedPolicySwitchLogger struct {
loggerObserverImpl
}
func (pl *SchedPolicySwitchLogger) Log(message string) {
pl.logObserverSpecifics[spsLogger].logFile.Println(message)
}

View file

@ -7,6 +7,7 @@ import (
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
"math/rand"
)
// Decides if to take an offer or not
@ -168,5 +169,18 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
}
}
s.switchIfNecessary(spc)
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for k, v := range SchedPolicies {
if index == 0 {
baseSchedRef.LogSchedPolicySwitch(k, v)
spc.SwitchSchedPol(v)
break
}
index--
}
}
}

View file

@ -7,6 +7,7 @@ import (
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
"math/rand"
)
// Decides if to take an offer or not
@ -162,5 +163,18 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
}
}
s.switchIfNecessary(spc)
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for k, v := range SchedPolicies {
if index == 0 {
baseSchedRef.LogSchedPolicySwitch(k, v)
spc.SwitchSchedPol(v)
break
}
index--
}
}
}

View file

@ -400,3 +400,9 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
*status.TaskId.Value, msgColor.Sprint(NameFor(status.State)))
s.Log(lmt, msg)
}
func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
if s.curSchedPolicy != nextPolicy {
s.Log(elecLogDef.SPS, name)
}
}

View file

@ -7,6 +7,7 @@ import (
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
"math/rand"
)
// Decides if to take an offer or not
@ -112,5 +113,18 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
}
}
s.switchIfNecessary(spc)
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for k, v := range SchedPolicies {
if index == 0 {
baseSchedRef.LogSchedPolicySwitch(k, v)
spc.SwitchSchedPol(v)
break
}
index--
}
}
}

View file

@ -70,4 +70,6 @@ type ElectronScheduler interface {
LogDisconnected()
// Log Status update of a task
LogTaskStatusUpdate(status *mesos.TaskStatus)
// Log Scheduling policy switches (if any)
LogSchedulingPolicySwitch()
}

View file

@ -7,6 +7,7 @@ import (
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"log"
"math/rand"
)
// Decides if to take an offer or not
@ -98,5 +99,18 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
}
}
s.switchIfNecessary(spc)
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for k, v := range SchedPolicies {
if index == 0 {
baseSchedRef.LogSchedPolicySwitch(k, v)
spc.SwitchSchedPol(v)
break
}
index--
}
}
}