diff --git a/scheduler.go b/scheduler.go index 3951494..c532ae4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -27,6 +27,7 @@ var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts t var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used.\n\tUse option -listSchedPolicies to get the names of available scheduling policies") var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluaggable scheduling policies.") var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.") +var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.") // Short hand args func init() { @@ -40,6 +41,7 @@ func init() { flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used.\n Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)") flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)") flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.") + flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).") } func listAllSchedulingPolicies() { @@ -110,6 +112,15 @@ func main() { fmt.Println(task) } + if *enableSchedPolicySwitch { + if spcf := *schedPolConfigFile; spcf == "" { + logger.WriteLog(elecLogDef.ERROR, "No file containing characteristics for scheduling policies") + } else { + // Initializing the characteristics of the scheduling policies. + schedulers.InitSchedPolicyCharacteristics(spcf) + } + } + shutdown := make(chan struct{}) done := make(chan struct{}) pcpLog := make(chan struct{}) diff --git a/schedulers/base.go b/schedulers/base.go index cc74215..3a624c2 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -74,7 +74,7 @@ type BaseScheduler struct { hasReceivedResourceOffers bool } -func (s *BaseScheduler) init(opts ...schedPolicyOption) { +func (s *BaseScheduler) init(opts ...schedulerOptions) { for _, opt := range opts { // applying options if err := opt(s); err != nil { diff --git a/schedulers/electronScheduler.go b/schedulers/electronScheduler.go index c9d93f3..0c64bbb 100644 --- a/schedulers/electronScheduler.go +++ b/schedulers/electronScheduler.go @@ -10,7 +10,7 @@ import ( // Implements mesos scheduler. type ElectronScheduler interface { sched.Scheduler - init(opts ...schedPolicyOption) + init(opts ...schedulerOptions) // Interface for log messages. // Every ElectronScheduler implementer should provide definitions for these functions. diff --git a/schedulers/helpers.go b/schedulers/helpers.go index a820837..91a1d79 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -31,9 +31,9 @@ func hostToPowerClass(hostName string) string { } // scheduler policy options to help initialize schedulers -type schedPolicyOption func(e ElectronScheduler) error +type schedulerOptions func(e ElectronScheduler) error -func WithSchedPolicy(schedPolicyName string) schedPolicyOption { +func WithSchedPolicy(schedPolicyName string) schedulerOptions { return func(s ElectronScheduler) error { if schedPolicy, ok := SchedPolicies[schedPolicyName]; !ok { return errors.New("Incorrect scheduling policy.") @@ -44,7 +44,7 @@ func WithSchedPolicy(schedPolicyName string) schedPolicyOption { } } -func WithTasks(ts []def.Task) schedPolicyOption { +func WithTasks(ts []def.Task) schedulerOptions { return func(s ElectronScheduler) error { if ts == nil { return errors.New("Task[] is empty.") @@ -55,28 +55,28 @@ func WithTasks(ts []def.Task) schedPolicyOption { } } -func WithWattsAsAResource(waar bool) schedPolicyOption { +func WithWattsAsAResource(waar bool) schedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).wattsAsAResource = waar return nil } } -func WithClassMapWatts(cmw bool) schedPolicyOption { +func WithClassMapWatts(cmw bool) schedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).classMapWatts = cmw return nil } } -func WithRecordPCP(recordPCP *bool) schedPolicyOption { +func WithRecordPCP(recordPCP *bool) schedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).RecordPCP = recordPCP return nil } } -func WithShutdown(shutdown chan struct{}) schedPolicyOption { +func WithShutdown(shutdown chan struct{}) schedulerOptions { return func(s ElectronScheduler) error { if shutdown == nil { return errors.New("Shutdown channel is nil.") @@ -87,7 +87,7 @@ func WithShutdown(shutdown chan struct{}) schedPolicyOption { } } -func WithDone(done chan struct{}) schedPolicyOption { +func WithDone(done chan struct{}) schedulerOptions { return func(s ElectronScheduler) error { if done == nil { return errors.New("Done channel is nil.") @@ -98,7 +98,7 @@ func WithDone(done chan struct{}) schedPolicyOption { } } -func WithPCPLog(pcpLog chan struct{}) schedPolicyOption { +func WithPCPLog(pcpLog chan struct{}) schedulerOptions { return func(s ElectronScheduler) error { if pcpLog == nil { return errors.New("PCPLog channel is nil.") @@ -109,7 +109,7 @@ func WithPCPLog(pcpLog chan struct{}) schedPolicyOption { } } -func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedPolicyOption { +func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).logMsgType = lmt s.(*BaseScheduler).logMsg = msg @@ -117,7 +117,7 @@ func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) sc } } -func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption { +func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions { return func(s ElectronScheduler) error { s.(*BaseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch return nil diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index 2822330..1dec900 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -20,6 +20,13 @@ type baseSchedPolicyState struct { SchedPolicyState // Keep track of the number of tasks that have been scheduled. numTasksScheduled int + // Distribution of tasks that the scheduling policy is most appropriate for. + // This distribution corresponds to the ratio of low power consuming tasks to + // high power consuming tasks. + TaskDistribution float64 `json:"taskDist"` + // The average variance in cpu-share per task that this scheduling policy can cause. + // Note: This number corresponds to a given workload. + VarianceCpuSharePerTask float64 `json:"varCpuShare"` } func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) { diff --git a/schedulers/store.go b/schedulers/store.go index b13ad76..1e40ba6 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -1,7 +1,10 @@ package schedulers import ( + "encoding/json" sched "github.com/mesos/mesos-go/api/v0/scheduler" + "github.com/pkg/errors" + "os" ) // Names of different scheduling policies. @@ -20,12 +23,46 @@ var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{ mm: &MaxMin{}, } -// build the scheduling policy with the options being applied -func buildScheduler(s sched.Scheduler, opts ...schedPolicyOption) { +// Initialize scheduling policy characteristics using the provided config file. +func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error { + var schedPolConfig map[string]baseSchedPolicyState + if file, err := os.Open(schedPoliciesConfigFilename); err != nil { + return errors.Wrap(err, "Error opening file") + } else { + err := json.NewDecoder(file).Decode(&schedPolConfig) + if err != nil { + return errors.Wrap(err, "Error unmarshalling") + } + + // Initializing. + // TODO: Be able to unmarshal a schedPolConfig JSON into any number of scheduling policies. + for schedPolName, schedPolState := range SchedPolicies { + switch t := schedPolState.(type) { + case *FirstFit: + t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution + t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask + case *BinPackSortedWatts: + t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution + t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask + case *MaxMin: + t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution + t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask + case *MaxGreedyMins: + t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution + t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask + } + } + } + + return nil +} + +// build the scheduler with the options being applied +func buildScheduler(s sched.Scheduler, opts ...schedulerOptions) { s.(ElectronScheduler).init(opts...) } -func SchedFactory(opts ...schedPolicyOption) sched.Scheduler { +func SchedFactory(opts ...schedulerOptions) sched.Scheduler { s := &BaseScheduler{} buildScheduler(s, opts...) return s