From 04f24beac5cd6654330c01c70a5b404c291ee3ea Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Tue, 26 Sep 2017 13:17:47 -0400 Subject: [PATCH] scheduling policies pluggable from commandline --- power-capping/extrema.go | 2 +- power-capping/progressive-extrema.go | 2 +- scheduler.go | 76 ++++++++++++++++++++----- schedulers/base.go | 19 ++++++- schedulers/bin-packing.go | 33 +++-------- schedulers/first-fit.go | 30 ++-------- schedulers/helpers.go | 83 ++++++++++++++++++++++++++++ schedulers/max-greedymins.go | 31 ++--------- schedulers/max-min.go | 31 ++--------- schedulers/store.go | 30 ++++++++++ 10 files changed, 216 insertions(+), 121 deletions(-) create mode 100644 schedulers/store.go diff --git a/power-capping/extrema.go b/power-capping/extrema.go index a4c3dea..b17d60d 100644 --- a/power-capping/extrema.go +++ b/power-capping/extrema.go @@ -1,8 +1,8 @@ package pcp import ( - "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/pcp" + "bitbucket.org/sunybingcloud/elektron/rapl" "bufio" "container/ring" "log" diff --git a/power-capping/progressive-extrema.go b/power-capping/progressive-extrema.go index 03a80bc..10d38d8 100644 --- a/power-capping/progressive-extrema.go +++ b/power-capping/progressive-extrema.go @@ -2,8 +2,8 @@ package pcp import ( "bitbucket.org/sunybingcloud/elektron/constants" - "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/pcp" + "bitbucket.org/sunybingcloud/elektron/rapl" "bitbucket.org/sunybingcloud/elektron/utilities" "bufio" "container/ring" diff --git a/scheduler.go b/scheduler.go index 15add1f..dc6d256 100644 --- a/scheduler.go +++ b/scheduler.go @@ -17,29 +17,63 @@ import ( var master = flag.String("master", ":5050", "Location of leading Mesos master") var tasksFile = flag.String("workload", "", "JSON file containing task definitions") -var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.") +var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. "+ + "This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.") var pcplogPrefix = flag.String("logPrefix", "", "Prefix for PCP log file") -var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, beyond which extrema/progressive-extrema would start power-capping") -var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, below which extrema/progressive-extrema would stop power-capping") +var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, "+ + "beyond which extrema/progressive-extrema would start power-capping") +var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, "+ + "below which extrema/progressive-extrema would stop power-capping") var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to powerClass of node") +var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+ + "Use option -listSchedPolicies to get the names of available scheduling policies") +var listSchedPolicies = flag.Bool("listSchedPolicies", false, "Names of the pluaggable scheduling policies.") // Short hand args func init() { flag.StringVar(master, "m", ":5050", "Location of leading Mesos master (shorthand)") flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") - flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource. " + + flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource. "+ "This allows the usage of the Watts attribute (if present) in the workload definition during offer matching. (shorthand)") flag.StringVar(pcplogPrefix, "p", "", "Prefix for PCP log file (shorthand)") - flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for Cluster average historical power consumption, " + + flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for Cluster average historical power consumption, "+ "beyond which extrema/progressive-extrema would start power-capping (shorthand)") - flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, " + + flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, "+ "below which extrema/progressive-extrema would stop power-capping (shorthand)") flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to powerClass of node (shorthand)") + flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+ + "Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)") + flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)") +} + +func listAllSchedulingPolicies() { + fmt.Println("Scheduling Policies") + fmt.Println("-------------------") + for policyName, _ := range schedulers.Schedulers { + fmt.Println(policyName) + } } func main() { flag.Parse() + // checking to see if we need to just list the pluggable scheduling policies + if *listSchedPolicies { + listAllSchedulingPolicies() + os.Exit(1) + } + + // If non-default scheduling policy given, + // checking if scheduling policyName exists + if *schedPolicyName != "first-fit" { + if _, ok := schedulers.Schedulers[*schedPolicyName]; !ok { + // invalid scheduling policy + log.Println("Invalid scheduling policy given. The possible scheduling policies are:") + listAllSchedulingPolicies() + os.Exit(1) + } + } + if *tasksFile == "" { fmt.Println("No file containing tasks specifiction provided.") os.Exit(1) @@ -63,7 +97,19 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts) + shutdown := make(chan struct{}) + done := make(chan struct{}) + pcpLog := make(chan struct{}) + recordPCP := false + scheduler := schedulers.SchedFactory(*schedPolicyName, + schedulers.WithTasks(tasks), + schedulers.WithWattsAsAResource(*wattsAsAResource), + schedulers.WithClassMapWatts(*classMapWatts), + schedulers.WithSchedTracePrefix(logPrefix), + schedulers.WithRecordPCP(&recordPCP), + schedulers.WithShutdown(shutdown), + schedulers.WithDone(done), + schedulers.WithPCPLog(pcpLog)) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -77,9 +123,9 @@ func main() { return } - go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) - //go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) - //go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) + go pcp.Start(pcpLog, &recordPCP, logPrefix) + //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold) + //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing // Attempt to handle SIGINT to not leave pmdumptext running @@ -89,26 +135,26 @@ func main() { signal.Notify(c, os.Interrupt, os.Kill) s := <-c if s != os.Interrupt { - close(scheduler.PCPLog) + close(pcpLog) return } log.Printf("Received SIGINT...stopping") - close(scheduler.Done) + close(done) }() go func() { // Signals we have scheduled every task we have select { - case <-scheduler.Shutdown: + case <-shutdown: //case <-time.After(shutdownTimeout): } // All tasks have finished select { - case <-scheduler.Done: - close(scheduler.PCPLog) + case <-done: + close(pcpLog) time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds //case <-time.After(shutdownTimeout): } diff --git a/schedulers/base.go b/schedulers/base.go index 62a4b9b..4cda5cb 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -7,7 +7,14 @@ import ( "log" ) +// Implements mesos scheduler. +type ElectronScheduler interface { + sched.Scheduler + init(opts ...schedPolicyOption) +} + type base struct { + ElectronScheduler tasksCreated int tasksRunning int tasks []def.Task @@ -18,7 +25,7 @@ type base struct { // First set of PCP values are garbage values, signal to logger to start recording when we're // about to schedule a new task - RecordPCP bool + RecordPCP *bool // This channel is closed when the program receives an interrupt, // signalling that the program should shut down. @@ -33,6 +40,16 @@ type base struct { schedTrace *log.Logger } +func (s *base) init(opts ...schedPolicyOption) { + for _, opt := range opts { + // applying options + if err := opt(s); err != nil { + log.Fatal(err) + } + } + s.running = make(map[string]map[string]bool) +} + func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { log.Printf("Offer %s rescinded", offerID) } diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index 15ae698..d5b0419 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -10,7 +10,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "os" "time" ) @@ -37,38 +36,20 @@ type BinPacking struct { base // Type embedded to inherit common functions } -// New elektron scheduler -func NewBinPacking(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BinPacking { - def.SortTasks(tasks, def.SortByWatts) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &BinPacking{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s +// Initialization +func (s *BinPacking) init(opts ...schedPolicyOption) { + s.base.init(opts...) + // sorting the tasks based on watts + def.SortTasks(s.tasks, def.SortByWatts) } func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ - if !s.RecordPCP { + if !*s.RecordPCP { // Turn on logging - s.RecordPCP = true + *s.RecordPCP = true time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index 7894b5d..0b8d46a 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -10,7 +10,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "os" "time" ) @@ -38,37 +37,18 @@ type FirstFit struct { base // Type embedded to inherit common functions } -// New elektron scheduler -func NewFirstFit(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFit { - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFit{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s +// Initialization +func (s *FirstFit) init(opts ...schedPolicyOption) { + s.base.init(opts...) } func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ - if !s.RecordPCP { + if !*s.RecordPCP { // Turn on logging - s.RecordPCP = true + *s.RecordPCP = true time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index b7c5563..a0a6338 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -2,8 +2,11 @@ package schedulers import ( "bitbucket.org/sunybingcloud/elektron/constants" + "bitbucket.org/sunybingcloud/elektron/def" + "errors" "fmt" "log" + "os" ) func coLocated(tasks map[string]bool) { @@ -24,3 +27,83 @@ func hostToPowerClass(hostName string) string { } return "" } + +// scheduler policy options to help initialize schedulers +type schedPolicyOption func(e ElectronScheduler) error + +func WithTasks(ts []def.Task) schedPolicyOption { + return func(s ElectronScheduler) error { + if ts == nil { + return errors.New("Task[] is empty.") + } else { + s.(*base).tasks = ts + return nil + } + } +} + +func WithWattsAsAResource(waar bool) schedPolicyOption { + return func(s ElectronScheduler) error { + s.(*base).wattsAsAResource = waar + return nil + } +} + +func WithClassMapWatts(cmw bool) schedPolicyOption { + return func(s ElectronScheduler) error { + s.(*base).classMapWatts = cmw + return nil + } +} + +func WithRecordPCP(recordPCP *bool) schedPolicyOption { + return func(s ElectronScheduler) error { + s.(*base).RecordPCP = recordPCP + return nil + } +} + +func WithSchedTracePrefix(schedTracePrefix string) schedPolicyOption { + return func(s ElectronScheduler) error { + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + return err + } else { + s.(*base).schedTrace = log.New(logFile, "", log.LstdFlags) + return nil + } + } +} + +func WithShutdown(shutdown chan struct{}) schedPolicyOption { + return func(s ElectronScheduler) error { + if shutdown == nil { + return errors.New("Shutdown channel is nil.") + } else { + s.(*base).Shutdown = shutdown + return nil + } + } +} + +func WithDone(done chan struct{}) schedPolicyOption { + return func(s ElectronScheduler) error { + if done == nil { + return errors.New("Done channel is nil.") + } else { + s.(*base).Done = done + return nil + } + } +} + +func WithPCPLog(pcpLog chan struct{}) schedPolicyOption { + return func(s ElectronScheduler) error { + if pcpLog == nil { + return errors.New("PCPLog channel is nil.") + } else { + s.(*base).PCPLog = pcpLog + return nil + } + } +} diff --git a/schedulers/max-greedymins.go b/schedulers/max-greedymins.go index 64584d0..5ca6013 100644 --- a/schedulers/max-greedymins.go +++ b/schedulers/max-greedymins.go @@ -10,7 +10,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "os" "time" ) @@ -38,29 +37,9 @@ type MaxGreedyMins struct { base //Type embedding to inherit common functions } -// New elektron scheduler -func NewMaxGreedyMins(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *MaxGreedyMins { - def.SortTasks(tasks, def.SortByWatts) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &MaxGreedyMins{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s +// Initialization +func (s *MaxGreedyMins) init(opts ...schedPolicyOption) { + s.base.init(opts...) } func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { @@ -68,9 +47,9 @@ func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn s.tasksCreated++ // Start recording only when we're creating the first task - if !s.RecordPCP { + if !*s.RecordPCP { // Turn on logging - s.RecordPCP = true + *s.RecordPCP = true time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts } diff --git a/schedulers/max-min.go b/schedulers/max-min.go index e9e11be..587f358 100644 --- a/schedulers/max-min.go +++ b/schedulers/max-min.go @@ -10,7 +10,6 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "os" "time" ) @@ -38,29 +37,9 @@ type MaxMin struct { base //Type embedding to inherit common functions } -// New elektron scheduler -func NewMaxMin(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *MaxMin { - def.SortTasks(tasks, def.SortByWatts) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &MaxMin{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s +// Initialization +func (s *MaxMin) init(opts ...schedPolicyOption) { + s.base.init(opts...) } func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { @@ -68,9 +47,9 @@ func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { s.tasksCreated++ // Start recording only when we're creating the first task - if !s.RecordPCP { + if !*s.RecordPCP { // Turn on logging - s.RecordPCP = true + *s.RecordPCP = true time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts } diff --git a/schedulers/store.go b/schedulers/store.go new file mode 100644 index 0000000..f0b2204 --- /dev/null +++ b/schedulers/store.go @@ -0,0 +1,30 @@ +package schedulers + +import "github.com/mesos/mesos-go/scheduler" + +// Names of different scheduling policies. +const ( + ff = "first-fit" + bp = "bin-packing" + mgm = "max-greedymins" + mm = "max-min" +) + +// Scheduler class factory +var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{ + ff: &FirstFit{base: base{}}, + bp: &BinPacking{base: base{}}, + mgm: &MaxGreedyMins{base: base{}}, + mm: &MaxMin{base: base{}}, +} + +// build the scheduling policy with the options being applied +func BuildSchedPolicy(s scheduler.Scheduler, opts ...schedPolicyOption) { + s.(ElectronScheduler).init(opts...) +} + +func SchedFactory(schedPolicyName string, opts ...schedPolicyOption) scheduler.Scheduler { + s := Schedulers[schedPolicyName] + BuildSchedPolicy(s, opts...) + return s +}