Merged in experimentation/schedPolicySwitcher (pull request #1)

Experimentation/schedPolicySwitcher
1. Initial commit for consolidated loggers using the observer pattern.
2. Class factory for schedulers.
3. Using the scheduling policy class factory in schedulers/store.go and the scheduler builder helpers in schedulers/helpers.go, added a feature to plug in a scheduling policy of your choice from the command line (right now only first-fit and bin-packing are possible; the class factory will be updated to include the other scheduling policies as well). A usage sketch follows this list.
4. Removed TODO for using generic task sorters. Modified TODO for a config file input to run Electron.
5. Added the other schedulers to the factory.
6. Partially retrofitted the other scheduling policies to use the logging library.
7. Retrofitted extrema and progressive to use the consolidated logging library. Fixed a parameter issue with s.base.Disconnected(). Formatted the project.
8. Moved statusUpdate(...) into base.go to remove redundant code.
9. Converted the baseScheduler into a state machine where the state is a scheduling policy that defines an approach to consuming resource offers.
10. Added another command line argument to enable switching of scheduling policies at runtime. Retrofitted scheduling policies to switch only if this feature has been enabled.
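
A minimal usage sketch for item 3, mirroring the SchedFactory invocation in scheduler.go further below (the option values shown are illustrative):

	scheduler := schedulers.SchedFactory(
		schedulers.WithSchedPolicy(*schedPolicyName), // e.g. "first-fit" or "bin-packing"
		schedulers.WithTasks(tasks),
		schedulers.WithShutdown(shutdown),
		schedulers.WithDone(done))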

Changed the argument to coLocated(...) to take the base type rather than the ElectronScheduler type. Also, prepended the prefix to the log directory name so that it is easier to determine what the files in a directory correspond to without viewing their contents.
Defined methods in ElectronScheduler. Each of these methods corresponds to a type of log that an ElectronScheduler would emit, and each needs to be implemented by the scheduling policy.

Electron has only one scheduler that implements the Mesos scheduler interface. All the scheduling policies are just different implementations of ways to consume Mesos resource offers. Retrofitted scheduling policies to now embed SchedPolicyState instead of baseScheduler.
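
For reference, a minimal sketch of the state-machine shape; the exact interface definitions live elsewhere in the schedulers package and are inferred here from their usage in this diff:

	// The state: a scheduling policy that knows how to consume resource offers.
	type SchedPolicyState interface {
		ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer)
	}

	// The context: the scheduler holding the current state; policies ask it to switch.
	type SchedPolicyContext interface {
		SwitchSchedPol(newSchedPol SchedPolicyState)
	}

	// baseScheduler delegates every round of offers to the current state.
	func (s *baseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
		s.curSchedPolicy.ConsumeOffers(s, driver, offers)
	}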

Approved-by: Pradyumna Kaushik <pkaushi1@binghamton.edu>
Pradyumna Kaushik 2018-01-19 21:20:43 +00:00
parent cb71153362
commit 065705d480
24 changed files with 1392 additions and 917 deletions

@@ -0,0 +1,18 @@
package logging
import (
"log"
)
type ConsoleLogger struct {
loggerObserverImpl
}
func (cl ConsoleLogger) Log(message string) {
// We need to log to console only if the message is not empty
if message != "" {
log.Println(message)
// Also logging the message to the console log file
cl.logObserverSpecifics[conLogger].logFile.Println(message)
}
}

logging/def/logType.go (new file)

@@ -0,0 +1,41 @@
package logging
import "github.com/fatih/color"
// Defining enums of log message types
var logMessageNames []string
// Possible log message types
var (
ERROR = messageNametoMessageType("ERROR")
WARNING = messageNametoMessageType("WARNING")
GENERAL = messageNametoMessageType("GENERAL")
SUCCESS = messageNametoMessageType("SUCCESS")
SCHED_TRACE = messageNametoMessageType("SCHED_TRACE")
PCP = messageNametoMessageType("PCP")
)
// Text colors for the different types of log messages.
var LogMessageColors map[LogMessageType]*color.Color = map[LogMessageType]*color.Color{
ERROR: color.New(color.FgRed, color.Bold),
WARNING: color.New(color.FgYellow, color.Bold),
GENERAL: color.New(color.FgWhite, color.Bold),
SUCCESS: color.New(color.FgGreen, color.Bold),
}
type LogMessageType int
func (lmt LogMessageType) String() string {
return logMessageNames[lmt]
}
func GetLogMessageTypes() []string {
return logMessageNames
}
func messageNametoMessageType(messageName string) LogMessageType {
// Appending messageName to logMessageNames
logMessageNames = append(logMessageNames, messageName)
// Mapping messageName to int
return LogMessageType(len(logMessageNames) - 1)
}
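
For illustration, message types are plain indices into logMessageNames, so within package logging (assuming "fmt" is imported):

	fmt.Println(ERROR)                // prints "ERROR" via LogMessageType.String()
	fmt.Println(GetLogMessageTypes()) // prints [ERROR WARNING GENERAL SUCCESS SCHED_TRACE PCP]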

logging/def/logger.go (new file)

@@ -0,0 +1,56 @@
package logging
import (
"time"
)
type LoggerDriver struct {
loggerSubject
allowedMessageTypes map[LogMessageType]bool
}
func newLogger() *LoggerDriver {
logger := &LoggerDriver{
allowedMessageTypes: map[LogMessageType]bool{
ERROR: true,
GENERAL: true,
WARNING: true,
SCHED_TRACE: true,
SUCCESS: true,
PCP: true,
},
}
return logger
}
func BuildLogger(startTime time.Time, prefix string) *LoggerDriver {
// building logger
l := newLogger()
attachAllLoggers(l, startTime, prefix)
return l
}
func (log *LoggerDriver) EnabledLogging(messageType LogMessageType) {
log.allowedMessageTypes[messageType] = true
}
func (log *LoggerDriver) DisableLogging(messageType LogMessageType) {
log.allowedMessageTypes[messageType] = false
}
func (log *LoggerDriver) WriteLog(messageType LogMessageType, message string) {
// Proceed only if logging for the given messageType is enabled
if log.allowedMessageTypes[messageType] {
log.setMessage(message)
// notify registered loggers to log
log.notify(messageType)
}
}
func (log *LoggerDriver) Listen(logMType <-chan LogMessageType, logMsg <-chan string) {
for {
mType := <-logMType
msg := <-logMsg
log.WriteLog(mType, msg)
}
}
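
A usage sketch of the channel-driven API, mirroring how scheduler.go (further below) wires it up; the prefix here is illustrative:

	logger := elecLogDef.BuildLogger(time.Now(), "elektron_test")
	logMType := make(chan elecLogDef.LogMessageType)
	logMsg := make(chan string)
	go logger.Listen(logMType, logMsg)

	// Senders write the message type first and the message second,
	// matching the receive order inside Listen.
	logMType <- elecLogDef.GENERAL
	logMsg <- "PCP logging started"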

@@ -0,0 +1,78 @@
package logging
import (
logUtils "bitbucket.org/sunybingcloud/elektron/logging/utils"
"strings"
"time"
)
// Names of different loggers
const (
conLogger = "console-logger"
schedTraceLogger = "schedTrace-logger"
pcpLogger = "pcp-logger"
)
// Logger class factory
var Loggers map[string]loggerObserver = map[string]loggerObserver{
conLogger: nil,
schedTraceLogger: nil,
pcpLogger: nil,
}
// Logger options to help initialize loggers
type loggerOption func(l loggerObserver) error
func withLogDirectory(startTime time.Time, prefix string) loggerOption {
return func(l loggerObserver) error {
l.(*loggerObserverImpl).setLogDirectory(logUtils.GetLogDir(startTime, prefix))
return nil
}
}
// This loggerOption initializes the specifics for each loggerObserver
func withLoggerSpecifics(prefix string) loggerOption {
return func(l loggerObserver) error {
l.(*loggerObserverImpl).logObserverSpecifics = map[string]*specifics{
conLogger: &specifics{},
schedTraceLogger: &specifics{},
pcpLogger: &specifics{},
}
l.(*loggerObserverImpl).setLogFilePrefix(prefix)
l.(*loggerObserverImpl).setLogFile()
return nil
}
}
// Build and assign all loggers
func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) {
loi := &loggerObserverImpl{}
loi.init(withLogDirectory(startTime, strings.Split(prefix, startTime.Format("20060102150405"))[0]),
withLoggerSpecifics(prefix))
Loggers[conLogger] = &ConsoleLogger{
loggerObserverImpl: *loi,
}
Loggers[schedTraceLogger] = &SchedTraceLogger{
loggerObserverImpl: *loi,
}
Loggers[pcpLogger] = &PCPLogger{
loggerObserverImpl: *loi,
}
for _, lmt := range GetLogMessageTypes() {
switch lmt {
case SCHED_TRACE.String():
lg.attach(SCHED_TRACE, Loggers[schedTraceLogger])
case GENERAL.String():
lg.attach(GENERAL, Loggers[conLogger])
case WARNING.String():
lg.attach(WARNING, Loggers[conLogger])
case ERROR.String():
lg.attach(ERROR, Loggers[conLogger])
case SUCCESS.String():
lg.attach(SUCCESS, Loggers[conLogger])
case PCP.String():
lg.attach(PCP, Loggers[pcpLogger])
}
}
}

@@ -0,0 +1,77 @@
package logging
import (
"fmt"
"log"
"os"
)
// Logging platform
type loggerObserver interface {
Log(message string)
setLogFile()
setLogFilePrefix(prefix string)
setLogDirectory(dirName string)
init(opts ...loggerOption)
}
type specifics struct {
logFilePrefix string
logFile *log.Logger
}
type loggerObserverImpl struct {
logFile *log.Logger
logObserverSpecifics map[string]*specifics
logDirectory string
}
func (loi *loggerObserverImpl) init(opts ...loggerOption) {
for _, opt := range opts {
// applying logger options
if err := opt(loi); err != nil {
log.Fatal(err)
}
}
}
func (loi loggerObserverImpl) Log(message string) {}
// Requires logFilePrefix to have already been set
func (loi *loggerObserverImpl) setLogFile() {
for loggerName, ls := range loi.logObserverSpecifics {
if logFile, err := os.Create(ls.logFilePrefix); err != nil {
log.Fatal("Unable to create logFile: ", err)
} else {
fmt.Printf("Creating logFile with pathname: %s, for logger: %s\n", ls.logFilePrefix, loggerName)
ls.logFile = log.New(logFile, "", log.LstdFlags)
}
}
}
func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) {
// Setting logFilePrefix for pcp logger
pcpLogFilePrefix := prefix + ".pcplog"
if loi.logDirectory != "" {
pcpLogFilePrefix = loi.logDirectory + "/" + pcpLogFilePrefix
}
loi.logObserverSpecifics[pcpLogger].logFilePrefix = pcpLogFilePrefix
// Setting logFilePrefix for console logger
consoleLogFilePrefix := prefix + "_console.log"
if loi.logDirectory != "" {
consoleLogFilePrefix = loi.logDirectory + "/" + consoleLogFilePrefix
}
loi.logObserverSpecifics[conLogger].logFilePrefix = consoleLogFilePrefix
// Setting logFilePrefix for schedTrace logger
schedTraceLogFilePrefix := prefix + "_schedTrace.log"
if loi.logDirectory != "" {
schedTraceLogFilePrefix = loi.logDirectory + "/" + schedTraceLogFilePrefix
}
loi.logObserverSpecifics[schedTraceLogger].logFilePrefix = schedTraceLogFilePrefix
}
func (loi *loggerObserverImpl) setLogDirectory(dirName string) {
loi.logDirectory = dirName
}
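
For illustration: with prefix "elektron_20180119212043" and log directory "./elektron2018-January-19_21-20-43" (timestamps illustrative), the three observers would write elektron_20180119212043.pcplog, elektron_20180119212043_console.log, and elektron_20180119212043_schedTrace.log inside that directory.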

@@ -0,0 +1,23 @@
package logging
type loggerSubject struct {
Registry map[LogMessageType][]loggerObserver
message string
}
func (ls *loggerSubject) setMessage(message string) {
ls.message = message
}
func (ls *loggerSubject) attach(messageType LogMessageType, lo loggerObserver) {
if ls.Registry == nil {
ls.Registry = make(map[LogMessageType][]loggerObserver)
}
ls.Registry[messageType] = append(ls.Registry[messageType], lo)
}
func (ls *loggerSubject) notify(messageType LogMessageType) {
for _, logObserver := range ls.Registry[messageType] {
logObserver.Log(ls.message)
}
}
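
To illustrate the observer side, a hypothetical extra observer (stderrLogger is not part of this change; assumes "fmt" and "os" are imported) could embed loggerObserverImpl, override Log, and be attached for the message types it cares about:

	// A hypothetical observer that mirrors messages to stderr.
	type stderrLogger struct {
		loggerObserverImpl
	}

	func (sl stderrLogger) Log(message string) {
		fmt.Fprintln(os.Stderr, message)
	}

	// Wiring, as attachAllLoggers does for the built-in observers:
	// lg.attach(ERROR, &stderrLogger{})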

logging/def/pcpLogger.go (new file)

@@ -0,0 +1,9 @@
package logging
type PCPLogger struct {
loggerObserverImpl
}
func (pl *PCPLogger) Log(message string) {
pl.logObserverSpecifics[pcpLogger].logFile.Println(message)
}

@@ -0,0 +1,10 @@
package logging
type SchedTraceLogger struct {
loggerObserverImpl
}
func (stl SchedTraceLogger) Log(message string) {
// Logging schedule trace to mentioned file
stl.logObserverSpecifics[schedTraceLogger].logFile.Println(message)
}

@@ -0,0 +1,39 @@
package logging
import (
"log"
"os"
"strconv"
"time"
)
var LogDir string
func GetLogDir(startTime time.Time, prefix string) string {
if LogDir == "" {
LogDir = createLogDir(prefix, startTime)
}
return LogDir
}
func createLogDir(prefix string, startTime time.Time) string {
// Creating directory to store all logs for this run
logDirName := "./" + prefix + strconv.Itoa(startTime.Year())
logDirName += "-"
logDirName += startTime.Month().String()
logDirName += "-"
logDirName += strconv.Itoa(startTime.Day())
logDirName += "_"
logDirName += strconv.Itoa(startTime.Hour())
logDirName += "-"
logDirName += strconv.Itoa(startTime.Minute())
logDirName += "-"
logDirName += strconv.Itoa(startTime.Second())
if _, err := os.Stat(logDirName); os.IsNotExist(err) {
if err := os.Mkdir(logDirName, 0700); err != nil {
log.Println("Unable to create log directory: ", err)
logDirName = ""
}
}
return logDirName
}
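
As a design note, the same directory name could be built with a single reference-time layout. This is a sketch rather than a drop-in replacement: the "15" hour token zero-pads, whereas strconv.Itoa above does not, so the names can differ before 10:00:

	logDirName := "./" + prefix + startTime.Format("2006-January-2_15-4-5")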

@@ -1,27 +1,19 @@
package pcp
import (
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"bufio"
"log"
"os"
"os/exec"
"syscall"
"time"
)
func Start(quit chan struct{}, logging *bool, prefix string) {
func Start(quit chan struct{}, logging *bool, logMType chan elecLogDef.LogMessageType, logMsg chan string) {
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
cmd := exec.Command("sh", "-c", pcpCommand)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
logFile, err := os.Create("./" + prefix + ".pcplog")
if err != nil {
log.Fatal(err)
}
log.Println("Writing pcp logs to file: " + logFile.Name())
defer logFile.Close()
pipe, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
@@ -34,8 +26,9 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
// Get names of the columns.
scanner.Scan()
// Write to logfile.
logFile.WriteString(scanner.Text() + "\n")
// Write to logfile
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
// Throw away first set of results.
scanner.Scan()
@@ -44,15 +37,16 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
for scanner.Scan() {
if *logging {
log.Println("Logging PCP...")
logFile.WriteString(scanner.Text() + "\n")
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
}
seconds++
}
}(logging)
log.Println("PCP logging started")
logMType <- elecLogDef.GENERAL
logMsg <- "PCP logging started"
if err := cmd.Start(); err != nil {
log.Fatal(err)
@@ -62,7 +56,8 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
select {
case <-quit:
log.Println("Stopping PCP logging in 5 seconds")
logMType <- elecLogDef.GENERAL
logMsg <- "Stopping PCP logging in 5 seconds"
time.Sleep(5 * time.Second)
// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly

@@ -3,10 +3,11 @@ package pcp
import (
"bitbucket.org/sunybingcloud/elektron/pcp"
"bitbucket.org/sunybingcloud/elektron/rapl"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"bufio"
"container/ring"
"fmt"
"log"
"os"
"os/exec"
"sort"
"strconv"
@@ -15,7 +16,9 @@ import (
"time"
)
func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64,
logMType chan elecLogDef.LogMessageType, logMsg chan string) {
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
cmd := exec.Command("sh", "-c", pcpCommand)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
@@ -24,13 +27,6 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
log.Println("High threshold is lower than low threshold!")
}
logFile, err := os.Create("./" + prefix + ".pcplog")
if err != nil {
log.Fatal(err)
}
defer logFile.Close()
pipe, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
@@ -43,8 +39,9 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
// Get names of the columns.
scanner.Scan()
// Write to logfile.
logFile.WriteString(scanner.Text() + "\n")
// Write to logfile
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
headers := strings.Split(scanner.Text(), ",")
@@ -79,9 +76,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
for scanner.Scan() {
if *logging {
log.Println("Logging PCP...")
logMType <- elecLogDef.GENERAL
logMsg <- "Logging PCP..."
split := strings.Split(scanner.Text(), ",")
logFile.WriteString(scanner.Text() + "\n")
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
totalPower := 0.0
for _, powerIndex := range powerIndexes {
@@ -92,7 +91,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
powerHistories[host].Value = power
powerHistories[host] = powerHistories[host].Next()
log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits))
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits))
totalPower += power
}
@@ -103,11 +103,13 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
clusterMean := pcp.AverageClusterPowerHistory(clusterPowerHist)
log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
if clusterMean > hiThreshold {
log.Printf("Need to cap a node")
// Create statistics for all victims and choose one to cap.
logMType <- elecLogDef.GENERAL
logMsg <- "Need to cap a node"
// Create statistics for all victims and choose one to cap
victims := make([]pcp.Victim, 0, 8)
// TODO: Just keep track of the largest to reduce from nlogn to n
@@ -127,9 +129,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
if !cappedHosts[victim.Host] {
cappedHosts[victim.Host] = true
orderCapped = append(orderCapped, victim.Host)
log.Printf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts*pcp.RAPLUnits)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Capping Victim %s Avg. Wattage: %f", victim.Host, victim.Watts * pcp.RAPLUnits)
if err := rapl.Cap(victim.Host, "rapl", 50); err != nil {
log.Print("Error capping host")
logMType <- elecLogDef.ERROR
logMsg <- "Error capping host"
}
break // Only cap one machine at at time.
}
@@ -143,8 +147,11 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
cappedHosts[host] = false
// Use RAPL package to send uncap.
log.Printf("Uncapping host %s", host)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Uncapping host %s", host)
if err := rapl.Cap(host, "rapl", 100); err != nil {
log.Print("Error uncapping host")
logMType <- elecLogDef.ERROR
logMsg <- "Error uncapping host"
}
}
}
@@ -154,7 +161,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
}
}(logging, hiThreshold, loThreshold)
log.Println("PCP logging started")
logMType <- elecLogDef.GENERAL
logMsg <- "PCP logging started"
if err := cmd.Start(); err != nil {
log.Fatal(err)
@@ -164,7 +172,8 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix s
select {
case <-quit:
log.Println("Stopping PCP logging in 5 seconds")
logMType <- elecLogDef.GENERAL
logMsg <- "Stopping PCP logging in 5 seconds"
time.Sleep(5 * time.Second)
// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly

@@ -5,11 +5,12 @@ import (
"bitbucket.org/sunybingcloud/elektron/pcp"
"bitbucket.org/sunybingcloud/elektron/rapl"
"bitbucket.org/sunybingcloud/elektron/utilities"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"bufio"
"container/ring"
"fmt"
"log"
"math"
"os"
"os/exec"
"sort"
"strconv"
@@ -28,23 +29,18 @@ func getNextCapValue(curCapValue float64, precision int) float64 {
return float64(round(curCapValue*output)) / output
}
func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) {
log.Println("Inside Log and Progressive Extrema")
func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64,
logMType chan elecLogDef.LogMessageType, logMsg chan string) {
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
cmd := exec.Command("sh", "-c", pcpCommand)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if hiThreshold < loThreshold {
log.Println("High threshold is lower than low threshold!")
logMType <- elecLogDef.GENERAL
logMsg <- "High threshold is lower than low threshold!"
}
logFile, err := os.Create("./" + prefix + ".pcplog")
if err != nil {
log.Fatal(err)
}
defer logFile.Close()
pipe, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
@@ -57,8 +53,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
// Get names of the columns.
scanner.Scan()
// Write to logfile.
logFile.WriteString(scanner.Text() + "\n")
// Write to logfile
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
headers := strings.Split(scanner.Text(), ",")
@@ -97,9 +94,11 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
for scanner.Scan() {
if *logging {
log.Println("Logging PCP...")
logMType <- elecLogDef.GENERAL
logMsg <- "Logging PCP..."
split := strings.Split(scanner.Text(), ",")
logFile.WriteString(scanner.Text() + "\n")
logMType <- elecLogDef.PCP
logMsg <- scanner.Text()
totalPower := 0.0
for _, powerIndex := range powerIndexes {
@@ -110,7 +109,9 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
powerHistories[host].Value = power
powerHistories[host] = powerHistories[host].Next()
log.Printf("Host: %s, Power: %f", indexToHost[powerIndex], (power * pcp.RAPLUnits))
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Host: %s, Power: %f",
indexToHost[powerIndex], (power * pcp.RAPLUnits))
totalPower += power
}
@@ -121,13 +122,17 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
clusterMean := pcp.AverageClusterPowerHistory(clusterPowerHist)
log.Printf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Total power: %f, %d Sec Avg: %f", clusterPower, clusterPowerHist.Len(), clusterMean)
if clusterMean >= hiThreshold {
log.Println("Need to cap a node")
log.Printf("Cap values of capped victims: %v", cappedVictims)
log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
// Create statistics for all victims and choose one to cap.
logMType <- elecLogDef.GENERAL
logMsg <- "Need to cap a node"
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Cap values of capped victims: %v", cappedVictims)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Cap values of victims to uncap: %v", orderCappedVictims)
// Create statistics for all victims and choose one to cap
victims := make([]pcp.Victim, 0, 8)
// TODO: Just keep track of the largest to reduce from nlogn to n
@@ -153,10 +158,12 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
}
// Need to cap this victim.
if err := rapl.Cap(victims[i].Host, "rapl", 50.0); err != nil {
log.Printf("Error capping host %s", victims[i].Host)
logMType <- elecLogDef.ERROR
logMsg <- fmt.Sprintf("Error capping host %s", victims[i].Host)
} else {
log.Printf("Capped host[%s] at %f", victims[i].Host, 50.0)
// Keeping track of this victim and its cap value.
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Capped host[%s] at %f", victims[i].Host, 50.0)
// Keeping track of this victim and its cap value
cappedVictims[victims[i].Host] = 50.0
newVictimFound = true
// This node can be uncapped and hence adding to orderCapped.
@@ -178,11 +185,13 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
if capValue > constants.LowerCapLimit {
newCapValue := getNextCapValue(capValue, 2)
if err := rapl.Cap(alreadyCappedHosts[i], "rapl", newCapValue); err != nil {
log.Printf("Error capping host[%s]", alreadyCappedHosts[i])
logMType <- elecLogDef.ERROR
logMsg <- fmt.Sprintf("Error capping host[%s]", alreadyCappedHosts[i])
} else {
// Successful cap
log.Printf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
// Checking whether this victim can be capped further.
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Capped host[%s] at %f", alreadyCappedHosts[i], newCapValue)
// Checking whether this victim can be capped further
if newCapValue <= constants.LowerCapLimit {
// Deleting victim from cappedVictims.
delete(cappedVictims, alreadyCappedHosts[i])
@@ -204,14 +213,18 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
}
}
if !canCapAlreadyCappedVictim {
log.Println("No Victim left to cap.")
logMType <- elecLogDef.GENERAL
logMsg <- "No Victim left to cap."
}
}
} else if clusterMean < loThreshold {
log.Println("Need to uncap a node")
log.Printf("Cap values of capped victims: %v", cappedVictims)
log.Printf("Cap values of victims to uncap: %v", orderCappedVictims)
logMType <- elecLogDef.GENERAL
logMsg <- "Need to uncap a node"
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Cap values of capped victims: %v", cappedVictims)
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Cap values of victims to uncap: %v", orderCappedVictims)
if len(orderCapped) > 0 {
// We pick the host that is capped the most to uncap.
orderCappedToSort := utilities.GetPairList(orderCappedVictims)
@@ -221,13 +234,15 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
// This is a floating point operation and might suffer from precision loss.
newUncapValue := orderCappedVictims[hostToUncap] * 2.0
if err := rapl.Cap(hostToUncap, "rapl", newUncapValue); err != nil {
log.Printf("Error uncapping host[%s]", hostToUncap)
logMType <- elecLogDef.ERROR
logMsg <- fmt.Sprintf("Error uncapping host[%s]", hostToUncap)
} else {
// Successful uncap.
log.Printf("Uncapped host[%s] to %f", hostToUncap, newUncapValue)
// Can we uncap this host further? If not, then we remove its entry from orderCapped.
if newUncapValue >= 100.0 { // Can compare using ==
// Deleting entry from orderCapped.
// Successful uncap
logMType <- elecLogDef.GENERAL
logMsg <- fmt.Sprintf("Uncapped host[%s] to %f", hostToUncap, newUncapValue)
// Can we uncap this host further? If not, then we remove its entry from orderCapped
if newUncapValue >= 100.0 { // can compare using ==
// Deleting entry from orderCapped
for i, victimHost := range orderCapped {
if victimHost == hostToUncap {
orderCapped = append(orderCapped[:i], orderCapped[i+1:]...)
@@ -245,7 +260,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
}
}
} else {
log.Println("No host staged for Uncapping")
logMType <- elecLogDef.GENERAL
logMsg <- "No host staged for Uncapping"
}
}
}
@@ -254,7 +270,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
}(logging, hiThreshold, loThreshold)
log.Println("PCP logging started")
logMType <- elecLogDef.GENERAL
logMsg <- "PCP logging started"
if err := cmd.Start(); err != nil {
log.Fatal(err)
@@ -264,7 +281,8 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
select {
case <-quit:
log.Println("Stopping PCP logging in 5 seconds")
logMType <- elecLogDef.GENERAL
logMsg <- "Stopping PCP logging in 5 seconds"
time.Sleep(5 * time.Second)
// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly

@@ -4,6 +4,7 @@ import (
"bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/pcp"
"bitbucket.org/sunybingcloud/elektron/schedulers"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"flag"
"fmt"
"github.com/golang/protobuf/proto"
@@ -12,44 +13,39 @@ import (
"log"
"os"
"os/signal"
"strings"
"time"
)
var master = flag.String("master", "", "Location of leading Mesos master -- <mesos-master>:<port>")
var tasksFile = flag.String("workload", "", "JSON file containing task definitions")
var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. "+
"This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.")
var pcplogPrefix = flag.String("logPrefix", "", "Prefix for PCP log file")
var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, "+
"beyond which extrema/progressive-extrema would start power-capping")
var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, "+
"below which extrema/progressive-extrema would stop power-capping")
var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to powerClass of node")
var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+
"Use option -listSchedPolicies to get the names of available scheduling policies")
var listSchedPolicies = flag.Bool("listSchedPolicies", false, "Names of the pluggable scheduling policies.")
var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource")
var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog")
var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for when we should start capping")
var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for when we should start uncapping")
var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to power class of node")
var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used.\n\tUse option -listSchedPolicies to get the names of available scheduling policies")
var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluggable scheduling policies.")
var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.")
// Shorthand args.
// Shorthand args
func init() {
flag.StringVar(master, "m", "", "Location of leading Mesos master (shorthand)")
flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)")
flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource. "+
"This allows the usage of the Watts attribute (if present) in the workload definition during offer matching. (shorthand)")
flag.StringVar(pcplogPrefix, "p", "", "Prefix for PCP log file (shorthand)")
flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for Cluster average historical power consumption, "+
"beyond which extrema/progressive-extrema would start power-capping (shorthand)")
flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, "+
"below which extrema/progressive-extrema would stop power-capping (shorthand)")
flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to powerClass of node (shorthand)")
flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+
"Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)")
flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource (shorthand)")
flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)")
flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)")
flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)")
flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node (shorthand)")
flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used.\n Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)")
flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluggable scheduling policies. (shorthand)")
flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.")
}
func listAllSchedulingPolicies() {
fmt.Println("Scheduling Policies")
fmt.Println("-------------------")
for policyName, _ := range schedulers.Schedulers {
for policyName := range schedulers.SchedPolicies {
fmt.Println(policyName)
}
}
@@ -57,17 +53,32 @@ func listAllSchedulingPolicies() {
func main() {
flag.Parse()
// Checking to see if we need to just list the pluggable scheduling policies.
// checking to see if we need to just list the pluggable scheduling policies
if *listSchedPolicies {
listAllSchedulingPolicies()
os.Exit(1)
}
startTime := time.Now()
formattedStartTime := startTime.Format("20060102150405")
// Checking if prefix contains any special characters
if strings.Contains(*pcplogPrefix, "/") {
log.Fatal("log file prefix should not contain '/'.")
}
logPrefix := *pcplogPrefix + "_" + formattedStartTime
// creating logger and attaching different logging platforms
logger := elecLogDef.BuildLogger(startTime, logPrefix)
// logging channels
logMType := make(chan elecLogDef.LogMessageType)
logMsg := make(chan string)
go logger.Listen(logMType, logMsg)
// If non-default scheduling policy given,
// checking if scheduling policyName exists.
// checking if scheduling policyName exists
if *schedPolicyName != "first-fit" {
if _, ok := schedulers.Schedulers[*schedPolicyName]; !ok {
// Invalid scheduling policy.
if _, ok := schedulers.SchedPolicies[*schedPolicyName]; !ok {
// invalid scheduling policy
log.Println("Invalid scheduling policy given. The possible scheduling policies are:")
listAllSchedulingPolicies()
os.Exit(1)
@@ -75,41 +86,45 @@ func main() {
}
if *tasksFile == "" {
fmt.Println("No file containing tasks specification provided.")
//fmt.Println("No file containing tasks specification provided.")
logger.WriteLog(elecLogDef.ERROR, "No file containing tasks specification provided")
os.Exit(1)
}
if *hiThreshold < *loThreshold {
fmt.Println("High threshold is of a lower value than low threshold.")
//fmt.Println("High threshold is of a lower value than low threshold.")
logger.WriteLog(elecLogDef.ERROR, "High threshold is of a lower value than low threshold")
os.Exit(1)
}
tasks, err := def.TasksFromJSON(*tasksFile)
if err != nil || len(tasks) == 0 {
fmt.Println("Invalid tasks specification file provided")
//fmt.Println("Invalid tasks specification file provided")
logger.WriteLog(elecLogDef.ERROR, "Invalid tasks specification file provided")
os.Exit(1)
}
log.Println("Scheduling the following tasks:")
//log.Println("Scheduling the following tasks:")
logger.WriteLog(elecLogDef.GENERAL, "Scheduling the following tasks:")
for _, task := range tasks {
fmt.Println(task)
}
startTime := time.Now().Format("20060102150405")
logPrefix := *pcplogPrefix + "_" + startTime
shutdown := make(chan struct{})
done := make(chan struct{})
pcpLog := make(chan struct{})
recordPCP := false
scheduler := schedulers.SchedFactory(*schedPolicyName,
scheduler := schedulers.SchedFactory(
schedulers.WithSchedPolicy(*schedPolicyName),
schedulers.WithTasks(tasks),
schedulers.WithWattsAsAResource(*wattsAsAResource),
schedulers.WithClassMapWatts(*classMapWatts),
schedulers.WithSchedTracePrefix(logPrefix),
schedulers.WithRecordPCP(&recordPCP),
schedulers.WithShutdown(shutdown),
schedulers.WithDone(done),
schedulers.WithPCPLog(pcpLog))
schedulers.WithPCPLog(pcpLog),
schedulers.WithLoggingChannels(logMType, logMsg),
schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch))
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{
@@ -123,10 +138,10 @@ func main() {
return
}
go pcp.Start(pcpLog, &recordPCP, logPrefix)
//go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
//go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing.
go pcp.Start(pcpLog, &recordPCP, logMType, logMsg)
//go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg)
//go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, *hiThreshold, *loThreshold, logMType, logMsg)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
// Attempt to handle SIGINT to not leave pmdumptext running.
// Catch interrupt.
@@ -156,6 +171,8 @@ func main() {
case <-done:
close(pcpLog)
time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds
close(logMType)
close(logMsg)
//case <-time.After(shutdownTimeout):
}

schedulers/MaxGreedyMins.go (new file)

@@ -0,0 +1,186 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"fmt"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math/rand"
)
// Decides whether to take an offer or not
func (s *MaxGreedyMins) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
baseSchedRef := spc.(*baseScheduler)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type MaxGreedyMins struct {
SchedPolicyState
}
// Determine if the remaining space inside of the offer is enough for
// the task we need to create. If it is, create a TaskInfo and return it.
func (s *MaxGreedyMins) CheckFit(
spc SchedPolicyContext,
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
baseSchedRef := spc.(*baseScheduler)
// Does the task fit
if s.takeOffer(spc, offer, task, *totalCPU, *totalRAM, *totalWatts) {
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString())
taskToSchedule := baseSchedRef.newTask(offer, task)
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...)
if len(baseSchedRef.tasks) <= 0 {
baseSchedRef.LogTerminateScheduler()
close(baseSchedRef.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
fmt.Println("Max-GreedyMins scheduling...")
baseSchedRef := spc.(*baseScheduler)
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
baseSchedRef.LogOffersReceived(offers)
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-baseSchedRef.Shutdown:
baseSchedRef.LogNoPendingTasksDeclineOffers(offer)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
baseSchedRef.LogNumberOfRunningTasks()
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Assumes s.tasks is ordered in non-decreasing median max peak order
// Attempt to schedule a single instance of the heaviest workload available first
// Start from the back until one fits
for i := len(baseSchedRef.tasks) - 1; i >= 0; i-- {
task := baseSchedRef.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
break
}
}
// Pack the rest of the offer with the smallest tasks
for i := 0; i < len(baseSchedRef.tasks); i++ {
task := baseSchedRef.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
} else {
break // Continue on to next task
}
}
}
if offerTaken {
baseSchedRef.LogTaskStarting(nil, offer)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
cpus, mem, watts := offerUtils.OfferAgg(offer)
baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for _, v := range SchedPolicies {
if index == 0 {
spc.SwitchSchedPol(v)
break
}
index--
}
}
}
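
The switching block above picks a uniformly random entry from the SchedPolicies map: rand.Intn chooses a target position and the loop counts down along the map's (unspecified) iteration order until it reaches zero. The same idiom as a standalone sketch, assuming SchedPolicies is a map[string]SchedPolicyState as its usage here suggests:

	// pickRandom returns a uniformly random value from a non-empty map.
	func pickRandom(m map[string]SchedPolicyState) SchedPolicyState {
		index := rand.Intn(len(m)) // panics if the map is empty
		for _, v := range m {
			if index == 0 {
				return v
			}
			index--
		}
		return nil // unreachable for a non-empty map
	}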

schedulers/MaxMin.go (new file)

@@ -0,0 +1,180 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"fmt"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math/rand"
)
// Decides whether to take an offer or not
func (s *MaxMin) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
baseSchedRef := spc.(*baseScheduler)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type MaxMin struct {
SchedPolicyState
}
// Determine if the remaining space inside of the offer is enough for this
// task that we need to create. If it is, create a TaskInfo and return it.
func (s *MaxMin) CheckFit(
spc SchedPolicyContext,
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
baseSchedRef := spc.(*baseScheduler)
// Does the task fit.
if s.takeOffer(spc, offer, task, *totalCPU, *totalRAM, *totalWatts) {
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString())
taskToSchedule := baseSchedRef.newTask(offer, task)
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it.
baseSchedRef.tasks = append(baseSchedRef.tasks[:i], baseSchedRef.tasks[i+1:]...)
if len(baseSchedRef.tasks) <= 0 {
baseSchedRef.LogTerminateScheduler()
close(baseSchedRef.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
fmt.Println("Max-Min scheduling...")
baseSchedRef := spc.(*baseScheduler)
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
baseSchedRef.LogOffersReceived(offers)
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-baseSchedRef.Shutdown:
baseSchedRef.LogNoPendingTasksDeclineOffers(offer)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
baseSchedRef.LogNumberOfRunningTasks()
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Assumes s.tasks is ordered in non-decreasing median max-peak order.
// Alternate between trying the heaviest and the lightest remaining task,
// walking inward from each end of the sorted list whenever a task does not fit.
direction := false // True = pick from the min end, False = pick from the max end
var index int
start := true // If false, index has already been moved inward and should be kept as-is
for i := 0; i < len(baseSchedRef.tasks); i++ {
// We need to pick a min task or a max task
// depending on the value of direction.
if direction && start {
index = 0
} else if start {
index = len(baseSchedRef.tasks) - i - 1
}
task := baseSchedRef.tasks[index]
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement.
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed.
taken, taskToSchedule := s.CheckFit(spc, index, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
// Need to change direction and set start to true.
// Setting start to true would ensure that index be set accurately again.
direction = !direction
start = true
i--
} else {
// Need to move index depending on the value of direction.
if direction {
index++
start = false
} else {
index--
start = false
}
}
}
if offerTaken {
baseSchedRef.LogTaskStarting(nil, offer)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
cpus, mem, watts := offerUtils.OfferAgg(offer)
baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for _, v := range SchedPolicies {
if index == 0 {
spc.SwitchSchedPol(v)
break
}
index--
}
}
}

@@ -2,19 +2,24 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/elektron/def"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
"bytes"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"sync"
"time"
)
// Implements the Mesos scheduler interface.
type ElectronScheduler interface {
sched.Scheduler
init(opts ...schedPolicyOption)
}
type base struct {
type baseScheduler struct {
ElectronScheduler
SchedPolicyContext
// Current scheduling policy used for resource offer consumption.
curSchedPolicy SchedPolicyState
tasksCreated int
tasksRunning int
tasks []def.Task
@@ -24,7 +29,7 @@ type base struct {
classMapWatts bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task.
// about to schedule a new task
RecordPCP *bool
// This channel is closed when the program receives an interrupt,
@@ -38,55 +43,309 @@ type base struct {
PCPLog chan struct{}
schedTrace *log.Logger
// Send the type of the message to be logged
logMsgType chan elecLogDef.LogMessageType
// Send the message to be logged
logMsg chan string
mutex sync.Mutex
// Whether switching of scheduling policies at runtime has been enabled
schedPolSwitchEnabled bool
}
func (s *base) init(opts ...schedPolicyOption) {
func (s *baseScheduler) init(opts ...schedPolicyOption) {
for _, opt := range opts {
// Applying options.
// applying options
if err := opt(s); err != nil {
log.Fatal(err)
}
}
s.running = make(map[string]map[string]bool)
s.mutex = sync.Mutex{}
}
func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID)
func (s *baseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) {
s.curSchedPolicy = newSchedPol
}
func (s *base) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
log.Printf("Slave %s lost", slaveID)
func (s *baseScheduler) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !*s.RecordPCP {
// Turn on logging.
*s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
s.LogTaskWattsConsideration(task, *offer.Hostname, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
s.LogElectronError(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *base) ExecutorLost(_ sched.SchedulerDriver,
executorID *mesos.ExecutorID,
func (s *baseScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
s.LogOfferRescinded(offerID)
}
func (s *baseScheduler) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
s.LogSlaveLost(slaveID)
}
func (s *baseScheduler) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID, status int) {
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
s.LogExecutorLost(executorID, slaveID)
}
func (s *base) Error(_ sched.SchedulerDriver, err string) {
log.Printf("Receiving an error: %s", err)
func (s *baseScheduler) Error(_ sched.SchedulerDriver, err string) {
s.LogMesosError(err)
}
func (s *base) FrameworkMessage(
func (s *baseScheduler) FrameworkMessage(
driver sched.SchedulerDriver,
executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID,
message string) {
log.Println("Getting a framework message: ", message)
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
s.LogFrameworkMessage(executorID, slaveID, message)
}
func (s *base) Registered(
func (s *baseScheduler) Registered(
_ sched.SchedulerDriver,
frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
s.LogFrameworkRegistered(frameworkID, masterInfo)
}
func (s *base) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Printf("Framework re-registered with master %s", masterInfo)
func (s *baseScheduler) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
s.LogFrameworkReregistered(masterInfo)
}
func (s *base) Disconnected(sched.SchedulerDriver) {
log.Println("Framework disconnected with master")
func (s *baseScheduler) Disconnected(sched.SchedulerDriver) {
s.LogDisconnected()
}
func (s *baseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
s.curSchedPolicy.ConsumeOffers(s, driver, offers)
}
func (s *baseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
s.LogTaskStatusUpdate(status)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
}
// Log sends a (message type, message) pair on the logging channels. The
// mutex serializes concurrent callers so that each type is paired with its
// own message on the receiving end of Listen.
func (s *baseScheduler) Log(lmt elecLogDef.LogMessageType, msg string) {
s.mutex.Lock()
s.logMsgType <- lmt
s.logMsg <- msg
s.mutex.Unlock()
}
func (s *baseScheduler) LogTaskStarting(ts *def.Task, offer *mesos.Offer) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
var msg string
if ts == nil {
msg = msgColor.Sprintf("TASKS STARTING... host = [%s]", offer.GetHostname())
} else {
msg = msgColor.Sprintf("TASK STARTING... task = [%s], Instance = %d, host = [%s]",
ts.Name, *ts.Instances, offer.GetHostname())
}
s.Log(lmt, msg)
}
func (s *baseScheduler) LogTaskWattsConsideration(ts def.Task, host string, wattsToConsider float64) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Watts considered for task[%s] and host[%s] = %f Watts",
ts.Name, host, wattsToConsider)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogOffersReceived(offers []*mesos.Offer) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Received %d resource offers", len(offers))
s.Log(lmt, msg)
}
func (s *baseScheduler) LogNoPendingTasksDeclineOffers(offer *mesos.Offer) {
lmt := elecLogDef.WARNING
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("DECLINING OFFER for host[%s]... "+
"No tasks left to schedule", offer.GetHostname())
s.Log(lmt, msg)
}
func (s *baseScheduler) LogNumberOfRunningTasks() {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Number of tasks still running = %d", s.tasksRunning)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogCoLocatedTasks(slaveID string) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
buffer := bytes.Buffer{}
buffer.WriteString(fmt.Sprintln("Colocated with:"))
for taskName := range s.running[slaveID] {
buffer.WriteString(fmt.Sprintln(taskName))
}
msg := msgColor.Sprint(buffer.String())
s.Log(lmt, msg)
}
func (s *baseScheduler) LogSchedTrace(taskToSchedule *mesos.TaskInfo, offer *mesos.Offer) {
msg := fmt.Sprint(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
s.Log(elecLogDef.SCHED_TRACE, msg)
}
func (s *baseScheduler) LogTerminateScheduler() {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprint("Done scheduling all tasks!")
s.Log(lmt, msg)
}
func (s *baseScheduler) LogInsufficientResourcesDeclineOffer(offer *mesos.Offer,
offerResources ...interface{}) {
lmt := elecLogDef.WARNING
msgColor := elecLogDef.LogMessageColors[lmt]
buffer := bytes.Buffer{}
buffer.WriteString(fmt.Sprintln("DECLINING OFFER... Offer has insufficient resources to launch a task"))
buffer.WriteString(fmt.Sprintf("Offer Resources <CPU: %f, RAM: %f, Watts: %f>", offerResources...))
msg := msgColor.Sprint(buffer.String())
s.Log(lmt, msg)
}
func (s *baseScheduler) LogOfferRescinded(offerID *mesos.OfferID) {
lmt := elecLogDef.ERROR
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("OFFER RESCINDED: OfferID = %s", offerID)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogSlaveLost(slaveID *mesos.SlaveID) {
lmt := elecLogDef.ERROR
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("SLAVE LOST: SlaveID = %s", slaveID)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogExecutorLost(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID) {
lmt := elecLogDef.ERROR
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("EXECUTOR LOST: ExecutorID = %s, SlaveID = %s", executorID, slaveID)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogFrameworkMessage(executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID, message string) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Received Framework message from executor [%s]: %s", executorID, message)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogMesosError(err string) {
lmt := elecLogDef.ERROR
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("MESOS ERROR: %s", err)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogElectronError(err error) {
lmt := elecLogDef.ERROR
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("ELECTRON ERROR: %v", err)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogFrameworkRegistered(frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
lmt := elecLogDef.SUCCESS
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("FRAMEWORK REGISTERED! frameworkID = %s, master = %s",
frameworkID, masterInfo)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogFrameworkReregistered(masterInfo *mesos.MasterInfo) {
lmt := elecLogDef.GENERAL
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprintf("Framework re-registered with master %s", masterInfo)
s.Log(lmt, msg)
}
func (s *baseScheduler) LogDisconnected() {
lmt := elecLogDef.WARNING
msgColor := elecLogDef.LogMessageColors[lmt]
msg := msgColor.Sprint("Framework disconnected with master")
s.Log(lmt, msg)
}
func (s *baseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
var lmt elecLogDef.LogMessageType
switch *status.State {
case mesos.TaskState_TASK_ERROR, mesos.TaskState_TASK_FAILED,
mesos.TaskState_TASK_KILLED, mesos.TaskState_TASK_LOST:
lmt = elecLogDef.ERROR
case mesos.TaskState_TASK_FINISHED:
lmt = elecLogDef.SUCCESS
default:
lmt = elecLogDef.GENERAL
}
msgColor := elecLogDef.LogMessageColors[lmt]
msg := elecLogDef.LogMessageColors[elecLogDef.GENERAL].Sprintf("Task Status received for task [%s] --> %s",
*status.TaskId.Value, msgColor.Sprint(NameFor(status.State)))
s.Log(lmt, msg)
}

@@ -5,108 +5,49 @@ import (
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"time"
"math/rand"
)
// Decides whether to take an offer or not.
func (s *BinPacking) takeOffer(offer *mesos.Offer, task def.Task, totalCPU, totalRAM, totalWatts float64) bool {
// Decides whether to take an offer or not
func (s *BinPackSortedWatts) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task, totalCPU, totalRAM, totalWatts float64) bool {
baseSchedRef := spc.(*baseScheduler)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
(!baseSchedRef.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type BinPacking struct {
base // Type embedded to inherit common functions.
type BinPackSortedWatts struct {
SchedPolicyState
}
// Initialization.
func (s *BinPacking) init(opts ...schedPolicyOption) {
s.base.init(opts...)
// Sorting the tasks based on watts.
def.SortTasks(s.tasks, def.SortByWatts)
}
func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !*s.RecordPCP {
// Turn on logging.
*s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent.
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsToConsider.
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("elektron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
fmt.Println("BPSW scheduling...")
baseSchedRef := spc.(*baseScheduler)
def.SortTasks(baseSchedRef.tasks, def.SortByWatts)
baseSchedRef.LogOffersReceived(offers)
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
case <-baseSchedRef.Shutdown:
baseSchedRef.LogNoPendingTasksDeclineOffers(offer)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
baseSchedRef.LogNumberOfRunningTasks()
continue
default:
}
@ -117,9 +58,9 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
for i := 0; i < len(baseSchedRef.tasks); i++ {
task := baseSchedRef.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
@ -131,29 +72,28 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
}
for *task.Instances > 0 {
// Does the task fit.
if s.takeOffer(offer, task, totalCPU, totalRAM, totalWatts) {
// Does the task fit
if s.takeOffer(spc, offer, task, totalCPU, totalRAM, totalWatts) {
offerTaken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString())
taskToSchedule := baseSchedRef.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it.
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
// All instances of task have been scheduled, remove it
baseSchedRef.tasks = append(baseSchedRef.tasks[:i],
baseSchedRef.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
if len(baseSchedRef.tasks) <= 0 {
baseSchedRef.LogTerminateScheduler()
close(baseSchedRef.Shutdown)
}
}
} else {
@ -163,35 +103,28 @@ func (s *BinPacking) ResourceOffers(driver sched.SchedulerDriver, offers []*meso
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
baseSchedRef.LogTaskStarting(nil, offer)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task.
fmt.Println("There is not enough resources to launch a task:")
// If there was no match for the task
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *BinPacking) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for _, v := range SchedPolicies {
if index == 0 {
spc.SwitchSchedPol(v)
break
}
index--
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@ -0,0 +1,73 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/elektron/def"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
)
// Implements mesos scheduler.
type ElectronScheduler interface {
sched.Scheduler
init(opts ...schedPolicyOption)
// Interface for log messages.
// Every ElectronScheduler implementer should provide definitions for these functions.
// This interface serves as a template to maintain consistent log messages.
// Each of these functions is supposed to call Log(...), which sends the
// log message type and the log message to the corresponding channels.
// Pass the logMessageType and the logMessage to the loggers for logging.
Log(logMType elecLogDef.LogMessageType, logMsg string)
// To be called when about to launch a task.
// Log message indicating that a task is about to start executing.
// Also, log the host on which the task is going to be launched.
LogTaskStarting(ts *def.Task, offer *mesos.Offer)
// To be called when an offer is taken.
// Log the chosen watts attribute for the task that has fit an offer.
LogTaskWattsConsideration(ts def.Task, host string, wattsToConsider float64)
// To be called when offers are received from Mesos.
// Log the number of offers received and/or information about the received offers.
LogOffersReceived(offers []*mesos.Offer)
// To be called when a scheduling policy declines Mesos offers, as
// there are no tasks pending to be scheduled.
// Log the host information corresponding to the offers that were declined.
LogNoPendingTasksDeclineOffers(offers *mesos.Offer)
// Log the number of tasks that are currently executing on the cluster.
LogNumberOfRunningTasks()
// To be called when a task fits a Mesos offer.
// Log information on the tasks that the new task is going to be coLocated with.
// Uses the coLocated(...) utility in helpers.go.
LogCoLocatedTasks(slaveID string)
// Log the sched trace of the task.
// The schedTrace includes the TaskID and the hostname of the node
// where the task is going to be launched.
LogSchedTrace(taskToSchedule *mesos.TaskInfo, offer *mesos.Offer)
// To be called when all the tasks have completed executing.
// Log message indicating that Electron has scheduled all the tasks.
LogTerminateScheduler()
// To be called when the offer is not consumed.
// Log message to indicate that the offer had insufficient resources.
LogInsufficientResourcesDeclineOffer(offer *mesos.Offer, offerResources ...interface{})
// To be called when an offer is rescinded by Mesos.
LogOfferRescinded(offerID *mesos.OfferID)
// To be called when a Mesos agent is lost.
LogSlaveLost(slaveID *mesos.SlaveID)
// To be called when an executor is lost.
LogExecutorLost(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID)
// Log a Mesos error.
LogMesosError(err string)
// Log an Electron error.
LogElectronError(err error)
// Log a framework message.
LogFrameworkMessage(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, message string)
// Log that the framework has been registered.
LogFrameworkRegistered(frameworkID *mesos.FrameworkID, masterInfo *mesos.MasterInfo)
// Log that the framework has been re-registered.
LogFrameworkReregistered(masterInfo *mesos.MasterInfo)
// Log that the framework has been disconnected from the Mesos master.
LogDisconnected()
// Log the status update of a task.
LogTaskStatusUpdate(status *mesos.TaskStatus)
}
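A scheduling policy does not implement these methods itself; they live on baseScheduler, and a policy reaches them through the SchedPolicyContext it receives. As a usage sketch, a hypothetical RoundRobin policy would follow the same shape as FirstFit and BinPackSortedWatts elsewhere in this changeset:

type RoundRobin struct {
	SchedPolicyState
}

func (s *RoundRobin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
	// The context is the one mesos scheduler; cast it to reach the loggers.
	baseSchedRef := spc.(*baseScheduler)
	baseSchedRef.LogOffersReceived(offers)
	// ... fit tasks to offers here, logging through baseSchedRef throughout ...
}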


@ -5,27 +5,24 @@ import (
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"time"
"math/rand"
)
// Decides whether to take an offer or not.
func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
// Decides whether to take an offer or not
func (s *FirstFit) takeOffer(spc SchedPolicyContext, offer *mesos.Offer, task def.Task) bool {
baseSchedRef := spc.(*baseScheduler)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
// Error in determining wattsConsideration
baseSchedRef.LogElectronError(err)
}
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) {
if cpus >= task.CPU && mem >= task.RAM && (!baseSchedRef.wattsAsAResource || watts >= wattsConsideration) {
return true
}
@ -34,120 +31,61 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
// Elektron scheduler implements the Scheduler interface.
type FirstFit struct {
base // Type embedded to inherit common functions
SchedPolicyState
}
// Initialization.
func (s *FirstFit) init(opts ...schedPolicyOption) {
s.base.init(opts...)
}
func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !*s.RecordPCP {
// Turn on logging.
*s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
}
// If this is our first time running into this Agent.
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration.
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("elektron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
fmt.Println("FirstFit scheduling...")
baseSchedRef := spc.(*baseScheduler)
baseSchedRef.LogOffersReceived(offers)
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
case <-baseSchedRef.Shutdown:
baseSchedRef.LogNoPendingTasksDeclineOffers(offer)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
baseSchedRef.LogNumberOfRunningTasks()
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy.
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
for i := 0; i < len(baseSchedRef.tasks); i++ {
task := baseSchedRef.tasks[i]
// Don't take offer if it doesn't match our task's host requirement.
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not.
if s.takeOffer(offer, task) {
// Decision to take the offer or not
if s.takeOffer(spc, offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
baseSchedRef.LogCoLocatedTasks(offer.GetSlaveId().GoString())
taskToSchedule := s.newTask(offer, task)
taskToSchedule := baseSchedRef.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
baseSchedRef.LogTaskStarting(&task, offer)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
baseSchedRef.LogSchedTrace(taskToSchedule, offer)
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
// All instances of task have been scheduled, remove it
baseSchedRef.tasks[i] = baseSchedRef.tasks[len(baseSchedRef.tasks)-1]
baseSchedRef.tasks = baseSchedRef.tasks[:len(baseSchedRef.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
if len(baseSchedRef.tasks) <= 0 {
baseSchedRef.LogTerminateScheduler()
close(baseSchedRef.Shutdown)
}
}
break // Offer taken, move on.
@ -156,31 +94,23 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
// If there was no match for the task.
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
baseSchedRef.LogInsufficientResourcesDeclineOffer(offer, cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
// Switch scheduling policy only if feature enabled from CLI
if baseSchedRef.schedPolSwitchEnabled {
// Switching to a random scheduling policy.
// TODO: Switch based on some criteria.
index := rand.Intn(len(SchedPolicies))
for _, v := range SchedPolicies {
if index == 0 {
spc.SwitchSchedPol(v)
break
}
index--
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@ -4,18 +4,16 @@ import (
"bitbucket.org/sunybingcloud/elektron/constants"
"bitbucket.org/sunybingcloud/elektron/def"
"errors"
"fmt"
"log"
"os"
elecLogDef "bitbucket.org/sunybingcloud/elektron/logging/def"
)
func coLocated(tasks map[string]bool) {
func coLocated(tasks map[string]bool, s baseScheduler) {
for task := range tasks {
log.Println(task)
s.Log(elecLogDef.GENERAL, task)
}
fmt.Println("---------------------")
s.Log(elecLogDef.GENERAL, "---------------------")
}
// Get the powerClass of the given hostname.
@ -28,15 +26,26 @@ func hostToPowerClass(hostName string) string {
return ""
}
// Scheduler policy options to help initialize schedulers.
// scheduler policy options to help initialize schedulers
type schedPolicyOption func(e ElectronScheduler) error
func WithSchedPolicy(schedPolicyName string) schedPolicyOption {
return func(s ElectronScheduler) error {
if schedPolicy, ok := SchedPolicies[schedPolicyName]; !ok {
return errors.New("Incorrect scheduling policy.")
} else {
s.(*baseScheduler).curSchedPolicy = schedPolicy
return nil
}
}
}
func WithTasks(ts []def.Task) schedPolicyOption {
return func(s ElectronScheduler) error {
if ts == nil {
return errors.New("Task[] is empty.")
} else {
s.(*base).tasks = ts
s.(*baseScheduler).tasks = ts
return nil
}
}
@ -44,43 +53,31 @@ func WithTasks(ts []def.Task) schedPolicyOption {
func WithWattsAsAResource(waar bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).wattsAsAResource = waar
s.(*baseScheduler).wattsAsAResource = waar
return nil
}
}
func WithClassMapWatts(cmw bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).classMapWatts = cmw
s.(*baseScheduler).classMapWatts = cmw
return nil
}
}
func WithRecordPCP(recordPCP *bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).RecordPCP = recordPCP
s.(*baseScheduler).RecordPCP = recordPCP
return nil
}
}
func WithSchedTracePrefix(schedTracePrefix string) schedPolicyOption {
return func(s ElectronScheduler) error {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
return err
} else {
s.(*base).schedTrace = log.New(logFile, "", log.LstdFlags)
return nil
}
}
}
func WithShutdown(shutdown chan struct{}) schedPolicyOption {
return func(s ElectronScheduler) error {
if shutdown == nil {
return errors.New("Shutdown channel is nil.")
} else {
s.(*base).Shutdown = shutdown
s.(*baseScheduler).Shutdown = shutdown
return nil
}
}
@ -91,7 +88,7 @@ func WithDone(done chan struct{}) schedPolicyOption {
if done == nil {
return errors.New("Done channel is nil.")
} else {
s.(*base).Done = done
s.(*baseScheduler).Done = done
return nil
}
}
@ -102,8 +99,23 @@ func WithPCPLog(pcpLog chan struct{}) schedPolicyOption {
if pcpLog == nil {
return errors.New("PCPLog channel is nil.")
} else {
s.(*base).PCPLog = pcpLog
s.(*baseScheduler).PCPLog = pcpLog
return nil
}
}
}
func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*baseScheduler).logMsgType = lmt
s.(*baseScheduler).logMsg = msg
return nil
}
}
func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*baseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch
return nil
}
}
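All of these options follow the same functional-options pattern: validate the input, then set the corresponding baseScheduler field, returning an error on bad input. Adding a new knob means one more function of the same shape; a sketch, where WithLogPrefix and the logPrefix field are hypothetical and only illustrate the pattern:

func WithLogPrefix(prefix string) schedPolicyOption {
	return func(s ElectronScheduler) error {
		if prefix == "" {
			return errors.New("Log prefix is empty.")
		}
		s.(*baseScheduler).logPrefix = prefix // hypothetical field
		return nil
	}
}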


@ -1,255 +0,0 @@
package schedulers
import (
"fmt"
"log"
"time"
"bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
)
// Decides whether to take an offer or not.
func (s *MaxGreedyMins) takeOffer(offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type MaxGreedyMins struct {
base //Type embedding to inherit common functions.
}
// Initialization.
func (s *MaxGreedyMins) init(opts ...schedPolicyOption) {
s.base.init(opts...)
// Sorting the tasks based on watts.
def.SortTasks(s.tasks, def.SortByWatts)
}
func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
// Start recording only when we're creating the first task.
if !*s.RecordPCP {
// Turn on logging
*s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
}
// If this is our first time running into this Agent.
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration.
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("elektron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated.
},
},
}
}
// Determine if the remaining space inside of the offer is enough for
// the task we need to create. If it is, create a TaskInfo and return it.
func (s *MaxGreedyMins) CheckFit(
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
// Does the task fit.
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it.
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *MaxGreedyMins) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Assumes s.tasks is ordered in non-decreasing median max peak order.
// Attempt to schedule a single instance of the heaviest workload available first.
// Start from the back until one fits.
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement.
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
break
}
}
// Pack the rest of the offer with the smallest tasks.
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement.
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
} else {
break // Continue on to next task.
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task.
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *MaxGreedyMins) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@ -1,251 +0,0 @@
package schedulers
import (
"fmt"
"log"
"time"
"bitbucket.org/sunybingcloud/elektron/def"
"bitbucket.org/sunybingcloud/elektron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/elektron/utilities/offerUtils"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
)
// Decides whether to take an offer or not.
func (s *MaxMin) takeOffer(offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type MaxMin struct {
base //Type embedding to inherit common functions.
}
// Initialization.
func (s *MaxMin) init(opts ...schedPolicyOption) {
s.base.init(opts...)
// Sorting the tasks based on Watts.
def.SortTasks(s.tasks, def.SortByWatts)
}
func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
// Start recording only when we're creating the first task.
if !*s.RecordPCP {
// Turn on logging.
*s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts.
}
// If this is our first time running into this Agent.
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration.
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("elektron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated.
},
},
}
}
// Determine if the remaining space inside of the offer is enough for
// the task we need to create. If it is, create a TaskInfo and return it.
func (s *MaxMin) CheckFit(
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
// Does the task fit.
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it.
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *MaxMin) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Assumes s.tasks is ordered in non-decreasing median max peak order.
// Attempt to schedule a single instance of the heaviest workload available first.
// Start from the back until one fits.
direction := false // True = Min Max, False = Max Min.
var index int
start := true // If false then index has changed and we need to keep it that way.
for i := 0; i < len(s.tasks); i++ {
// We need to pick a min task or a max task
// depending on the value of direction.
if direction && start {
index = 0
} else if start {
index = len(s.tasks) - i - 1
}
task := s.tasks[index]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration.
log.Fatal(err)
}
// Don't take the offer if it doesn't match our task's host requirement.
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(index, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
// Need to change direction and set start to true.
// Setting start to true would ensure that index be set accurately again.
direction = !direction
start = true
i--
} else {
// Need to move index depending on the value of direction.
if direction {
index++
start = false
} else {
index--
start = false
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *MaxMin) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

schedulers/schedPolicy.go Normal file

@ -0,0 +1,16 @@
package schedulers
import (
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
)
type SchedPolicyContext interface {
// Change the state of scheduling.
SwitchSchedPol(s SchedPolicyState)
}
type SchedPolicyState interface {
// Define the particular scheduling policy's methodology of resource offer consumption.
ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer)
}
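These two interfaces form the state machine: baseScheduler acts as the context and the current scheduling policy as its state. base.go is not shown in this view, but given the curSchedPolicy field referenced in helpers.go, the wiring is presumably along these lines (a sketch, not the actual definition):

func (s *baseScheduler) SwitchSchedPol(newSchedPol SchedPolicyState) {
	// Swap the state; the next batch of offers is consumed by the new policy.
	s.curSchedPolicy = newSchedPol
}

func (s *baseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
	// Delegate offer consumption to whichever policy is the current state.
	s.curSchedPolicy.ConsumeOffers(s, driver, offers)
}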


@ -1,30 +1,32 @@
package schedulers
import "github.com/mesos/mesos-go/scheduler"
import (
sched "github.com/mesos/mesos-go/scheduler"
)
// Names of different scheduling policies.
const (
ff = "first-fit"
bp = "bin-packing"
mgm = "max-greedymins"
mm = "max-min"
ff = "first-fit"
bp = "bin-packing"
mgm = "max-greedymins"
mm = "max-min"
)
// Scheduler class factory.
var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{
ff: &FirstFit{base: base{}},
bp: &BinPacking{base: base{}},
mgm: &MaxGreedyMins{base: base{}},
mm: &MaxMin{base: base{}},
// Scheduling policy factory
var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{
ff: &FirstFit{},
bp: &BinPackSortedWatts{},
mgm: &MaxGreedyMins{},
mm: &MaxMin{},
}
// Build the scheduling policy with the options being applied.
func BuildSchedPolicy(s scheduler.Scheduler, opts ...schedPolicyOption) {
// build the scheduling policy with the options being applied
func buildScheduler(s sched.Scheduler, opts ...schedPolicyOption) {
s.(ElectronScheduler).init(opts...)
}
func SchedFactory(schedPolicyName string, opts ...schedPolicyOption) scheduler.Scheduler {
s := Schedulers[schedPolicyName]
BuildSchedPolicy(s, opts...)
func SchedFactory(opts ...schedPolicyOption) sched.Scheduler {
s := &baseScheduler{}
buildScheduler(s, opts...)
return s
}
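With the options defined in helpers.go, wiring up the framework reduces to a single factory call. A sketch of a call site follows; the variable names are hypothetical and the channels are assumed to be created by the caller:

scheduler := SchedFactory(
	WithSchedPolicy("first-fit"),
	WithTasks(tasks),
	WithWattsAsAResource(wattsAsAResource),
	WithClassMapWatts(classMapWatts),
	WithRecordPCP(&recordPCP),
	WithShutdown(shutdown),
	WithDone(done),
	WithPCPLog(pcpLog),
	WithLoggingChannels(logMType, logMsg),
	WithSchedPolSwitchEnabled(enableSchedPolSwitch),
)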