diff --git a/schedulers/base.go b/schedulers/base.go index 408082e..dfc0beb 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -4,9 +4,34 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" sched "github.com/mesos/mesos-go/scheduler" "log" + "bitbucket.org/sunybingcloud/electron/def" ) -type base struct{} +type base struct{ + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + wattsAsAResource bool + classMapWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} func (s *base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { log.Printf("Offer %s rescinded", offerID) diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go index 9c27aad..700147e 100644 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -36,30 +36,7 @@ func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def. } type BinPackSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -73,15 +50,17 @@ func NewBinPackSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, } s := &BinPackSortedWattsSortedOffers{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index ca5ab5a..a9a3fab 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -27,35 +27,11 @@ import ( corresponding to the load on that node. */ type BinPackedPistonCapper struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - totalPower map[string]float64 - wattsAsAResource bool - classMapWatts bool - ticker *time.Ticker - isCapping bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule the new task. - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up. - Done chan struct{} - - // Controls when to shutdown pcp logging. - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions + taskMonitor map[string][]def.Task + totalPower map[string]float64 + ticker *time.Ticker + isCapping bool } // New electron scheduler. @@ -68,19 +44,21 @@ func NewBinPackedPistonCapper(tasks []def.Task, wattsAsAResource bool, schedTrac } s := &BinPackedPistonCapper{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - RecordPCP: false, - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + ticker: time.NewTicker(5 * time.Second), + isCapping: false, } return s } diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index 936f7f6..1f07e7d 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -36,30 +36,7 @@ func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BinPackSortedWatts struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -72,15 +49,17 @@ func NewBinPackSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTracePr } s := &BinPackSortedWatts{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go index 7ee8fca..2af57d6 100644 --- a/schedulers/bottomHeavy.go +++ b/schedulers/bottomHeavy.go @@ -29,30 +29,7 @@ BinPacking has the most effect when co-scheduling of tasks is increased. Large t // electronScheduler implements the Scheduler interface type BottomHeavy struct { base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool smallTasks, largeTasks []def.Task - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger } // New electron scheduler @@ -68,16 +45,18 @@ func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix st // Classification done based on MMPU watts requirements. mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) s := &BottomHeavy{ - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], } return s } diff --git a/schedulers/bpswMaxMin.go b/schedulers/bpswMaxMin.go index ae47645..54c4e0d 100644 --- a/schedulers/bpswMaxMin.go +++ b/schedulers/bpswMaxMin.go @@ -36,30 +36,7 @@ func (s *BPSWMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BPSWMaxMinWatts struct { - base //Type embedding to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base //Type embedding to inherit common functions } // New electron scheduler @@ -72,15 +49,17 @@ func NewBPMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix } s := &BPSWMaxMinWatts{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go index d2f63e1..05c709a 100644 --- a/schedulers/bpswMaxMinPistonCapping.go +++ b/schedulers/bpswMaxMinPistonCapping.go @@ -41,34 +41,11 @@ func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) b } type BPSWMaxMinPistonCapping struct { - base //Type embedding to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - totalPower map[string]float64 - wattsAsAResource bool - classMapWatts bool - ticker *time.Ticker - isCapping bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base //Type embedding to inherit common functions + taskMonitor map[string][]def.Task + totalPower map[string]float64 + ticker *time.Ticker + isCapping bool } // New electron scheduler @@ -82,19 +59,21 @@ func NewBPSWMaxMinPistonCapping(tasks []def.Task, wattsAsAResource bool, schedTr } s := &BPSWMaxMinPistonCapping{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - RecordPCP: false, - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + taskMonitor: make(map[string][]def.Task), + totalPower: make(map[string]float64), + ticker: time.NewTicker(5 * time.Second), + isCapping: false, } return s diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go index 129b030..e1a48f0 100644 --- a/schedulers/bpswMaxMinProacCC.go +++ b/schedulers/bpswMaxMinProacCC.go @@ -40,38 +40,15 @@ func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BPSWMaxMinProacCC struct { - base // Type embedding to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task - availablePower map[string]float64 - totalPower map[string]float64 - wattsAsAResource bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster-wide capping. - isRecapping bool // indicate whether we are currently performing cluster-wide recapping. - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedding to inherit common functions + taskMonitor map[string][]def.Task + availablePower map[string]float64 + totalPower map[string]float64 + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster-wide capping. + isRecapping bool // indicate whether we are currently performing cluster-wide recapping. } // New electron scheduler @@ -84,23 +61,25 @@ func NewBPSWMaxMinProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePre } s := &BPSWMaxMinProacCC{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, } return s } diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 9992721..d0235ce 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -36,30 +36,7 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface type FirstFit struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -71,15 +48,17 @@ func NewFirstFit(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin } s := &FirstFit{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go index ba06be6..4e361a5 100644 --- a/schedulers/firstfitProacCC.go +++ b/schedulers/firstfitProacCC.go @@ -37,39 +37,15 @@ func (s *FirstFitProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface. type FirstFitProacCC struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - wattsAsAResource bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule the new task. - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up. - Done chan struct{} - - // Controls when to shutdown pcp logging. - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. } // New electron scheduler. @@ -82,23 +58,25 @@ func NewFirstFitProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefi } s := &FirstFitProacCC{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, } return s } diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go index 3e4fabe..d088b3d 100644 --- a/schedulers/firstfitSortedOffers.go +++ b/schedulers/firstfitSortedOffers.go @@ -37,30 +37,7 @@ func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -72,15 +49,17 @@ func NewFirstFitSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTrace } s := &FirstFitSortedOffers{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go index bf4964e..cee8af5 100644 --- a/schedulers/firstfitSortedWattsProacCC.go +++ b/schedulers/firstfitSortedWattsProacCC.go @@ -48,39 +48,15 @@ func (s *FirstFitSortedWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task // electronScheduler implements the Scheduler interface type FirstFitSortedWattsProacCC struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - wattsAsAResource bool - classMapWatts bool - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule the new task. - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up. - Done chan struct{} - - // Controls when to shutdown pcp logging. - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions + taskMonitor map[string][]def.Task // store tasks that are currently running. + availablePower map[string]float64 // available power for each node in the cluster. + totalPower map[string]float64 // total power for each node in the cluster. + capper *powCap.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster wide capping. + isRecapping bool // indicate whether we are currently performing cluster wide re-capping. } // New electron scheduler. @@ -96,23 +72,25 @@ func NewFirstFitSortedWattsProacCC(tasks []def.Task, wattsAsAResource bool, sche } s := &FirstFitSortedWattsProacCC{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - RecordPCP: false, - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + capper: powCap.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, } return s } diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go index 3742db2..2f309fd 100644 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -37,30 +37,7 @@ func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def // electronScheduler implements the Scheduler interface type FirstFitSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -76,15 +53,17 @@ func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, } s := &FirstFitSortedWattsSortedOffers{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 5d624cf..0b05f12 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -37,30 +37,7 @@ func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedWatts struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -74,15 +51,17 @@ func NewFirstFitSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTraceP } s := &FirstFitSortedWatts{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index 2d531c9..5b29914 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -35,30 +35,7 @@ func (s *FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { } type FirstFitWattsOnly struct { - base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger + base // Type embedded to inherit common functions } // New electron scheduler @@ -70,15 +47,17 @@ func NewFirstFitWattsOnly(tasks []def.Task, wattsAsAResource bool, schedTracePre } s := &FirstFitWattsOnly{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + tasks: tasks, + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, } return s } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go index 39ffe03..1281634 100644 --- a/schedulers/topHeavy.go +++ b/schedulers/topHeavy.go @@ -29,30 +29,7 @@ starvation of power intensive tasks. // electronScheduler implements the Scheduler interface type TopHeavy struct { base // Type embedded to inherit common functions - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool - wattsAsAResource bool - classMapWatts bool smallTasks, largeTasks []def.Task - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - RecordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. - Shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - Done chan struct{} - - // Controls when to shutdown pcp logging - PCPLog chan struct{} - - schedTrace *log.Logger } // New electron scheduler @@ -68,16 +45,18 @@ func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix strin // Classification done based on MMPU watts requirements. mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) s := &TopHeavy{ - smallTasks: tasks[:mid], - largeTasks: tasks[mid+1:], - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + base: base{ + wattsAsAResource: wattsAsAResource, + classMapWatts: classMapWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + }, + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], } return s }