Merged in schedPolSwitchConfigFile (pull request #8)

SchedPolSwitchConfigFile

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
This commit is contained in:
Pradyumna Kaushik 2018-02-26 01:59:09 +00:00
parent b569bd3060
commit b877d31cb8
6 changed files with 71 additions and 16 deletions

View file

@ -27,6 +27,7 @@ var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts t
var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used.\n\tUse option -listSchedPolicies to get the names of available scheduling policies")
var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluaggable scheduling policies.")
var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.")
var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.")
// Short hand args
func init() {
@ -40,6 +41,7 @@ func init() {
flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used.\n Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)")
flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)")
flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.")
flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).")
}
func listAllSchedulingPolicies() {
@ -110,6 +112,15 @@ func main() {
fmt.Println(task)
}
if *enableSchedPolicySwitch {
if spcf := *schedPolConfigFile; spcf == "" {
logger.WriteLog(elecLogDef.ERROR, "No file containing characteristics for scheduling policies")
} else {
// Initializing the characteristics of the scheduling policies.
schedulers.InitSchedPolicyCharacteristics(spcf)
}
}
shutdown := make(chan struct{})
done := make(chan struct{})
pcpLog := make(chan struct{})

View file

@ -74,7 +74,7 @@ type BaseScheduler struct {
hasReceivedResourceOffers bool
}
func (s *BaseScheduler) init(opts ...schedPolicyOption) {
func (s *BaseScheduler) init(opts ...schedulerOptions) {
for _, opt := range opts {
// applying options
if err := opt(s); err != nil {

View file

@ -10,7 +10,7 @@ import (
// Implements mesos scheduler.
type ElectronScheduler interface {
sched.Scheduler
init(opts ...schedPolicyOption)
init(opts ...schedulerOptions)
// Interface for log messages.
// Every ElectronScheduler implementer should provide definitions for these functions.

View file

@ -31,9 +31,9 @@ func hostToPowerClass(hostName string) string {
}
// scheduler policy options to help initialize schedulers
type schedPolicyOption func(e ElectronScheduler) error
type schedulerOptions func(e ElectronScheduler) error
func WithSchedPolicy(schedPolicyName string) schedPolicyOption {
func WithSchedPolicy(schedPolicyName string) schedulerOptions {
return func(s ElectronScheduler) error {
if schedPolicy, ok := SchedPolicies[schedPolicyName]; !ok {
return errors.New("Incorrect scheduling policy.")
@ -44,7 +44,7 @@ func WithSchedPolicy(schedPolicyName string) schedPolicyOption {
}
}
func WithTasks(ts []def.Task) schedPolicyOption {
func WithTasks(ts []def.Task) schedulerOptions {
return func(s ElectronScheduler) error {
if ts == nil {
return errors.New("Task[] is empty.")
@ -55,28 +55,28 @@ func WithTasks(ts []def.Task) schedPolicyOption {
}
}
func WithWattsAsAResource(waar bool) schedPolicyOption {
func WithWattsAsAResource(waar bool) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).wattsAsAResource = waar
return nil
}
}
func WithClassMapWatts(cmw bool) schedPolicyOption {
func WithClassMapWatts(cmw bool) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).classMapWatts = cmw
return nil
}
}
func WithRecordPCP(recordPCP *bool) schedPolicyOption {
func WithRecordPCP(recordPCP *bool) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).RecordPCP = recordPCP
return nil
}
}
func WithShutdown(shutdown chan struct{}) schedPolicyOption {
func WithShutdown(shutdown chan struct{}) schedulerOptions {
return func(s ElectronScheduler) error {
if shutdown == nil {
return errors.New("Shutdown channel is nil.")
@ -87,7 +87,7 @@ func WithShutdown(shutdown chan struct{}) schedPolicyOption {
}
}
func WithDone(done chan struct{}) schedPolicyOption {
func WithDone(done chan struct{}) schedulerOptions {
return func(s ElectronScheduler) error {
if done == nil {
return errors.New("Done channel is nil.")
@ -98,7 +98,7 @@ func WithDone(done chan struct{}) schedPolicyOption {
}
}
func WithPCPLog(pcpLog chan struct{}) schedPolicyOption {
func WithPCPLog(pcpLog chan struct{}) schedulerOptions {
return func(s ElectronScheduler) error {
if pcpLog == nil {
return errors.New("PCPLog channel is nil.")
@ -109,7 +109,7 @@ func WithPCPLog(pcpLog chan struct{}) schedPolicyOption {
}
}
func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedPolicyOption {
func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).logMsgType = lmt
s.(*BaseScheduler).logMsg = msg
@ -117,7 +117,7 @@ func WithLoggingChannels(lmt chan elecLogDef.LogMessageType, msg chan string) sc
}
}
func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedPolicyOption {
func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions {
return func(s ElectronScheduler) error {
s.(*BaseScheduler).schedPolSwitchEnabled = enableSchedPolicySwitch
return nil

View file

@ -20,6 +20,13 @@ type baseSchedPolicyState struct {
SchedPolicyState
// Keep track of the number of tasks that have been scheduled.
numTasksScheduled int
// Distribution of tasks that the scheduling policy is most appropriate for.
// This distribution corresponds to the ratio of low power consuming tasks to
// high power consuming tasks.
TaskDistribution float64 `json:"taskDist"`
// The average variance in cpu-share per task that this scheduling policy can cause.
// Note: This number corresponds to a given workload.
VarianceCpuSharePerTask float64 `json:"varCpuShare"`
}
func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) {

View file

@ -1,7 +1,10 @@
package schedulers
import (
"encoding/json"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
"github.com/pkg/errors"
"os"
)
// Names of different scheduling policies.
@ -20,12 +23,46 @@ var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{
mm: &MaxMin{},
}
// build the scheduling policy with the options being applied
func buildScheduler(s sched.Scheduler, opts ...schedPolicyOption) {
// Initialize scheduling policy characteristics using the provided config file.
func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
var schedPolConfig map[string]baseSchedPolicyState
if file, err := os.Open(schedPoliciesConfigFilename); err != nil {
return errors.Wrap(err, "Error opening file")
} else {
err := json.NewDecoder(file).Decode(&schedPolConfig)
if err != nil {
return errors.Wrap(err, "Error unmarshalling")
}
// Initializing.
// TODO: Be able to unmarshal a schedPolConfig JSON into any number of scheduling policies.
for schedPolName, schedPolState := range SchedPolicies {
switch t := schedPolState.(type) {
case *FirstFit:
t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution
t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
case *BinPackSortedWatts:
t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution
t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
case *MaxMin:
t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution
t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
case *MaxGreedyMins:
t.TaskDistribution = schedPolConfig[schedPolName].TaskDistribution
t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
}
}
}
return nil
}
// build the scheduler with the options being applied
func buildScheduler(s sched.Scheduler, opts ...schedulerOptions) {
s.(ElectronScheduler).init(opts...)
}
func SchedFactory(opts ...schedPolicyOption) sched.Scheduler {
func SchedFactory(opts ...schedulerOptions) sched.Scheduler {
s := &BaseScheduler{}
buildScheduler(s, opts...)
return s