scheduling policies pluggable from commandline

This commit is contained in:
Pradyumna Kaushik 2017-09-26 13:17:47 -04:00
parent 051aca4d10
commit 04f24beac5
10 changed files with 216 additions and 121 deletions

View file

@ -1,8 +1,8 @@
package pcp package pcp
import ( import (
"bitbucket.org/sunybingcloud/elektron/rapl"
"bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/pcp"
"bitbucket.org/sunybingcloud/elektron/rapl"
"bufio" "bufio"
"container/ring" "container/ring"
"log" "log"

View file

@ -2,8 +2,8 @@ package pcp
import ( import (
"bitbucket.org/sunybingcloud/elektron/constants" "bitbucket.org/sunybingcloud/elektron/constants"
"bitbucket.org/sunybingcloud/elektron/rapl"
"bitbucket.org/sunybingcloud/elektron/pcp" "bitbucket.org/sunybingcloud/elektron/pcp"
"bitbucket.org/sunybingcloud/elektron/rapl"
"bitbucket.org/sunybingcloud/elektron/utilities" "bitbucket.org/sunybingcloud/elektron/utilities"
"bufio" "bufio"
"container/ring" "container/ring"

View file

@ -17,11 +17,17 @@ import (
var master = flag.String("master", "<mesos-master>:5050", "Location of leading Mesos master") var master = flag.String("master", "<mesos-master>:5050", "Location of leading Mesos master")
var tasksFile = flag.String("workload", "", "JSON file containing task definitions") var tasksFile = flag.String("workload", "", "JSON file containing task definitions")
var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.") var wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource. "+
"This allows the usage of the Watts attribute (if present) in the workload definition during offer matching.")
var pcplogPrefix = flag.String("logPrefix", "", "Prefix for PCP log file") var pcplogPrefix = flag.String("logPrefix", "", "Prefix for PCP log file")
var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, beyond which extrema/progressive-extrema would start power-capping") var hiThreshold = flag.Float64("hiThreshold", 0.0, "Upperbound for Cluster average historical power consumption, "+
var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, below which extrema/progressive-extrema would stop power-capping") "beyond which extrema/progressive-extrema would start power-capping")
var loThreshold = flag.Float64("loThreshold", 0.0, "Lowerbound for Cluster average historical power consumption, "+
"below which extrema/progressive-extrema would stop power-capping")
var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to powerClass of node") var classMapWatts = flag.Bool("classMapWatts", false, "Enable mapping of watts to powerClass of node")
var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+
"Use option -listSchedPolicies to get the names of available scheduling policies")
var listSchedPolicies = flag.Bool("listSchedPolicies", false, "Names of the pluaggable scheduling policies.")
// Short hand args // Short hand args
func init() { func init() {
@ -35,11 +41,39 @@ func init() {
flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, "+ flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for Cluster average historical power consumption, "+
"below which extrema/progressive-extrema would stop power-capping (shorthand)") "below which extrema/progressive-extrema would stop power-capping (shorthand)")
flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to powerClass of node (shorthand)") flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to powerClass of node (shorthand)")
flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used (default = first-fit).\n "+
"Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)")
flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)")
}
func listAllSchedulingPolicies() {
fmt.Println("Scheduling Policies")
fmt.Println("-------------------")
for policyName, _ := range schedulers.Schedulers {
fmt.Println(policyName)
}
} }
func main() { func main() {
flag.Parse() flag.Parse()
// checking to see if we need to just list the pluggable scheduling policies
if *listSchedPolicies {
listAllSchedulingPolicies()
os.Exit(1)
}
// If non-default scheduling policy given,
// checking if scheduling policyName exists
if *schedPolicyName != "first-fit" {
if _, ok := schedulers.Schedulers[*schedPolicyName]; !ok {
// invalid scheduling policy
log.Println("Invalid scheduling policy given. The possible scheduling policies are:")
listAllSchedulingPolicies()
os.Exit(1)
}
}
if *tasksFile == "" { if *tasksFile == "" {
fmt.Println("No file containing tasks specifiction provided.") fmt.Println("No file containing tasks specifiction provided.")
os.Exit(1) os.Exit(1)
@ -63,7 +97,19 @@ func main() {
startTime := time.Now().Format("20060102150405") startTime := time.Now().Format("20060102150405")
logPrefix := *pcplogPrefix + "_" + startTime logPrefix := *pcplogPrefix + "_" + startTime
scheduler := schedulers.NewFirstFit(tasks, *wattsAsAResource, logPrefix, *classMapWatts) shutdown := make(chan struct{})
done := make(chan struct{})
pcpLog := make(chan struct{})
recordPCP := false
scheduler := schedulers.SchedFactory(*schedPolicyName,
schedulers.WithTasks(tasks),
schedulers.WithWattsAsAResource(*wattsAsAResource),
schedulers.WithClassMapWatts(*classMapWatts),
schedulers.WithSchedTracePrefix(logPrefix),
schedulers.WithRecordPCP(&recordPCP),
schedulers.WithShutdown(shutdown),
schedulers.WithDone(done),
schedulers.WithPCPLog(pcpLog))
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master, Master: *master,
Framework: &mesos.FrameworkInfo{ Framework: &mesos.FrameworkInfo{
@ -77,9 +123,9 @@ func main() {
return return
} }
go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) go pcp.Start(pcpLog, &recordPCP, logPrefix)
//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) //go pcp.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
//go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) //go pcp.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, logPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
// Attempt to handle SIGINT to not leave pmdumptext running // Attempt to handle SIGINT to not leave pmdumptext running
@ -89,26 +135,26 @@ func main() {
signal.Notify(c, os.Interrupt, os.Kill) signal.Notify(c, os.Interrupt, os.Kill)
s := <-c s := <-c
if s != os.Interrupt { if s != os.Interrupt {
close(scheduler.PCPLog) close(pcpLog)
return return
} }
log.Printf("Received SIGINT...stopping") log.Printf("Received SIGINT...stopping")
close(scheduler.Done) close(done)
}() }()
go func() { go func() {
// Signals we have scheduled every task we have // Signals we have scheduled every task we have
select { select {
case <-scheduler.Shutdown: case <-shutdown:
//case <-time.After(shutdownTimeout): //case <-time.After(shutdownTimeout):
} }
// All tasks have finished // All tasks have finished
select { select {
case <-scheduler.Done: case <-done:
close(scheduler.PCPLog) close(pcpLog)
time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds
//case <-time.After(shutdownTimeout): //case <-time.After(shutdownTimeout):
} }

View file

@ -7,7 +7,14 @@ import (
"log" "log"
) )
// Implements mesos scheduler.
type ElectronScheduler interface {
sched.Scheduler
init(opts ...schedPolicyOption)
}
type base struct { type base struct {
ElectronScheduler
tasksCreated int tasksCreated int
tasksRunning int tasksRunning int
tasks []def.Task tasks []def.Task
@ -18,7 +25,7 @@ type base struct {
// First set of PCP values are garbage values, signal to logger to start recording when we're // First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task // about to schedule a new task
RecordPCP bool RecordPCP *bool
// This channel is closed when the program receives an interrupt, // This channel is closed when the program receives an interrupt,
// signalling that the program should shut down. // signalling that the program should shut down.
@ -33,6 +40,16 @@ type base struct {
schedTrace *log.Logger schedTrace *log.Logger
} }
func (s *base) init(opts ...schedPolicyOption) {
for _, opt := range opts {
// applying options
if err := opt(s); err != nil {
log.Fatal(err)
}
}
s.running = make(map[string]map[string]bool)
}
func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
log.Printf("Offer %s rescinded", offerID) log.Printf("Offer %s rescinded", offerID)
} }

View file

@ -10,7 +10,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"os"
"time" "time"
) )
@ -37,38 +36,20 @@ type BinPacking struct {
base // Type embedded to inherit common functions base // Type embedded to inherit common functions
} }
// New elektron scheduler // Initialization
func NewBinPacking(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BinPacking { func (s *BinPacking) init(opts ...schedPolicyOption) {
def.SortTasks(tasks, def.SortByWatts) s.base.init(opts...)
// sorting the tasks based on watts
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") def.SortTasks(s.tasks, def.SortByWatts)
if err != nil {
log.Fatal(err)
}
s := &BinPacking{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
} }
func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { func (s *BinPacking) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++ s.tasksCreated++
if !s.RecordPCP { if !*s.RecordPCP {
// Turn on logging // Turn on logging
s.RecordPCP = true *s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
} }

View file

@ -10,7 +10,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"os"
"time" "time"
) )
@ -38,37 +37,18 @@ type FirstFit struct {
base // Type embedded to inherit common functions base // Type embedded to inherit common functions
} }
// New elektron scheduler // Initialization
func NewFirstFit(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFit { func (s *FirstFit) init(opts ...schedPolicyOption) {
s.base.init(opts...)
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFit{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
} }
func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++ s.tasksCreated++
if !s.RecordPCP { if !*s.RecordPCP {
// Turn on logging // Turn on logging
s.RecordPCP = true *s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
} }

View file

@ -2,8 +2,11 @@ package schedulers
import ( import (
"bitbucket.org/sunybingcloud/elektron/constants" "bitbucket.org/sunybingcloud/elektron/constants"
"bitbucket.org/sunybingcloud/elektron/def"
"errors"
"fmt" "fmt"
"log" "log"
"os"
) )
func coLocated(tasks map[string]bool) { func coLocated(tasks map[string]bool) {
@ -24,3 +27,83 @@ func hostToPowerClass(hostName string) string {
} }
return "" return ""
} }
// scheduler policy options to help initialize schedulers
type schedPolicyOption func(e ElectronScheduler) error
func WithTasks(ts []def.Task) schedPolicyOption {
return func(s ElectronScheduler) error {
if ts == nil {
return errors.New("Task[] is empty.")
} else {
s.(*base).tasks = ts
return nil
}
}
}
func WithWattsAsAResource(waar bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).wattsAsAResource = waar
return nil
}
}
func WithClassMapWatts(cmw bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).classMapWatts = cmw
return nil
}
}
func WithRecordPCP(recordPCP *bool) schedPolicyOption {
return func(s ElectronScheduler) error {
s.(*base).RecordPCP = recordPCP
return nil
}
}
func WithSchedTracePrefix(schedTracePrefix string) schedPolicyOption {
return func(s ElectronScheduler) error {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
return err
} else {
s.(*base).schedTrace = log.New(logFile, "", log.LstdFlags)
return nil
}
}
}
func WithShutdown(shutdown chan struct{}) schedPolicyOption {
return func(s ElectronScheduler) error {
if shutdown == nil {
return errors.New("Shutdown channel is nil.")
} else {
s.(*base).Shutdown = shutdown
return nil
}
}
}
func WithDone(done chan struct{}) schedPolicyOption {
return func(s ElectronScheduler) error {
if done == nil {
return errors.New("Done channel is nil.")
} else {
s.(*base).Done = done
return nil
}
}
}
func WithPCPLog(pcpLog chan struct{}) schedPolicyOption {
return func(s ElectronScheduler) error {
if pcpLog == nil {
return errors.New("PCPLog channel is nil.")
} else {
s.(*base).PCPLog = pcpLog
return nil
}
}
}

