diff --git a/logging/def/consoleLogger.go b/logging/def/consoleLogger.go new file mode 100644 index 0000000..dbcf1ef --- /dev/null +++ b/logging/def/consoleLogger.go @@ -0,0 +1,18 @@ +package logging + +import ( + "log" +) + +type ConsoleLogger struct { + loggerObserverImpl +} + +func (cl ConsoleLogger) Log(message string) { + // We need to log to console only if the message is not empty + if message != "" { + log.Println(message) + // Also logging the message to the console log file + cl.logObserverSpecifics[conLogger].logFile.Println(message) + } +} diff --git a/logging/def/logType.go b/logging/def/logType.go new file mode 100644 index 0000000..8125154 --- /dev/null +++ b/logging/def/logType.go @@ -0,0 +1,41 @@ +package logging + +import "github.com/fatih/color" + +// Defining enums of log message types +var logMessageNames []string + +// Possible log message types +var ( + ERROR = messageNametoMessageType("ERROR") + WARNING = messageNametoMessageType("WARNING") + GENERAL = messageNametoMessageType("GENERAL") + SUCCESS = messageNametoMessageType("SUCCESS") + SCHED_TRACE = messageNametoMessageType("SCHED_TRACE") + PCP = messageNametoMessageType("PCP") +) + +// Text colors for the different types of log messages. +var LogMessageColors map[LogMessageType]*color.Color = map[LogMessageType]*color.Color{ + ERROR: color.New(color.FgRed, color.Bold), + WARNING: color.New(color.FgYellow, color.Bold), + GENERAL: color.New(color.FgWhite, color.Bold), + SUCCESS: color.New(color.FgGreen, color.Bold), +} + +type LogMessageType int + +func (lmt LogMessageType) String() string { + return logMessageNames[lmt] +} + +func GetLogMessageTypes() []string { + return logMessageNames +} + +func messageNametoMessageType(messageName string) LogMessageType { + // Appending messageName to LogMessageNames + logMessageNames = append(logMessageNames, messageName) + // Mapping messageName to int + return LogMessageType(len(logMessageNames) - 1) +} diff --git a/logging/def/logger.go b/logging/def/logger.go new file mode 100644 index 0000000..5817651 --- /dev/null +++ b/logging/def/logger.go @@ -0,0 +1,56 @@ +package logging + +import ( + "time" +) + +type LoggerDriver struct { + loggerSubject + allowedMessageTypes map[LogMessageType]bool +} + +func newLogger() *LoggerDriver { + logger := &LoggerDriver{ + allowedMessageTypes: map[LogMessageType]bool{ + ERROR: true, + GENERAL: true, + WARNING: true, + SCHED_TRACE: true, + SUCCESS: true, + PCP: true, + }, + } + return logger +} + +func BuildLogger(startTime time.Time, prefix string) *LoggerDriver { + // building logger + l := newLogger() + attachAllLoggers(l, startTime, prefix) + return l +} + +func (log *LoggerDriver) EnabledLogging(messageType LogMessageType) { + log.allowedMessageTypes[messageType] = true +} + +func (log *LoggerDriver) DisableLogging(messageType LogMessageType) { + log.allowedMessageTypes[messageType] = false +} + +func (log *LoggerDriver) WriteLog(messageType LogMessageType, message string) { + // checking to see if logging for given messageType is disabled + if log.allowedMessageTypes[messageType] { + log.setMessage(message) + // notify registered loggers to log + log.notify(messageType) + } +} + +func (log *LoggerDriver) Listen(logMType <-chan LogMessageType, logMsg <-chan string) { + for { + mType := <-logMType + msg := <-logMsg + log.WriteLog(mType, msg) + } +} diff --git a/logging/def/loggerFactory.go b/logging/def/loggerFactory.go new file mode 100644 index 0000000..5073579 --- /dev/null +++ b/logging/def/loggerFactory.go @@ -0,0 +1,78 @@ +package 
logging + +import ( + logUtils "bitbucket.org/sunybingcloud/elektron/logging/utils" + "strings" + "time" +) + +// Names of different loggers +const ( + conLogger = "console-logger" + schedTraceLogger = "schedTrace-logger" + pcpLogger = "pcp-logger" +) + +// Logger class factory +var Loggers map[string]loggerObserver = map[string]loggerObserver{ + conLogger: nil, + schedTraceLogger: nil, + pcpLogger: nil, +} + +// Logger options to help initialize loggers +type loggerOption func(l loggerObserver) error + +func withLogDirectory(startTime time.Time, prefix string) loggerOption { + return func(l loggerObserver) error { + l.(*loggerObserverImpl).setLogDirectory(logUtils.GetLogDir(startTime, prefix)) + return nil + } +} + +// This loggerOption initializes the specifics for each loggerObserver +func withLoggerSpecifics(prefix string) loggerOption { + return func(l loggerObserver) error { + l.(*loggerObserverImpl).logObserverSpecifics = map[string]*specifics{ + conLogger: &specifics{}, + schedTraceLogger: &specifics{}, + pcpLogger: &specifics{}, + } + l.(*loggerObserverImpl).setLogFilePrefix(prefix) + l.(*loggerObserverImpl).setLogFile() + return nil + } +} + +// Build and assign all loggers +func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) { + loi := &loggerObserverImpl{} + loi.init(withLogDirectory(startTime, strings.Split(prefix, startTime.Format("20060102150405"))[0]), + withLoggerSpecifics(prefix)) + Loggers[conLogger] = &ConsoleLogger{ + loggerObserverImpl: *loi, + } + Loggers[schedTraceLogger] = &SchedTraceLogger{ + loggerObserverImpl: *loi, + } + Loggers[pcpLogger] = &PCPLogger{ + loggerObserverImpl: *loi, + } + + for _, lmt := range GetLogMessageTypes() { + switch lmt { + case SCHED_TRACE.String(): + lg.attach(SCHED_TRACE, Loggers[schedTraceLogger]) + case GENERAL.String(): + lg.attach(GENERAL, Loggers[conLogger]) + case WARNING.String(): + lg.attach(WARNING, Loggers[conLogger]) + case ERROR.String(): + lg.attach(ERROR, Loggers[conLogger]) + case SUCCESS.String(): + lg.attach(SUCCESS, Loggers[conLogger]) + case PCP.String(): + lg.attach(PCP, Loggers[pcpLogger]) + } + } +} diff --git a/logging/def/loggerObservers.go b/logging/def/loggerObservers.go new file mode 100644 index 0000000..6427a9f --- /dev/null +++ b/logging/def/loggerObservers.go @@ -0,0 +1,77 @@ +package logging + +import ( + "fmt" + "log" + "os" +) + +// Logging platform +type loggerObserver interface { + Log(message string) + setLogFile() + setLogFilePrefix(prefix string) + setLogDirectory(dirName string) + init(opts ...loggerOption) +} + +type specifics struct { + logFilePrefix string + logFile *log.Logger +} + +type loggerObserverImpl struct { + logFile *log.Logger + logObserverSpecifics map[string]*specifics + logDirectory string +} + +func (loi *loggerObserverImpl) init(opts ...loggerOption) { + for _, opt := range opts { + // applying logger options + if err := opt(loi); err != nil { + log.Fatal(err) + } + } +} + +func (loi loggerObserverImpl) Log(message string) {} + +// Requires logFilePrefix to have already been set +func (loi *loggerObserverImpl) setLogFile() { + for prefix, ls := range loi.logObserverSpecifics { + if logFile, err := os.Create(ls.logFilePrefix); err != nil { + log.Fatal("Unable to create logFile: ", err) + } else { + fmt.Printf("Creating logFile with pathname: %s, and prefix: %s\n", ls.logFilePrefix, prefix) + ls.logFile = log.New(logFile, "", log.LstdFlags) + } + } +} + +func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) { + // Setting logFilePrefix for pcp 
logger + pcpLogFilePrefix := prefix + ".pcplog" + if loi.logDirectory != "" { + pcpLogFilePrefix = loi.logDirectory + "/" + pcpLogFilePrefix + } + loi.logObserverSpecifics[pcpLogger].logFilePrefix = pcpLogFilePrefix + + // Setting logFilePrefix for console logger + consoleLogFilePrefix := prefix + "_console.log" + if loi.logDirectory != "" { + consoleLogFilePrefix = loi.logDirectory + "/" + consoleLogFilePrefix + } + loi.logObserverSpecifics[conLogger].logFilePrefix = consoleLogFilePrefix + + // Setting logFilePrefix for schedTrace logger + schedTraceLogFilePrefix := prefix + "_schedTrace.log" + if loi.logDirectory != "" { + schedTraceLogFilePrefix = loi.logDirectory + "/" + schedTraceLogFilePrefix + } + loi.logObserverSpecifics[schedTraceLogger].logFilePrefix = schedTraceLogFilePrefix +} + +func (loi *loggerObserverImpl) setLogDirectory(dirName string) { + loi.logDirectory = dirName +} diff --git a/logging/def/loggerSubject.go b/logging/def/loggerSubject.go new file mode 100644 index 0000000..20f8519 --- /dev/null +++ b/logging/def/loggerSubject.go @@ -0,0 +1,23 @@ +package logging + +type loggerSubject struct { + Registry map[LogMessageType][]loggerObserver + message string +} + +func (ls *loggerSubject) setMessage(message string) { + ls.message = message +} + +func (ls *loggerSubject) attach(messageType LogMessageType, lo loggerObserver) { + if ls.Registry == nil { + ls.Registry = make(map[LogMessageType][]loggerObserver) + } + ls.Registry[messageType] = append(ls.Registry[messageType], lo) +} + +func (ls *loggerSubject) notify(messageType LogMessageType) { + for _, logObserver := range ls.Registry[messageType] { + logObserver.Log(ls.message) + } +} diff --git a/logging/def/pcpLogger.go b/logging/def/pcpLogger.go new file mode 100644 index 0000000..354141e --- /dev/null +++ b/logging/def/pcpLogger.go @@ -0,0 +1,9 @@ +package logging + +type PCPLogger struct { + loggerObserverImpl +} + +func (pl *PCPLogger) Log(message string) { + pl.logObserverSpecifics[pcpLogger].logFile.Println(message) +} diff --git a/logging/def/schedTraceLogger.go b/logging/def/schedTraceLogger.go new file mode 100644 index 0000000..98e4109 --- /dev/null +++ b/logging/def/schedTraceLogger.go @@ -0,0 +1,10 @@ +package logging + +type SchedTraceLogger struct { + loggerObserverImpl +} + +func (stl SchedTraceLogger) Log(message string) { + // Logging schedule trace to mentioned file + stl.logObserverSpecifics[schedTraceLogger].logFile.Println(message) +} diff --git a/logging/utils/createLogDir.go b/logging/utils/createLogDir.go new file mode 100644 index 0000000..6fae75d --- /dev/null +++ b/logging/utils/createLogDir.go @@ -0,0 +1,39 @@ +package logging + +import ( + "log" + "os" + "strconv" + "time" +) + +var LogDir string + +func GetLogDir(startTime time.Time, prefix string) string { + if LogDir == "" { + LogDir = createLogDir(prefix, startTime) + } + return LogDir +} + +func createLogDir(prefix string, startTime time.Time) string { + // Creating directory to store all logs for this run + logDirName := "./" + prefix + strconv.Itoa(startTime.Year()) + logDirName += "-" + logDirName += startTime.Month().String() + logDirName += "-" + logDirName += strconv.Itoa(startTime.Day()) + logDirName += "_" + logDirName += strconv.Itoa(startTime.Hour()) + logDirName += "-" + logDirName += strconv.Itoa(startTime.Minute()) + logDirName += "-" + logDirName += strconv.Itoa(startTime.Second()) + if _, err := os.Stat(logDirName); os.IsNotExist(err) { + os.Mkdir(logDirName, 0700) + } else { + log.Println("Unable to create log directory: 
", err) + logDirName = "" + } + return logDirName +} diff --git a/pcp/pcp.go b/pcp/pcp.go index 906ecb4..5924d88 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -1,27 +1,19 @@ package pcp import ( + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "log" - "os" "os/exec" "syscall" "time" ) -func Start(quit chan struct{}, logging *bool, prefix string) { +func Start(quit chan struct{}, logging *bool, logMType chan elecLogDef.LogMessageType, logMsg chan string) { const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - logFile, err := os.Create("./" + prefix + ".pcplog") - if err != nil { - log.Fatal(err) - } - log.Println("Writing pcp logs to file: " + logFile.Name()) - - defer logFile.Close() - pipe, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) @@ -34,8 +26,9 @@ func Start(quit chan struct{}, logging *bool, prefix string) { // Get names of the columns. scanner.Scan() - // Write to logfile. - logFile.WriteString(scanner.Text() + "\n") + // Write to logfile + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() // Throw away first set of results. scanner.Scan() @@ -44,15 +37,16 @@ func Start(quit chan struct{}, logging *bool, prefix string) { for scanner.Scan() { if *logging { - log.Println("Logging PCP...") - logFile.WriteString(scanner.Text() + "\n") + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() } seconds++ } }(logging) - log.Println("PCP logging started") + logMType <- elecLogDef.GENERAL + logMsg <- "PCP logging started" if err := cmd.Start(); err != nil { log.Fatal(err) @@ -62,7 +56,8 @@ func Start(quit chan struct{}, logging *bool, prefix string) { select { case <-quit: - log.Println("Stopping PCP logging in 5 seconds") + logMType <- elecLogDef.GENERAL + logMsg <- "Stopping PCP logging in 5 seconds" time.Sleep(5 * time.Second) // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly diff --git a/power-capping/extrema.go b/power-capping/extrema.go index 7eba523..23908a1 100644 --- a/power-capping/extrema.go +++ b/power-capping/extrema.go @@ -3,10 +3,11 @@ package pcp import ( "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" + "fmt" "log" - "os" "os/exec" "sort" "strconv" @@ -15,7 +16,9 @@ import ( "time" ) -func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { +func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64, + logMType chan elecLogDef.LogMessageType, logMsg chan string) { + const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -24,13 +27,6 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s log.Println("High threshold is lower than low threshold!") } - logFile, err := os.Create("./" + prefix + ".pcplog") - if err != nil { - log.Fatal(err) - } - - defer logFile.Close() - pipe, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) @@ -43,8 +39,9 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s // Get names of the columns. scanner.Scan() - // Write to logfile. 
- logFile.WriteString(scanner.Text() + "\n") + // Write to logfile + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() headers := strings.Split(scanner.Text(), ",") @@ -79,9 +76,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s for scanner.Scan() { if *logging { - log.Println("Logging PCP...") + logMType <- elecLogDef.GENERAL + logMsg <- "Logging PCP..." split := strings.Split(scanner.Text(), ",") - logFile.WriteString(scanner.Text() + "\n") + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() totalPower := 0.0 for _, powerIndex := range powerIndexes { @@ -92,7 +91,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s powerHistories[host].Value = power powerHistories[host] = powerHistories[host].Next() - log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits)) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits)) totalPower += power } @@ -103,11 +103,13 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s clusterMean := pcp.AverageClusterPowerHistory(clusterPowerHist) - log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) if clusterMean > hiThreshold { - log.Printf("Need to cap a node") - // Create statics for all victims and choose one to cap. + logMType <- elecLogDef.GENERAL + logMsg <- "Need to cap a node" + // Create statics for all victims and choose one to cap victims := make([]pcp.Victim, 0, 8) // TODO: Just keep track of the largest to reduce fron nlogn to n @@ -127,9 +129,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s if !cappedHosts[victim.Host] { cappedHosts[victim.Host] = true orderCapped = append(orderCapped, victim.Host) - log.Printf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*pcp.RAPLUnits) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts * pcp.RAPLUnits) if err := rapl.Cap(victim.Host, "rapl", 50); err != nil { - log.Print("Error capping host") + logMType <- elecLogDef.ERROR + logMsg <- "Error capping host" } break // Only cap one machine at at time. } @@ -143,8 +147,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s cappedHosts[host] = false // User RAPL package to send uncap. 
log.Printf("Uncapping host %s", host) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Uncapped host %s", host) if err := rapl.Cap(host, "rapl", 100); err != nil { - log.Print("Error uncapping host") + logMType <- elecLogDef.ERROR + logMsg <- "Error capping host" } } } @@ -154,7 +161,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s } }(logging, hiThreshold, loThreshold) - log.Println("PCP logging started") + logMType <- elecLogDef.GENERAL + logMsg <- "PCP logging started" if err := cmd.Start(); err != nil { log.Fatal(err) @@ -164,7 +172,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s select { case <-quit: - log.Println("Stopping PCP logging in 5 seconds") + logMType <- elecLogDef.GENERAL + logMsg <- "Stopping PCP logging in 5 seconds" time.Sleep(5 * time.Second) // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly diff --git a/power-capping/progressive-extrema.go b/power-capping/progressive-extrema.go index 49688de..4905b0a 100644 --- a/power-capping/progressive-extrema.go +++ b/power-capping/progressive-extrema.go @@ -5,11 +5,12 @@ import ( "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/utilities" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "bufio" "container/ring" + "fmt" "log" "math" - "os" "os/exec" "sort" "strconv" @@ -28,23 +29,18 @@ func getNextCapValue(curCapValue float64, precision int) float64 { return float64(round(curCapValue*output)) / output } -func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { - log.Println("Inside Log and Progressive Extrema") +func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64, + logMType chan elecLogDef.LogMessageType, logMsg chan string) { + const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} if hiThreshold < loThreshold { - log.Println("High threshold is lower than low threshold!") + logMType <- elecLogDef.GENERAL + logMsg <- "High threshold is lower than low threshold!" } - logFile, err := os.Create("./" + prefix + ".pcplog") - if err != nil { - log.Fatal(err) - } - - defer logFile.Close() - pipe, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) @@ -57,8 +53,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // Get names of the columns. scanner.Scan() - // Write to logfile. - logFile.WriteString(scanner.Text() + "\n") + // Write to logfile + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() headers := strings.Split(scanner.Text(), ",") @@ -97,9 +94,11 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref for scanner.Scan() { if *logging { - log.Println("Logging PCP...") + logMType <- elecLogDef.GENERAL + logMsg <- "Logging PCP..." 
split := strings.Split(scanner.Text(), ",") - logFile.WriteString(scanner.Text() + "\n") + logMType <- elecLogDef.PCP + logMsg <- scanner.Text() totalPower := 0.0 for _, powerIndex := range powerIndexes { @@ -110,7 +109,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref powerHistories[host].Value = power powerHistories[host] = powerHistories[host].Next() - log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits)) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Host: %s, Power: %f", + indexToHost[powerIndex], (power * pcp.RAPLUnits)) totalPower += power } @@ -121,13 +122,17 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref clusterMean := pcp.AverageClusterPowerHistory(clusterPowerHist) - log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean) if clusterMean >= hiThreshold { - log.Println("Need to cap a node") - log.Printf("Cap values of capped victims: %v", cappedVictims) - log.Printf("Cap values of victims to uncap: %v", orderCappedVictims) - // Create statics for all victims and choose one to cap. + logMType <- elecLogDef.GENERAL + logMsg <- "Need to cap a node" + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Cap values of capped victims: %v", cappedVictims) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Cap values of victims to uncap: %v", orderCappedVictims) + // Create statistics for all victims and choose one to cap victims := make([]pcp.Victim, 0, 8) // TODO: Just keep track of the largest to reduce fron nlogn to n @@ -153,10 +158,12 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } // Need to cap this victim. if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil { - log.Printf("Error capping host %s", victims[i].Host) + logMType <- elecLogDef.ERROR + logMsg <- fmt.Sprintf("Error capping host %s", victims[i].Host) } else { - log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0) - // Keeping track of this victim and it's cap value. + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Capped host[%s] at %f", victims[i].Host, 50.0) + // Keeping track of this victim and its cap value cappedVictims[victims[i].Host] = 50.0 newVictimFound = true // This node can be uncapped and hence adding to orderCapped. @@ -178,11 +185,13 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref if capValue > constants.LowerCapLimit { newCapValue := getNextCapValue(capValue, 2) if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil { - log.Printf("Error capping host[%s]", alreadyCappedHosts[i]) + logMType <- elecLogDef.ERROR + logMsg <- fmt.Sprintf("Error capping host[%s]", alreadyCappedHosts[i]) } else { // Successful cap - log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) - // Checking whether this victim can be capped further. + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue) + // Checking whether this victim can be capped further if newCapValue <= constants.LowerCapLimit { // Deleting victim from cappedVictims.
delete(cappedVictims, alreadyCappedHosts[i]) @@ -204,14 +213,18 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } } if !canCapAlreadyCappedVictim { - log.Println("No Victim left to cap.") + logMType <- elecLogDef.GENERAL + logMsg <- "No Victim left to cap." } } } else if clusterMean < loThreshold { - log.Println("Need to uncap a node") - log.Printf("Cap values of capped victims: %v", cappedVictims) - log.Printf("Cap values of victims to uncap: %v", orderCappedVictims) + logMType <- elecLogDef.GENERAL + logMsg <- "Need to uncap a node" + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Cap values of capped victims: %v", cappedVictims) + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Cap values of victims to uncap: %v", orderCappedVictims) if len(orderCapped) > 0 { // We pick the host that is capped the most to uncap. orderCappedToSort := utilities.GetPairList(orderCappedVictims) @@ -221,13 +234,15 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref // This is a floating point operation and might suffer from precision loss. newUncapValue := orderCappedVictims[hostToUncap] * 2.0 if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil { - log.Printf("Error uncapping host[%s]", hostToUncap) + logMType <- elecLogDef.ERROR + logMsg <- fmt.Sprintf("Error uncapping host[%s]", hostToUncap) } else { - // Successful uncap. - log.Printf("Uncapped host[%s] to %f", hostToUncap, newUncapValue) - // Can we uncap this host further. If not, then we remove its entry from orderCapped. - if newUncapValue >= 100.0 { // Can compare using == - // Deleting entry from orderCapped. + // Successful uncap + logMType <- elecLogDef.GENERAL + logMsg <- fmt.Sprintf("Uncapped host[%s] to %f", hostToUncap, newUncapValue) + // Can we uncap this host further. If not, then we remove its entry from orderCapped + if newUncapValue >= 100.0 { // can compare using == + // Deleting entry from orderCapped for i, victimHost := range orderCapped { if victimHost == hostToUncap { orderCapped = append(orderCapped[:i], orderCapped[i+1:]...) 
@@ -245,7 +260,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } } } else { - log.Println("No host staged for Uncapping") + logMType <- elecLogDef.GENERAL + logMsg <- "No host staged for Uncapping" } } } @@ -254,7 +270,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref } }(logging, hiThreshold, loThreshold) - log.Println("PCP logging started") + logMType <- elecLogDef.GENERAL + logMsg <- "PCP logging started" if err := cmd.Start(); err != nil { log.Fatal(err) @@ -264,7 +281,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref select { case <-quit: - log.Println("Stopping PCP logging in 5 seconds") + logMType <- elecLogDef.GENERAL + logMsg <- "Stopping PCP logging in 5 seconds" time.Sleep(5 * time.Second) // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly diff --git a/scheduler.go b/scheduler.go index 3b64651..616fc8d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -4,6 +4,7 @@ import ( "bitbucket.org/sunybingcloud/elektron/def" "bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/schedulers" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" "flag" "fmt" "github.com/golang/protobuf/proto" @@ -12,44 +13,39 @@ import ( "log" "os" "os/signal" + "strings" "time" ) var master = flag.String("master", "", "Location of leading Mesos master -- :") var tasksFile = flag.String("workload", "", "JSON file containing task definitions") -var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. "+ "This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.") -var pcplogPrefix = flag.String("logPrefix", "", "Prefix for PCP log file") -var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, "+ "beyond which extrema/progressive-extrema would start power-capping") -var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, "+ "below which extrema/progressive-extrema would stop power-capping") -var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to powerClass of node") -var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+ "Use option -listSchedPolicies to get the names of available scheduling policies") -var listSchedPolicies = flag.Bool("listSchedPolicies", false, "Names of the pluaggable scheduling policies.") +var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource") +var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog") +var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for when we should start capping") +var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for when we should start uncapping") +var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to power class of node") +var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used.\n\tUse option -listSchedPolicies to get the names of available scheduling policies") +var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluggable scheduling policies.") +var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.") -// Short hand args. 
+// Short hand args func init() { flag.StringVar(master, "m", "", "Location of leading Mesos master (shorthand)") flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") - flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource. "+ - "This allows the usage of the Watts attribute (if present) in the workload definition during offer matching. (shorthand)") - flag.StringVar(pcplogPrefix, "p", "", "Prefix for PCP log file (shorthand)") - flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for Cluster average historical power consumption, "+ - "beyond which extrema/progressive-extrema would start power-capping (shorthand)") - flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, "+ - "below which extrema/progressive-extrema would stop power-capping (shorthand)") - flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to powerClass of node (shorthand)") - flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+ - "Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)") + flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource (shorthand)") + flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)") + flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)") + flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)") + flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node (shorthand)") + flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used.\n Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)") flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)") + flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.") } func listAllSchedulingPolicies() { fmt.Println("Scheduling Policies") fmt.Println("-------------------") - for policyName, _ := range schedulers.Schedulers { + for policyName, _ := range schedulers.SchedPolicies { fmt.Println(policyName) } } @@ -57,17 +53,32 @@ func listAllSchedulingPolicies() { func main() { flag.Parse() - // Checking to see if we need to just list the pluggable scheduling policies. + // checking to see if we need to just list the pluggable scheduling policies if *listSchedPolicies { listAllSchedulingPolicies() os.Exit(1) } + startTime := time.Now() + formattedStartTime := startTime.Format("20060102150405") + // Checking if prefix contains any special characters + if strings.Contains(*pcplogPrefix, "/") { + log.Fatal("log file prefix should not contain '/'.") + } + logPrefix := *pcplogPrefix + "_" + formattedStartTime + + // creating logger and attaching different logging platforms + logger := elecLogDef.BuildLogger(startTime, logPrefix) + // logging channels + logMType := make(chan elecLogDef.LogMessageType) + logMsg := make(chan string) + go logger.Listen(logMType, logMsg) + // If non-default scheduling policy given, - // checking if scheduling policyName exists. + // checking if scheduling policyName exists if *schedPolicyName != "first-fit" { - if _, ok := schedulers.Schedulers[*schedPolicyName]; !ok { - // Invalid scheduling policy. 
+ if _, ok := schedulers.SchedPolicies[*schedPolicyName]; !ok { + // invalid scheduling policy log.Println("Invalid scheduling policy given. The possible scheduling policies are:") listAllSchedulingPolicies() os.Exit(1) @@ -75,41 +86,45 @@ func main() { } if *tasksFile == "" { - fmt.Println("No file containing tasks specifiction provided.") + //fmt.Println("No file containing tasks specifiction provided.") + logger.WriteLog(elecLogDef.ERROR, "No file containing tasks specification provided") os.Exit(1) } if *hiThreshold < *loThreshold { - fmt.Println("High threshold is of a lower value than low threshold.") + //fmt.Println("High threshold is of a lower value than low threshold.") + logger.WriteLog(elecLogDef.ERROR, "High threshold is of a lower value than low threshold") os.Exit(1) } tasks, err := def.TasksFromJSON(*tasksFile) if err != nil || len(tasks) == 0 { - fmt.Println("Invalid tasks specification file provided") + //fmt.Println("Invalid tasks specification file provided") + logger.WriteLog(elecLogDef.ERROR, "Invalid tasks specification file provided") os.Exit(1) } - log.Println("Scheduling the following tasks:") + //log.Println("Scheduling the following tasks:") + logger.WriteLog(elecLogDef.GENERAL, "Scheduling the following tasks:") for _, task := range tasks { fmt.Println(task) } - startTime := time.Now().Format("20060102150405") - logPrefix := *pcplogPrefix + "_" + startTime shutdown := make(chan struct{}) done := make(chan struct{}) pcpLog := make(chan struct{}) recordPCP := false - scheduler := schedulers.SchedFactory(*schedPolicyName, + scheduler := schedulers.SchedFactory( + schedulers.WithSchedPolicy(*schedPolicyName), schedulers.WithTasks(tasks), schedulers.WithWattsAsAResource(*wattsAsAResource), schedulers.WithClassMapWatts(*classMapWatts), - schedulers.WithSchedTracePrefix(logPrefix), schedulers.WithRecordPCP(&recordPCP), schedulers.WithShutdown(shutdown), schedulers.WithDone(done), - schedulers.WithPCPLog(pcpLog)) + schedulers.WithPCPLog(pcpLog), + schedulers.WithLoggingChannels(logMType, logMsg), + schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch)) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -123,10 +138,10 @@ func main() { return } - go pcp.Start(pcpLog, &recordPCP, logPrefix) - //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold) - //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold) - time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing. + go pcp.Start(pcpLog, &recordPCP, logMType, logMsg) + //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg) + //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg) + time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing // Attempt to handle SIGINT to not leave pmdumptext running. // Catch interrupt. 
@@ -156,6 +171,8 @@ func main() { case <-done: close(pcpLog) time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds + close(logMType) + close(logMsg) //case <-time.After(shutdownTimeout): } diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go new file mode 100644 index 0000000..61edc9d --- /dev/null +++ b/schedulers/MaxGreedyMins.go @@ -0,0 +1,186 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/elektron/def" + "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "fmt" + mesos "github.com/mesos/mesos-go/mesosproto" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math/rand" +) + +// Decides if to take an offer or not +func (s *MaxGreedyMins) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task, + totalCPU, totalRAM, totalWatts float64) bool { + baseSchedRef := spc.(*baseScheduler) + cpus, mem, watts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) + if err != nil { + // Error in determining wattsConsideration + log.Fatal(err) + } + if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && + (!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { + return true + } + return false +} + +type MaxGreedyMins struct { + SchedPolicyState +} + +// Determine if the remaining space inside of the offer is enough for this +// the task we need to create. If it is, create a TaskInfo and return it. +func (s *MaxGreedyMins) CheckFit( + spc SchedPolicyContext, + i int, + task def.Task, + wattsConsideration float64, + offer *mesos.Offer, + totalCPU *float64, + totalRAM *float64, + totalWatts *float64) (bool, *mesos.TaskInfo) { + + baseSchedRef := spc.(*baseScheduler) + // Does the task fit + if s.takeOffer(spc, offer, task, *totalCPU, *totalRAM, *totalWatts) { + + *totalWatts += wattsConsideration + *totalCPU += task.CPU + *totalRAM += task.RAM + baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString()) + + taskToSchedule := baseSchedRef.newTask(offer, task) + + baseSchedRef.LogSchedTrace(taskToSchedule, offer) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...) 
+ + if len(baseSchedRef.tasks) <= 0 { + baseSchedRef.LogTerminateScheduler() + close(baseSchedRef.Shutdown) + } + } + + return true, taskToSchedule + } + + return false, nil +} + +func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { + fmt.Println("Max-GreedyMins scheduling...") + baseSchedRef := spc.(*baseScheduler) + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + baseSchedRef.LogOffersReceived(offers) + + for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) + select { + case <-baseSchedRef.Shutdown: + baseSchedRef.LogNoPendingTasksDeclineOffers(offer) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + baseSchedRef.LogNumberOfRunningTasks() + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + offerTaken := false + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + + // Assumes s.tasks is ordered in non-decreasing median max peak order + + // Attempt to schedule a single instance of the heaviest workload available first + // Start from the back until one fits + for i := len(baseSchedRef.tasks) - 1; i >= 0; i-- { + + task := baseSchedRef.tasks[i] + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) + if err != nil { + // Error in determining wattsConsideration + log.Fatal(err) + } + + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + // TODO: Fix this so index doesn't need to be passed + taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer, + &totalCPU, &totalRAM, &totalWatts) + + if taken { + offerTaken = true + tasks = append(tasks, taskToSchedule) + break + } + } + + // Pack the rest of the offer with the smallest tasks + for i := 0; i < len(baseSchedRef.tasks); i++ { + task := baseSchedRef.tasks[i] + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) + if err != nil { + // Error in determining wattsConsideration + log.Fatal(err) + } + + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + for *task.Instances > 0 { + // TODO: Fix this so index doesn't need to be passed + taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer, + &totalCPU, &totalRAM, &totalWatts) + + if taken { + offerTaken = true + tasks = append(tasks, taskToSchedule) + } else { + break // Continue on to next task + } + } + } + + if offerTaken { + baseSchedRef.LogTaskStarting(nil, offer) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + } else { + + // If there was no match for the task + cpus, mem, watts := offerUtils.OfferAgg(offer) + baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } + + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. 
+ index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + break + } + index-- + } + } +} diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go new file mode 100644 index 0000000..392aebf --- /dev/null +++ b/schedulers/MaxMin.go @@ -0,0 +1,180 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/elektron/def" + "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" + "fmt" + mesos "github.com/mesos/mesos-go/mesosproto" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math/rand" +) + +// Decides if to take an offer or not +func (s *MaxMin) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task, + totalCPU, totalRAM, totalWatts float64) bool { + baseSchedRef := spc.(*baseScheduler) + cpus, mem, watts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) + if err != nil { + // Error in determining wattsConsideration + log.Fatal(err) + } + if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && + (!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { + return true + } + return false +} + +type MaxMin struct { + SchedPolicyState +} + +// Determine if the remaining space inside of the offer is enough for this +// task that we need to create. If it is, create a TaskInfo and return it. +func (s *MaxMin) CheckFit( + spc SchedPolicyContext, + i int, + task def.Task, + wattsConsideration float64, + offer *mesos.Offer, + totalCPU *float64, + totalRAM *float64, + totalWatts *float64) (bool, *mesos.TaskInfo) { + + baseSchedRef := spc.(*baseScheduler) + // Does the task fit. + if s.takeOffer(spc, offer, task, *totalCPU, *totalRAM, *totalWatts) { + + *totalWatts += wattsConsideration + *totalCPU += task.CPU + *totalRAM += task.RAM + baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString()) + + taskToSchedule := baseSchedRef.newTask(offer, task) + + baseSchedRef.LogSchedTrace(taskToSchedule, offer) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it. + baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...) + + if len(baseSchedRef.tasks) <= 0 { + baseSchedRef.LogTerminateScheduler() + close(baseSchedRef.Shutdown) + } + } + + return true, taskToSchedule + } + return false, nil +} + +func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { + fmt.Println("Max-Min scheduling...") + baseSchedRef := spc.(*baseScheduler) + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + baseSchedRef.LogOffersReceived(offers) + + for _, offer := range offers { + offerUtils.UpdateEnvironment(offer) + select { + case <-baseSchedRef.Shutdown: + baseSchedRef.LogNoPendingTasksDeclineOffers(offer) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + baseSchedRef.LogNumberOfRunningTasks() + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + offerTaken := false + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + + // Assumes s.tasks is ordered in non-decreasing median max-peak order + + // Attempt to schedule a single instance of the heaviest workload available first. + // Start from the back until one fits. 
+ + direction := false // True = Min Max, False = Max Min + var index int + start := true // If false then index has changed and need to keep it that way + for i := 0; i < len(baseSchedRef.tasks); i++ { + // We need to pick a min task or a max task + // depending on the value of direction. + if direction && start { + index = 0 + } else if start { + index = len(baseSchedRef.tasks) - i - 1 + } + task := baseSchedRef.tasks[index] + + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) + if err != nil { + // Error in determining wattsConsideration. + log.Fatal(err) + } + + // Don't take offer if it doesn't match our task's host requirement. + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + // TODO: Fix this so index doesn't need to be passed. + taken, taskToSchedule := s.CheckFit(spc, index, task, wattsConsideration, offer, + &totalCPU, &totalRAM, &totalWatts) + + if taken { + offerTaken = true + tasks = append(tasks, taskToSchedule) + // Need to change direction and set start to true. + // Setting start to true would ensure that index be set accurately again. + direction = !direction + start = true + i-- + } else { + // Need to move index depending on the value of direction. + if direction { + index++ + start = false + } else { + index-- + start = false + } + } + } + + if offerTaken { + baseSchedRef.LogTaskStarting(nil, offer) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + } else { + // If there was no match for the task + cpus, mem, watts := offerUtils.OfferAgg(offer) + baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } + + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. + index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + break + } + index-- + } + } +} diff --git a/schedulers/base.go b/schedulers/base.go index e74fb71..a7061b7 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -2,19 +2,24 @@ package schedulers import ( "bitbucket.org/sunybingcloud/elektron/def" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" + "bytes" + "fmt" + "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "sync" + "time" ) -// Implements mesos scheduler. -type ElectronScheduler interface { - sched.Scheduler - init(opts ...schedPolicyOption) -} - -type base struct { +type baseScheduler struct { ElectronScheduler + SchedPolicyContext + // Current scheduling policy used for resource offer consumption. + curSchedPolicy SchedPolicyState + tasksCreated int tasksRunning int tasks []def.Task @@ -24,7 +29,7 @@ type base struct { classMapWatts bool // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task. 
+ // about to schedule a new task RecordPCP *bool // This channel is closed when the program receives an interrupt, @@ -38,55 +43,309 @@ type base struct { PCPLog chan struct{} schedTrace *log.Logger + + // Send the type of the message to be logged + logMsgType chan elecLogDef.LogMessageType + // Send the message to be logged + logMsg chan string + + mutex sync.Mutex + + // Whether switching of scheduling policies at runtime has been enabled + schedPolSwitchEnabled bool } -func (s *base) init(opts ...schedPolicyOption) { +func (s *baseScheduler) init(opts ...schedPolicyOption) { for _, opt := range opts { - // Applying options. + // applying options if err := opt(s); err != nil { log.Fatal(err) } } s.running = make(map[string]map[string]bool) + s.mutex = sync.Mutex{} } -func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) +func (s *baseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) { + s.curSchedPolicy = newSchedPol } -func (s *base) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) + +func (s *baseScheduler) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !*s.RecordPCP { + // Turn on elecLogDef + *s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if s.wattsAsAResource { + if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { + s.LogTaskWattsConsideration(task, *offer.Hostname, wattsToConsider) + resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) + } else { + // Error in determining wattsConsideration + s.LogElectronError(err) + } + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } } -func (s *base) ExecutorLost(_ sched.SchedulerDriver, - executorID *mesos.ExecutorID, + +func (s *baseScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + s.LogOfferRescinded(offerID) +} +func (s *baseScheduler) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + s.LogSlaveLost(slaveID) +} +func (s *baseScheduler) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) + s.LogExecutorLost(executorID, slaveID) } -func (s *base) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) +func (s *baseScheduler) Error(_ sched.SchedulerDriver, err string) { + s.LogMesosError(err) } 
-func (s *base) FrameworkMessage( +func (s *baseScheduler) FrameworkMessage( driver sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) + s.LogFrameworkMessage(executorID, slaveID, message) } -func (s *base) Registered( +func (s *baseScheduler) Registered( _ sched.SchedulerDriver, frameworkID *mesos.FrameworkID, masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) + s.LogFrameworkRegistered(frameworkID, masterInfo) } -func (s *base) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) +func (s *baseScheduler) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + s.LogFrameworkReregistered(masterInfo) } -func (s *base) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") +func (s *baseScheduler) Disconnected(sched.SchedulerDriver) { + s.LogDisconnected() +} + +func (s *baseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + s.curSchedPolicy.ConsumeOffers(s, driver, offers) +} + +func (s *baseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + s.LogTaskStatusUpdate(status) + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } +} + +func (s *baseScheduler) Log(lmt elecLogDef.LogMessageType, msg string) { + s.mutex.Lock() + s.logMsgType <- lmt + s.logMsg <- msg + s.mutex.Unlock() +} + +func (s *baseScheduler) LogTaskStarting(ts *def.Task, offer *mesos.Offer) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + var msg string + if ts == nil { + msg = msgColor.Sprintf("TASKS STARTING... host = [%s]", offer.GetHostname()) + } else { + msg = msgColor.Sprintf("TASK STARTING... task = [%s], Instance = %d, host = [%s]", + ts.Name, *ts.Instances, offer.GetHostname()) + } + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogTaskWattsConsideration(ts def.Task, host string, wattsToConsider float64) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Watts considered for task[%s] and host[%s] = %f Watts", + ts.Name, host, wattsToConsider) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogOffersReceived(offers []*mesos.Offer) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Received %d resource offers", len(offers)) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogNoPendingTasksDeclineOffers(offer *mesos.Offer) { + lmt := elecLogDef.WARNING + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("DECLINING OFFER for host[%s]... 
"+ + "No tasks left to schedule", offer.GetHostname()) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogNumberOfRunningTasks() { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Number of tasks still running = %d", s.tasksRunning) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogCoLocatedTasks(slaveID string) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + buffer := bytes.Buffer{} + buffer.WriteString(fmt.Sprintln("Colocated with:")) + for taskName := range s.running[slaveID] { + buffer.WriteString(fmt.Sprintln(taskName)) + } + msg := msgColor.Sprintf(buffer.String()) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogSchedTrace(taskToSchedule *mesos.TaskInfo, offer *mesos.Offer) { + msg := fmt.Sprint(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + s.Log(elecLogDef.SCHED_TRACE, msg) +} + +func (s *baseScheduler) LogTerminateScheduler() { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprint("Done scheduling all tasks!") + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogInsufficientResourcesDeclineOffer(offer *mesos.Offer, + offerResources ...interface{}) { + lmt := elecLogDef.WARNING + msgColor := elecLogDef.LogMessageColors[lmt] + buffer := bytes.Buffer{} + buffer.WriteString(fmt.Sprintln("DECLINING OFFER... Offer has insufficient resources to launch a task")) + buffer.WriteString(fmt.Sprintf("Offer Resources ", offerResources...)) + msg := msgColor.Sprint(buffer.String()) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogOfferRescinded(offerID *mesos.OfferID) { + lmt := elecLogDef.ERROR + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("OFFER RESCINDED: OfferID = %s", offerID) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogSlaveLost(slaveID *mesos.SlaveID) { + lmt := elecLogDef.ERROR + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("SLAVE LOST: SlaveID = %s", slaveID) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogExecutorLost(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID) { + lmt := elecLogDef.ERROR + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("EXECUTOR LOST: ExecutorID = %s, SlaveID = %s", executorID, slaveID) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogFrameworkMessage(executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, message string) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Received Framework message from executor [%s]: %s", executorID, message) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogMesosError(err string) { + lmt := elecLogDef.ERROR + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("MESOS ERROR: %s", err) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogElectronError(err error) { + lmt := elecLogDef.ERROR + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("ELECTRON ERROR: %v", err) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogFrameworkRegistered(frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + lmt := elecLogDef.SUCCESS + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("FRAMEWORK REGISTERED! 
frameworkID = %s, master = %s", + frameworkID, masterInfo) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogFrameworkReregistered(masterInfo *mesos.MasterInfo) { + lmt := elecLogDef.GENERAL + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("Framework re-registered with master %s", masterInfo) + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogDisconnected() { + lmt := elecLogDef.WARNING + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprint("Framework disconnected with master") + s.Log(lmt, msg) +} + +func (s *baseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) { + var lmt elecLogDef.LogMessageType + switch *status.State { + case mesos.TaskState_TASK_ERROR, mesos.TaskState_TASK_FAILED, + mesos.TaskState_TASK_KILLED, mesos.TaskState_TASK_LOST: + lmt = elecLogDef.ERROR + case mesos.TaskState_TASK_FINISHED: + lmt = elecLogDef.SUCCESS + default: + lmt = elecLogDef.GENERAL + } + msgColor := elecLogDef.LogMessageColors[lmt] + msg := elecLogDef.LogMessageColors[elecLogDef.GENERAL].Sprintf("Task Status received for task [%s] --> %s", + *status.TaskId.Value, msgColor.Sprint(NameFor(status.State))) + s.Log(lmt, msg) } diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index e7311df..d3cc4e1 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -5,108 +5,49 @@ import ( "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "fmt" - "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "time" + "math/rand" ) -// Decides if to take an offer or not. -func (s *BinPacking) takeOffer(offer *mesos.Offer, task def.Task, totalCPU, totalRAM, totalWatts float64) bool { +// Decides if to take an offer or not +func (s *BinPackSortedWatts) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task, totalCPU, totalRAM, totalWatts float64) bool { + baseSchedRef := spc.(*baseScheduler) cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { // Error in determining wattsConsideration. log.Fatal(err) } if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { + (!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { return true } return false } -type BinPacking struct { - base // Type embedded to inherit common functions. +type BinPackSortedWatts struct { + SchedPolicyState } -// Initialization. -func (s *BinPacking) init(opts ...schedPolicyOption) { - s.base.init(opts...) - // Sorting the tasks based on watts. - def.SortTasks(s.tasks, def.SortByWatts) -} - -func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !*s.RecordPCP { - // Turn on logging. - *s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent. 
- if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsToConsider. - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("elektron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) +func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { + fmt.Println("BPSW scheduling...") + baseSchedRef := spc.(*baseScheduler) + def.SortTasks(baseSchedRef.tasks, def.SortByWatts) + baseSchedRef.LogOffersReceived(offers) for _, offer := range offers { offerUtils.UpdateEnvironment(offer) select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + case <-baseSchedRef.Shutdown: + baseSchedRef.LogNoPendingTasksDeclineOffers(offer) driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) + baseSchedRef.LogNumberOfRunningTasks() continue default: } @@ -117,9 +58,9 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) + for i := 0; i < len(baseSchedRef.tasks); i++ { + task := baseSchedRef.tasks[i] + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { // Error in determining wattsConsideration. log.Fatal(err) @@ -131,29 +72,28 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso } for *task.Instances > 0 { - // Does the task fit. 
- if s.takeOffer(offer, task, totalCPU, totalRAM, totalWatts) { + // Does the task fit + if s.takeOffer(spc, offer, task, totalCPU, totalRAM, totalWatts) { offerTaken = true totalWatts += wattsConsideration totalCPU += task.CPU totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task) + baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString()) + taskToSchedule := baseSchedRef.newTask(offer, task) tasks = append(tasks, taskToSchedule) - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it. - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + // All instances of task have been scheduled, remove it + baseSchedRef.tasks = append(baseSchedRef.tasks[:i], + baseSchedRef.tasks[i+1:]...) - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) + if len(baseSchedRef.tasks) <= 0 { + baseSchedRef.LogTerminateScheduler() + close(baseSchedRef.Shutdown) } } } else { @@ -163,35 +103,28 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso } if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) + baseSchedRef.LogTaskStarting(nil, offer) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { - // If there was no match for the task. - fmt.Println("There is not enough resources to launch a task:") + // If there was no match for the task cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) + baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts) driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } -} -func (s *BinPacking) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. + index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + break } + index-- } } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } diff --git a/schedulers/electronScheduler.go b/schedulers/electronScheduler.go new file mode 100644 index 0000000..db69981 --- /dev/null +++ b/schedulers/electronScheduler.go @@ -0,0 +1,73 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/elektron/def" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" + mesos "github.com/mesos/mesos-go/mesosproto" + sched "github.com/mesos/mesos-go/scheduler" +) + +// Implements mesos scheduler. +type ElectronScheduler interface { + sched.Scheduler + init(opts ...schedPolicyOption) + + // Interface for log messages. + // Every ElectronScheduler implementer should provide definitions for these functions. 
+	// This interface serves as a template to maintain consistent log messages.
+	// Each of these functions is supposed to call Log(...), which sends the
+	// log message type and the log message to the corresponding channels.
+
+	// Pass the logMessageType and the logMessage to the loggers for logging.
+	Log(logMType elecLogDef.LogMessageType, logMsg string)
+	// To be called when a task is about to be launched.
+	// Log a message indicating that a task is about to start executing.
+	// Also, log the host on which the task is going to be launched.
+	LogTaskStarting(ts *def.Task, offer *mesos.Offer)
+	// To be called when an offer is taken.
+	// Log the chosen watts attribute for the task that has fit an offer.
+	LogTaskWattsConsideration(ts def.Task, host string, wattsToConsider float64)
+	// To be called when offers are received from Mesos.
+	// Log the number of offers received and/or information about the received offers.
+	LogOffersReceived(offers []*mesos.Offer)
+	// To be called when a scheduling policy declines a Mesos offer because
+	// there are no tasks pending to be scheduled.
+	// Log the host information corresponding to the offer that was declined.
+	LogNoPendingTasksDeclineOffers(offer *mesos.Offer)
+	// Log the number of tasks that are currently executing on the cluster.
+	LogNumberOfRunningTasks()
+	// To be called when a task fits a Mesos offer.
+	// Log information on the tasks that the new task is going to be co-located with.
+	// Uses the coLocated(...) utility in helpers.go.
+	LogCoLocatedTasks(slaveID string)
+	// Log the schedTrace of a task.
+	// The schedTrace includes the TaskID and the hostname of the node
+	// where the task is going to be launched.
+	LogSchedTrace(taskToSchedule *mesos.TaskInfo, offer *mesos.Offer)
+	// To be called when all the tasks have completed executing.
+	// Log a message indicating that Electron has scheduled all the tasks.
+	LogTerminateScheduler()
+	// To be called when an offer is not consumed.
+	// Log a message indicating that the offer had insufficient resources.
+	LogInsufficientResourcesDeclineOffer(offer *mesos.Offer, offerResources ...interface{})
+	// To be called when an offer is rescinded by Mesos.
+	LogOfferRescinded(offerID *mesos.OfferID)
+	// To be called when a Mesos agent is lost.
+	LogSlaveLost(slaveID *mesos.SlaveID)
+	// To be called when an executor is lost.
+ LogExecutorLost(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID) + // Log a mesos error + LogMesosError(err string) + // Log an Electron error + LogElectronError(err error) + // Log Framework message + LogFrameworkMessage(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, message string) + // Log Framework has been registered + LogFrameworkRegistered(frameworkID *mesos.FrameworkID, masterInfo *mesos.MasterInfo) + // Log Framework has been re-registered + LogFrameworkReregistered(masterInfo *mesos.MasterInfo) + // Log Framework has been disconnected from the Mesos master + LogDisconnected() + // Log Status update of a task + LogTaskStatusUpdate(status *mesos.TaskStatus) +} diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index 3846a04..2d9dae3 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -5,27 +5,24 @@ import ( "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" "fmt" - "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" - "log" - "time" + "math/rand" ) -// Decides if to take an offer or not. -func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { - +// Decides if to take an offer or not +func (s *FirstFit) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task) bool { + baseSchedRef := spc.(*baseScheduler) cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) + wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) + // Error in determining wattsConsideration + baseSchedRef.LogElectronError(err) } - if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { + if cpus >= task.CPU && mem >= task.RAM && (!baseSchedRef.wattsAsAResource || watts >= wattsConsideration) { return true } @@ -34,120 +31,61 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // Elektron scheduler implements the Scheduler interface. type FirstFit struct { - base // Type embedded to inherit common functions + SchedPolicyState } -// Initialization. -func (s *FirstFit) init(opts ...schedPolicyOption) { - s.base.init(opts...) -} - -func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !*s.RecordPCP { - // Turn on logging. - *s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts. - } - - // If this is our first time running into this Agent. - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node. 
- s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration. - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("elektron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) +func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { + fmt.Println("FirstFit scheduling...") + baseSchedRef := spc.(*baseScheduler) + baseSchedRef.LogOffersReceived(offers) for _, offer := range offers { offerUtils.UpdateEnvironment(offer) select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + case <-baseSchedRef.Shutdown: + baseSchedRef.LogNoPendingTasksDeclineOffers(offer) driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) + baseSchedRef.LogNumberOfRunningTasks() continue default: } tasks := []*mesos.TaskInfo{} - // First fit strategy. + // First fit strategy offerTaken := false - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] + for i := 0; i < len(baseSchedRef.tasks); i++ { + task := baseSchedRef.tasks[i] // Don't take offer if it doesn't match our task's host requirement. if offerUtils.HostMismatch(*offer.Hostname, task.Host) { continue } - // Decision to take the offer or not. - if s.takeOffer(offer, task) { + // Decision to take the offer or not + if s.takeOffer(spc, offer, task) { - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) + baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString()) - taskToSchedule := s.newTask(offer, task) + taskToSchedule := baseSchedRef.newTask(offer, task) tasks = append(tasks, taskToSchedule) - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + baseSchedRef.LogTaskStarting(&task, offer) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) offerTaken = true - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + baseSchedRef.LogSchedTrace(taskToSchedule, offer) *task.Instances-- if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it. 
- s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] + // All instances of task have been scheduled, remove it + baseSchedRef.tasks[i] = baseSchedRef.tasks[len(baseSchedRef.tasks)-1] + baseSchedRef.tasks = baseSchedRef.tasks[:len(baseSchedRef.tasks)-1] - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) + if len(baseSchedRef.tasks) <= 0 { + baseSchedRef.LogTerminateScheduler() + close(baseSchedRef.Shutdown) } } break // Offer taken, move on. @@ -156,31 +94,23 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. // If there was no match for the task. if !offerTaken { - fmt.Println("There is not enough resources to launch a task:") cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) + baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts) driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } - } -} -func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: + // Switch scheduling policy only if feature enabled from CLI + if baseSchedRef.schedPolSwitchEnabled { + // Switching to a random scheduling policy. + // TODO: Switch based on some criteria. + index := rand.Intn(len(SchedPolicies)) + for _, v := range SchedPolicies { + if index == 0 { + spc.SwitchSchedPol(v) + break } + index-- } } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 8010bbd..2289014 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -4,18 +4,16 @@ import ( "bitbucket.org/sunybingcloud/elektron/constants" "bitbucket.org/sunybingcloud/elektron/def" "errors" - "fmt" - "log" - "os" + elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def" ) -func coLocated(tasks map[string]bool) { +func coLocated(tasks map[string]bool, s baseScheduler) { for task := range tasks { - log.Println(task) + s.Log(elecLogDef.GENERAL, task) } - fmt.Println("---------------------") + s.Log(elecLogDef.GENERAL, "---------------------") } // Get the powerClass of the given hostname. @@ -28,15 +26,26 @@ func hostToPowerClass(hostName string) string { return "" } -// Scheduler policy options to help initialize schedulers. 
+// scheduler policy options to help initialize schedulers type schedPolicyOption func(e ElectronScheduler) error +func WithSchedPolicy(schedPolicyName string) schedPolicyOption { + return func(s ElectronScheduler) error { + if schedPolicy, ok := SchedPolicies[schedPolicyName]; !ok { + return errors.New("Incorrect scheduling policy.") + } else { + s.(*baseScheduler).curSchedPolicy = schedPolicy + return nil + } + } +} + func WithTasks(ts []def.Task) schedPolicyOption { return func(s ElectronScheduler) error { if ts == nil { return errors.New("Task[] is empty.") } else { - s.(*base).tasks = ts + s.(*baseScheduler).tasks = ts return nil } } @@ -44,43 +53,31 @@ func WithTasks(ts []def.Task) schedPolicyOption { func WithWattsAsAResource(waar bool) schedPolicyOption { return func(s ElectronScheduler) error { - s.(*base).wattsAsAResource = waar + s.(*baseScheduler).wattsAsAResource = waar return nil } } func WithClassMapWatts(cmw bool) schedPolicyOption { return func(s ElectronScheduler) error { - s.(*base).classMapWatts = cmw + s.(*baseScheduler).classMapWatts = cmw return nil } } func WithRecordPCP(recordPCP *bool) schedPolicyOption { return func(s ElectronScheduler) error { - s.(*base).RecordPCP = recordPCP + s.(*baseScheduler).RecordPCP = recordPCP return nil } } -func WithSchedTracePrefix(schedTracePrefix string) schedPolicyOption { - return func(s ElectronScheduler) error { - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - return err - } else { - s.(*base).schedTrace = log.New(logFile, "", log.LstdFlags) - return nil - } - } -} - func WithShutdown(shutdown chan struct{}) schedPolicyOption { return func(s ElectronScheduler) error { if shutdown == nil { return errors.New("Shutdown channel is nil.") } else { - s.(*base).Shutdown = shutdown + s.(*baseScheduler).Shutdown = shutdown return nil } } @@ -91,7 +88,7 @@ func WithDone(done chan struct{}) schedPolicyOption { if done == nil { return errors.New("Done channel is nil.") } else { - s.(*base).Done = done + s.(*baseScheduler).Done = done return nil } } @@ -102,8 +99,23 @@ func WithPCPLog(pcpLog chan struct{}) schedPolicyOption { if pcpLog == nil { return errors.New("PCPLog channel is nil.") } else { - s.(*base).PCPLog = pcpLog + s.(*baseScheduler).PCPLog = pcpLog return nil } } } + +func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedPolicyOption { + return func(s ElectronScheduler) error { + s.(*baseScheduler).logMsgType = lmt + s.(*baseScheduler).logMsg = msg + return nil + } +} + +func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption { + return func(s ElectronScheduler) error { + s.(*baseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch + return nil + } +} diff --git a/schedulers/max-greedymins.go b/schedulers/max-greedymins.go deleted file mode 100644 index 57d8763..0000000 --- a/schedulers/max-greedymins.go +++ /dev/null @@ -1,255 +0,0 @@ -package schedulers - -import ( - "fmt" - "log" - "time" - - "bitbucket.org/sunybingcloud/elektron/def" - "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" -) - -// Decides if to take an offer or not. 
-func (s *MaxGreedyMins) takeOffer(offer *mesos.Offer, task def.Task, - totalCPU, totalRAM, totalWatts float64) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) - } - if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { - return true - } - return false -} - -type MaxGreedyMins struct { - base //Type embedding to inherit common functions. -} - -// Initialization. -func (s *MaxGreedyMins) init(opts ...schedPolicyOption) { - s.base.init(opts...) - // Sorting the tasks based on watts. - def.SortTasks(s.tasks, def.SortByWatts) -} - -func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - // Start recording only when we're creating the first task. - if !*s.RecordPCP { - // Turn on logging - *s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts. - } - - // If this is our first time running into this Agent. - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration. - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("elektron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated. - }, - }, - } -} - -// Determine if the remaining space inside of the offer is enough for this -// the task we need to create. If it is, create a TaskInfo and return it. -func (s *MaxGreedyMins) CheckFit( - i int, - task def.Task, - wattsConsideration float64, - offer *mesos.Offer, - totalCPU *float64, - totalRAM *float64, - totalWatts *float64) (bool, *mesos.TaskInfo) { - - // Does the task fit. - if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) { - - *totalWatts += wattsConsideration - *totalCPU += task.CPU - *totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it. 
- s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - - return true, taskToSchedule - } - - return false, nil -} - -func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - - // Assumes s.tasks is ordered in non-decreasing median max peak order. - - // Attempt to schedule a single instance of the heaviest workload available first. - // Start from the back until one fits. - for i := len(s.tasks) - 1; i >= 0; i-- { - - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement. - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - break - } - } - - // Pack the rest of the offer with the smallest tasks. - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement. - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - for *task.Instances > 0 { - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - } else { - break // Continue on to next task. - } - } - } - - if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - - // If there was no match for the task. 
- fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *MaxGreedyMins) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/max-min.go b/schedulers/max-min.go deleted file mode 100644 index dd77607..0000000 --- a/schedulers/max-min.go +++ /dev/null @@ -1,251 +0,0 @@ -package schedulers - -import ( - "fmt" - "log" - "time" - - "bitbucket.org/sunybingcloud/elektron/def" - "bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/elektron/utilities/offerUtils" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" -) - -// Decides if to take an offer or not. -func (s *MaxMin) takeOffer(offer *mesos.Offer, task def.Task, - totalCPU, totalRAM, totalWatts float64) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) - } - if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { - return true - } - return false -} - -type MaxMin struct { - base //Type embedding to inherit common functions. -} - -// Initialization. -func (s *MaxMin) init(opts ...schedPolicyOption) { - s.base.init(opts...) - // Sorting the tasks based on Watts. - def.SortTasks(s.tasks, def.SortByWatts) -} - -func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - // Start recording only when we're creating the first task. - if !*s.RecordPCP { - // Turn on logging. - *s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts. - } - - // If this is our first time running into this Agent. - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration. 
- log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("elektron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated. - }, - }, - } -} - -// Determine if the remaining space inside of the offer is enough for this -// the task we need to create. If it is, create a TaskInfo and return it. -func (s *MaxMin) CheckFit( - i int, - task def.Task, - wattsConsideration float64, - offer *mesos.Offer, - totalCPU *float64, - totalRAM *float64, - totalWatts *float64) (bool, *mesos.TaskInfo) { - - // Does the task fit. - if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) { - - *totalWatts += wattsConsideration - *totalCPU += task.CPU - *totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it. - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - - return true, taskToSchedule - } - - return false, nil -} - -func (s *MaxMin) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - - // Assumes s.tasks is ordered in non-decreasing median max peak order. - - // Attempt to schedule a single instance of the heaviest workload available first. - // Start from the back until one fits. - - direction := false // True = Min Max, False = Max Min. - var index int - start := true // If false then index has changed and need to keep it that way. - for i := 0; i < len(s.tasks); i++ { - // We need to pick a min task or a max task - // depending on the value of direction. - if direction && start { - index = 0 - } else if start { - index = len(s.tasks) - i - 1 - } - task := s.tasks[index] - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration. - log.Fatal(err) - } - - // Don't take offer it is doesn't match our task's host requirement. - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(index, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - // Need to change direction and set start to true. - // Setting start to true would ensure that index be set accurately again. 
- direction = !direction - start = true - i-- - } else { - // Need to move index depending on the value of direction. - if direction { - index++ - start = false - } else { - index-- - start = false - } - } - } - - if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *MaxMin) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go new file mode 100644 index 0000000..38a688c --- /dev/null +++ b/schedulers/schedPolicy.go @@ -0,0 +1,16 @@ +package schedulers + +import ( + mesos "github.com/mesos/mesos-go/mesosproto" + sched "github.com/mesos/mesos-go/scheduler" +) + +type SchedPolicyContext interface { + // Change the state of scheduling. + SwitchSchedPol(s SchedPolicyState) +} + +type SchedPolicyState interface { + // Define the particular scheduling policy's methodology of resource offer consumption. + ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer) +} diff --git a/schedulers/store.go b/schedulers/store.go index e594ca5..e6814c3 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -1,30 +1,32 @@ package schedulers -import "github.com/mesos/mesos-go/scheduler" +import ( + sched "github.com/mesos/mesos-go/scheduler" +) // Names of different scheduling policies. const ( - ff = "first-fit" - bp = "bin-packing" - mgm = "max-greedymins" - mm = "max-min" + ff = "first-fit" + bp = "bin-packing" + mgm = "max-greedymins" + mm = "max-min" ) -// Scheduler class factory. -var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{ - ff: &FirstFit{base: base{}}, - bp: &BinPacking{base: base{}}, - mgm: &MaxGreedyMins{base: base{}}, - mm: &MaxMin{base: base{}}, +// Scheduling policy factory +var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{ + ff: &FirstFit{}, + bp: &BinPackSortedWatts{}, + mgm: &MaxGreedyMins{}, + mm: &MaxMin{}, } -// Build the scheduling policy with the options being applied. -func BuildSchedPolicy(s scheduler.Scheduler, opts ...schedPolicyOption) { +// build the scheduling policy with the options being applied +func buildScheduler(s sched.Scheduler, opts ...schedPolicyOption) { s.(ElectronScheduler).init(opts...) } -func SchedFactory(schedPolicyName string, opts ...schedPolicyOption) scheduler.Scheduler { - s := Schedulers[schedPolicyName] - BuildSchedPolicy(s, opts...) +func SchedFactory(opts ...schedPolicyOption) sched.Scheduler { + s := &baseScheduler{} + buildScheduler(s, opts...) return s }
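Note on the design introduced above: the per-policy Scheduler types (FirstFit, BinPacking, MaxGreedyMins, MaxMin) are collapsed into a single baseScheduler that owns the shared state and delegates each round of offers to a pluggable SchedPolicyState; when schedPolSwitchEnabled is set, a policy ends ConsumeOffers by handing control to a randomly chosen policy via SwitchSchedPol. The sketch below is illustrative only and is not part of the patch: it reproduces that state pattern with stand-in types (Offer, toyScheduler, firstFit, binPack are invented here) so it compiles and runs without mesos-go or the Elektron packages.

package main

import (
	"fmt"
	"math/rand"
)

// Offer is a stand-in for *mesos.Offer.
type Offer struct{ Hostname string }

// SchedPolicyContext mirrors schedulers/schedPolicy.go: the scheduler exposes
// only the ability to switch its current policy.
type SchedPolicyContext interface {
	SwitchSchedPol(s SchedPolicyState)
}

// SchedPolicyState mirrors the patch: a policy only defines how one round of
// resource offers is consumed.
type SchedPolicyState interface {
	ConsumeOffers(spc SchedPolicyContext, offers []Offer)
}

// toyScheduler plays the role of baseScheduler: it owns the current policy
// and delegates ResourceOffers to it.
type toyScheduler struct {
	curSchedPolicy        SchedPolicyState
	schedPolSwitchEnabled bool
}

func (s *toyScheduler) SwitchSchedPol(newPol SchedPolicyState) {
	s.curSchedPolicy = newPol
}

func (s *toyScheduler) ResourceOffers(offers []Offer) {
	s.curSchedPolicy.ConsumeOffers(s, offers)
}

// switchAtRandom reproduces the switching block at the end of ConsumeOffers in
// bin-packing.go and first-fit.go: draw a random index into the policy map and
// walk the map until the index reaches zero.
func switchAtRandom(spc SchedPolicyContext, policies map[string]SchedPolicyState) {
	index := rand.Intn(len(policies))
	for _, v := range policies {
		if index == 0 {
			spc.SwitchSchedPol(v)
			break
		}
		index--
	}
}

// firstFit and binPack are toy stand-ins for FirstFit and BinPackSortedWatts;
// they only report which policy handled the round.
type firstFit struct{ policies map[string]SchedPolicyState }

func (p *firstFit) ConsumeOffers(spc SchedPolicyContext, offers []Offer) {
	base := spc.(*toyScheduler) // same downcast pattern as spc.(*baseScheduler)
	fmt.Println("FirstFit scheduling", len(offers), "offers...")
	if base.schedPolSwitchEnabled {
		switchAtRandom(spc, p.policies)
	}
}

type binPack struct{ policies map[string]SchedPolicyState }

func (p *binPack) ConsumeOffers(spc SchedPolicyContext, offers []Offer) {
	base := spc.(*toyScheduler)
	fmt.Println("BPSW scheduling", len(offers), "offers...")
	if base.schedPolSwitchEnabled {
		switchAtRandom(spc, p.policies)
	}
}

func main() {
	// Policy "store", analogous to SchedPolicies in store.go.
	policies := map[string]SchedPolicyState{}
	policies["first-fit"] = &firstFit{policies: policies}
	policies["bin-packing"] = &binPack{policies: policies}

	s := &toyScheduler{curSchedPolicy: policies["first-fit"], schedPolSwitchEnabled: true}
	offers := []Offer{{Hostname: "agent-1"}, {Hostname: "agent-2"}}

	// Each round may be handled by a different, randomly chosen policy.
	for round := 0; round < 4; round++ {
		s.ResourceOffers(offers)
	}
}

The spc.(*toyScheduler) assertion mirrors the spc.(*baseScheduler) assertions in bin-packing.go and first-fit.go: policies stay stateless and reach shared scheduler state only through the SchedPolicyContext they are handed.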