From 463735572124dd4c2d32ef9eb5653224ef52dc69 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Thu, 4 Oct 2018 19:24:16 -0400 Subject: [PATCH] Reorganized code. Building the scheduler in phases. To be able to do this, increased visibility of schedulers/helpers.go#schedulerOptions. Refactored dependent code. --- scheduler.go | 167 ++++++++++++++++++-------------- schedulers/base.go | 2 +- schedulers/electronScheduler.go | 2 +- schedulers/helpers.go | 26 ++--- schedulers/store.go | 4 +- 5 files changed, 110 insertions(+), 91 deletions(-) diff --git a/scheduler.go b/scheduler.go index 51ec447..1939ecc 100644 --- a/scheduler.go +++ b/scheduler.go @@ -15,8 +15,8 @@ import ( "gitlab.com/spdf/elektron/def" elekLogDef "gitlab.com/spdf/elektron/logging/def" "gitlab.com/spdf/elektron/pcp" - "gitlab.com/spdf/elektron/schedulers" "gitlab.com/spdf/elektron/powerCap" + "gitlab.com/spdf/elektron/schedulers" ) var master = flag.String("master", "", "Location of leading Mesos master -- :") @@ -71,12 +71,13 @@ func listAllSchedulingPolicies() { func main() { flag.Parse() - // checking to see if we need to just list the pluggable scheduling policies + // Checking to see if we need to just list the pluggable scheduling policies if *listSchedPolicies { listAllSchedulingPolicies() os.Exit(1) } + // Creating logger and attaching different logging platforms. startTime := time.Now() formattedStartTime := startTime.Format("20060102150405") // Checking if prefix contains any special characters @@ -84,16 +85,18 @@ func main() { log.Fatal("log file prefix should not contain '/'.") } logPrefix := *pcplogPrefix + "_" + formattedStartTime - - // creating logger and attaching different logging platforms logger := elekLogDef.BuildLogger(startTime, logPrefix) - // logging channels + // Logging channels. logMType := make(chan elekLogDef.LogMessageType) logMsg := make(chan string) go logger.Listen(logMType, logMsg) - // If non-default scheduling policy given, - // checking if scheduling policyName exists + // First we need to build the scheduler using scheduler options. + var schedOptions []schedulers.SchedulerOptions = make([]schedulers.SchedulerOptions, 0, 10) + + // OPTIONAL PARAMETERS + // Scheduling Policy Name + // If non-default scheduling policy given, checking if name exists. if *schedPolicyName != "first-fit" { if _, ok := schedulers.SchedPolicies[*schedPolicyName]; !ok { // invalid scheduling policy @@ -103,68 +106,54 @@ func main() { } } - if *tasksFile == "" { - //fmt.Println("No file containing tasks specifiction provided.") - logger.WriteLog(elekLogDef.ERROR, "No file containing tasks specification provided") - os.Exit(1) - } - - if *hiThreshold < *loThreshold { - //fmt.Println("High threshold is of a lower value than low threshold.") - logger.WriteLog(elekLogDef.ERROR, "High threshold is of a lower value than low threshold") - os.Exit(1) - } - - tasks, err := def.TasksFromJSON(*tasksFile) - if err != nil || len(tasks) == 0 { - //fmt.Println("Invalid tasks specification file provided") - logger.WriteLog(elekLogDef.ERROR, "Invalid tasks specification file provided") - os.Exit(1) - } - - //log.Println("Scheduling the following tasks:") - logger.WriteLog(elekLogDef.GENERAL, "Scheduling the following tasks:") - for _, task := range tasks { - fmt.Println(task) - } - - if *enableSchedPolicySwitch { - if spcf := *schedPolConfigFile; spcf == "" { - logger.WriteLog(elekLogDef.ERROR, "No file containing characteristics for scheduling policies") - } else { - // Initializing the characteristics of the scheduling policies. - schedulers.InitSchedPolicyCharacteristics(spcf) - } - } - + // CHANNELS AND FLAGS. shutdown := make(chan struct{}) done := make(chan struct{}) pcpLog := make(chan struct{}) recordPCP := false - scheduler := schedulers.SchedFactory( - schedulers.WithSchedPolicy(*schedPolicyName), - schedulers.WithTasks(tasks), - schedulers.WithWattsAsAResource(*wattsAsAResource), - schedulers.WithClassMapWatts(*classMapWatts), - schedulers.WithRecordPCP(&recordPCP), - schedulers.WithShutdown(shutdown), - schedulers.WithDone(done), - schedulers.WithPCPLog(pcpLog), - schedulers.WithLoggingChannels(logMType, logMsg), - schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch, *schedPolSwitchCriteria), - schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol), - schedulers.WithFixedSchedulingWindow(*fixSchedWindow, *schedWindowSize)) - driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ - Master: *master, - Framework: &mesos.FrameworkInfo{ - Name: proto.String("Elektron"), - User: proto.String(""), - }, - Scheduler: scheduler, - }) - if err != nil { - log.Printf("Unable to create scheduler driver: %s", err) - return + + // Logging channels. + // These channels are used by the framework to log messages. + // The channels are used to send the type of log message and the message string. + schedOptions = append(schedOptions, schedulers.WithLoggingChannels(logMType, logMsg)) + + // Shutdown indicator channels. + // These channels are used to notify, + // 1. scheduling is complete. + // 2. all scheduled tasks have completed execution and framework can shutdown. + schedOptions = append(schedOptions, schedulers.WithShutdown(shutdown)) + schedOptions = append(schedOptions, schedulers.WithDone(done)) + + // If here, then valid scheduling policy name provided. + schedOptions = append(schedOptions, schedulers.WithSchedPolicy(*schedPolicyName)) + + // Scheduling Policy Switching. + if *enableSchedPolicySwitch { + // Scheduling policy config file required. + if spcf := *schedPolConfigFile; spcf == "" { + logger.WriteLog(elekLogDef.ERROR, "No file containing characteristics for"+ + " scheduling policies") + os.Exit(1) + } else { + // Initializing the characteristics of the scheduling policies. + schedulers.InitSchedPolicyCharacteristics(spcf) + schedOptions = append(schedOptions, schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch, *schedPolSwitchCriteria)) + // Fix First Scheduling Policy. + schedOptions = append(schedOptions, schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol)) + // Fix Scheduling Window. + schedOptions = append(schedOptions, schedulers.WithFixedSchedulingWindow(*fixSchedWindow, *schedWindowSize)) + } + } + + // Watts as a Resource (WaaR) and ClassMapWatts (CMW). + // If WaaR and CMW is enabled then for each task the class_to_watts mapping is used to + // fit tasks into offers. + // If CMW is disabled, then the Median of Medians Max Peak Power Usage value is used + // as the watts value for each task. + if *wattsAsAResource { + logger.WriteLog(elekLogDef.GENERAL, "WaaR enabled...") + schedOptions = append(schedOptions, schedulers.WithWattsAsAResource(*wattsAsAResource)) + schedOptions = append(schedOptions, schedulers.WithClassMapWatts(*classMapWatts)) } // REQUIRED PARAMETERS. // PCP logging, Power capping and High and Low thresholds. @@ -204,12 +193,40 @@ func main() { } } - // Checking if pcp config file exists. - if _, err := os.Stat(*pcpConfigFile); os.IsNotExist(err) { - logger.WriteLog(elekLogDef.ERROR, "PCP config file does not exist!") + // Tasks + // If httpServer is disabled, then path of file containing workload needs to be provided. + if *tasksFile == "" { + logger.WriteLog(elekLogDef.ERROR, "No file containing tasks specification"+ + " provided.") + os.Exit(1) + } + tasks, err := def.TasksFromJSON(*tasksFile) + if err != nil || len(tasks) == 0 { + logger.WriteLog(elekLogDef.ERROR, "Invalid tasks specification file "+ + "provided.") + os.Exit(1) + } + schedOptions = append(schedOptions, schedulers.WithTasks(tasks)) + + // Scheduler. + scheduler := schedulers.SchedFactory(schedOptions...) + + // Scheduler driver. + driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ + Master: *master, + Framework: &mesos.FrameworkInfo{ + Name: proto.String("Elektron"), + User: proto.String(""), + }, + Scheduler: scheduler, + }) + if err != nil { + logger.WriteLog(elekLogDef.ERROR, fmt.Sprintf("Unable to create scheduler driver:"+ + " %s", err)) os.Exit(1) } + // Starting PCP logging. if noPowercap { go pcp.Start(pcpLog, &recordPCP, logMType, logMsg, *pcpConfigFile, scheduler) } else if extrema { @@ -220,7 +237,8 @@ func main() { *loThreshold, logMType, logMsg, *pcpConfigFile) } - time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing + // Take a second between starting PCP log and continuing. + time.Sleep(1 * time.Second) // Attempt to handle SIGINT to not leave pmdumptext running. // Catch interrupt. @@ -233,34 +251,35 @@ func main() { return } - log.Println("Received SIGINT...stopping") + log.Println("Received SIGINT... stopping") close(done) }() go func() { - // Signals we have scheduled every task we have. + // Signals we have scheduled every task we have select { case <-shutdown: //case <-time.After(shutdownTimeout): } - // All tasks have finished. + // All tasks have finished select { case <-done: close(pcpLog) time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds + // Closing logging channels. close(logMType) close(logMsg) //case <-time.After(shutdownTimeout): } - // Done shutting down. + // Done shutting down driver.Stop(false) }() - log.Println("Starting...") + // Starting the scheduler driver. if status, err := driver.Run(); err != nil { log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error()) } diff --git a/schedulers/base.go b/schedulers/base.go index b5c4551..351eae9 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -84,7 +84,7 @@ type BaseScheduler struct { hasReceivedResourceOffers bool } -func (s *BaseScheduler) init(opts ...schedulerOptions) { +func (s *BaseScheduler) init(opts ...SchedulerOptions) { for _, opt := range opts { // applying options if err := opt(s); err != nil { diff --git a/schedulers/electronScheduler.go b/schedulers/electronScheduler.go index 69581d4..39b229b 100644 --- a/schedulers/electronScheduler.go +++ b/schedulers/electronScheduler.go @@ -12,7 +12,7 @@ import ( // Implements mesos scheduler. type ElectronScheduler interface { sched.Scheduler - init(opts ...schedulerOptions) + init(opts ...SchedulerOptions) // Interface for log messages. // Every ElectronScheduler implementer should provide definitions for these functions. diff --git a/schedulers/helpers.go b/schedulers/helpers.go index db08229..f8e8d11 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -31,9 +31,9 @@ func hostToPowerClass(hostName string) string { } // scheduler policy options to help initialize schedulers -type schedulerOptions func(e ElectronScheduler) error +type SchedulerOptions func(e ElectronScheduler) error -func WithSchedPolicy(schedPolicyName string) schedulerOptions { +func WithSchedPolicy(schedPolicyName string) SchedulerOptions { return func(s ElectronScheduler) error { if schedPolicy, ok := SchedPolicies[schedPolicyName]; !ok { return errors.New("Incorrect scheduling policy.") @@ -44,7 +44,7 @@ func WithSchedPolicy(schedPolicyName string) schedulerOptions { } } -func WithTasks(ts []def.Task) schedulerOptions { +func WithTasks(ts []def.Task) SchedulerOptions { return func(s ElectronScheduler) error { if ts == nil { return errors.New("Task[] is empty.") @@ -55,28 +55,28 @@ func WithTasks(ts []def.Task) schedulerOptions { } } -func WithWattsAsAResource(waar bool) schedulerOptions { +func WithWattsAsAResource(waar bool) SchedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).wattsAsAResource = waar return nil } } -func WithClassMapWatts(cmw bool) schedulerOptions { +func WithClassMapWatts(cmw bool) SchedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).classMapWatts = cmw return nil } } -func WithRecordPCP(recordPCP *bool) schedulerOptions { +func WithRecordPCP(recordPCP *bool) SchedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).RecordPCP = recordPCP return nil } } -func WithShutdown(shutdown chan struct{}) schedulerOptions { +func WithShutdown(shutdown chan struct{}) SchedulerOptions { return func(s ElectronScheduler) error { if shutdown == nil { return errors.New("Shutdown channel is nil.") @@ -87,7 +87,7 @@ func WithShutdown(shutdown chan struct{}) schedulerOptions { } } -func WithDone(done chan struct{}) schedulerOptions { +func WithDone(done chan struct{}) SchedulerOptions { return func(s ElectronScheduler) error { if done == nil { return errors.New("Done channel is nil.") @@ -98,7 +98,7 @@ func WithDone(done chan struct{}) schedulerOptions { } } -func WithPCPLog(pcpLog chan struct{}) schedulerOptions { +func WithPCPLog(pcpLog chan struct{}) SchedulerOptions { return func(s ElectronScheduler) error { if pcpLog == nil { return errors.New("PCPLog channel is nil.") @@ -109,7 +109,7 @@ func WithPCPLog(pcpLog chan struct{}) schedulerOptions { } } -func WithLoggingChannels(lmt chan elekLogDef.LogMessageType, msg chan string) schedulerOptions { +func WithLoggingChannels(lmt chan elekLogDef.LogMessageType, msg chan string) SchedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).logMsgType = lmt s.(*BaseScheduler).logMsg = msg @@ -117,7 +117,7 @@ func WithLoggingChannels(lmt chan elekLogDef.LogMessageType, msg chan string) sc } } -func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool, switchingCriteria string) schedulerOptions { +func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool, switchingCriteria string) SchedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch // Checking if valid switching criteria. @@ -129,7 +129,7 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool, switchingCriteria s } } -func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions { +func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) SchedulerOptions { return func(s ElectronScheduler) error { if nameOfFirstSchedPol == "" { lmt := elekLogDef.WARNING @@ -146,7 +146,7 @@ func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions { } } -func WithFixedSchedulingWindow(toFixSchedWindow bool, fixedSchedWindowSize int) schedulerOptions { +func WithFixedSchedulingWindow(toFixSchedWindow bool, fixedSchedWindowSize int) SchedulerOptions { return func(s ElectronScheduler) error { if toFixSchedWindow { if fixedSchedWindowSize <= 0 { diff --git a/schedulers/store.go b/schedulers/store.go index b7c9133..0fa88f2 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -111,11 +111,11 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error { } // build the scheduler with the options being applied -func buildScheduler(s sched.Scheduler, opts ...schedulerOptions) { +func buildScheduler(s sched.Scheduler, opts ...SchedulerOptions) { s.(ElectronScheduler).init(opts...) } -func SchedFactory(opts ...schedulerOptions) sched.Scheduler { +func SchedFactory(opts ...SchedulerOptions) sched.Scheduler { s := &BaseScheduler{} buildScheduler(s, opts...) return s