View file

@ -10,7 +10,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"os"
"time" "time"
) )
@ -38,29 +37,9 @@ type MaxGreedyMins struct {
base //Type embedding to inherit common functions base //Type embedding to inherit common functions
} }
// New elektron scheduler // Initialization
func NewMaxGreedyMins(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *MaxGreedyMins { func (s *MaxGreedyMins) init(opts ...schedPolicyOption) {
def.SortTasks(tasks, def.SortByWatts) s.base.init(opts...)
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &MaxGreedyMins{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
} }
func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
@ -68,9 +47,9 @@ func (s *MaxGreedyMins) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn
s.tasksCreated++ s.tasksCreated++
// Start recording only when we're creating the first task // Start recording only when we're creating the first task
if !s.RecordPCP { if !*s.RecordPCP {
// Turn on logging // Turn on logging
s.RecordPCP = true *s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
} }

View file

@ -10,7 +10,6 @@ import (
"github.com/mesos/mesos-go/mesosutil" "github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"os"
"time" "time"
) )
@ -38,29 +37,9 @@ type MaxMin struct {
base //Type embedding to inherit common functions base //Type embedding to inherit common functions
} }
// New elektron scheduler // Initialization
func NewMaxMin(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *MaxMin { func (s *MaxMin) init(opts ...schedPolicyOption) {
def.SortTasks(tasks, def.SortByWatts) s.base.init(opts...)
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &MaxMin{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
} }
func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
@ -68,9 +47,9 @@ func (s *MaxMin) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
s.tasksCreated++ s.tasksCreated++
// Start recording only when we're creating the first task // Start recording only when we're creating the first task
if !s.RecordPCP { if !*s.RecordPCP {
// Turn on logging // Turn on logging
s.RecordPCP = true *s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
} }

30
schedulers/store.go Normal file
View file

@ -0,0 +1,30 @@
package schedulers
import "github.com/mesos/mesos-go/scheduler"
// Names of different scheduling policies.
const (
ff = "first-fit"
bp = "bin-packing"
mgm = "max-greedymins"
mm = "max-min"
)
// Scheduler class factory
var Schedulers map[string]scheduler.Scheduler = map[string]scheduler.Scheduler{
ff: &FirstFit{base: base{}},
bp: &BinPacking{base: base{}},
mgm: &MaxGreedyMins{base: base{}},
mm: &MaxMin{base: base{}},
}
// build the scheduling policy with the options being applied
func BuildSchedPolicy(s scheduler.Scheduler, opts ...schedPolicyOption) {
s.(ElectronScheduler).init(opts...)
}
func SchedFactory(schedPolicyName string, opts ...schedPolicyOption) scheduler.Scheduler {
s := Schedulers[schedPolicyName]
BuildSchedPolicy(s, opts...)
return s
}