Resolved merge conflict with master. Also changed the name of the constructor for BPSWMaxMinWatts from NewBPMaxMinWatts to NewBPSWMaxMinWatts.

Pradyumna Kaushik 2017-02-11 14:26:27 -05:00
commit 02fede7184
21 changed files with 142 additions and 171 deletions
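For callers of the scheduler package, the rename only changes the constructor's name, not its signature. Below is a minimal caller-side sketch; the import paths and argument values are assumptions for illustration and are not part of this commit.

```go
package main

import (
	"bitbucket.org/sunybingcloud/electron/def"        // assumed import path
	"bitbucket.org/sunybingcloud/electron/schedulers" // assumed import path
)

func main() {
	// Tasks would normally be parsed from a workload file; an empty slice is
	// used here only to show the renamed constructor's call site and signature.
	var tasks []def.Task
	s := schedulers.NewBPSWMaxMinWatts(tasks, true, "bpswMaxMinWatts", false)
	_ = s
}
```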

View file

@@ -15,8 +15,8 @@ To Do:
* Log fix for declining offer -- different reason when insufficient resources as compared to when there are no longer any tasks to schedule.
* Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file.
- * Make ClassMapWatts to commandLine arguments so Electron can be run with ClassMapWatts enabled/disabled.
* Make def.Task an interface for further modularization and flexibility.
+ * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the machine on which electron is launched for logging to work and PCP collector agents installed

View file

@@ -1,8 +1,9 @@
/*
Constants that are used across scripts
1. The available hosts = stratos-00x (x varies from 1 to 8)
- 2. CapMargin = percentage of the requested power to allocate
+ 2. Tolerance = tolerance for a task that when exceeded would starve the task.
3. ConsiderationWindowSize = number of tasks to consider for computation of the dynamic cap.
+ TODO: Clean this up and use Mesos Attributes instead.
*/
package constants
@@ -11,7 +12,11 @@ var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton
"stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu",
"stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"}
- // Classification of the nodes in the cluster based on their power consumption.
+ /*
+ Classification of the nodes in the cluster based on their Thermal Design Power (TDP).
+ The power classes are labelled in the decreasing order of the corresponding TDP, with class A nodes
+ having the highest TDP and class C nodes having the lowest TDP.
+ */
var PowerClasses = map[string]map[string]bool{
"A": map[string]bool{
"stratos-005.cs.binghamton.edu": true,
@@ -31,10 +36,10 @@ var PowerClasses = map[string]map[string]bool{
/*
Margin with respect to the required power for a job.
- So, if power required = 10W, the node would be capped to CapMargin * 10W.
+ So, if power required = 10W, the node would be capped to Tolerance * 10W.
This value can be changed upon convenience.
*/
- var CapMargin = 0.70
+ var Tolerance = 0.70
// Window size for running average
var ConsiderationWindowSize = 20
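To make the renamed constant concrete: the capping schedulers in this commit cap a task to task.Watts * constants.Tolerance and, for piston capping, convert that into a percentage of the host's total power. A self-contained sketch of that arithmetic is below; the wattage numbers are made up for illustration and are not taken from the repo.

```go
package main

import "fmt"

// Tolerance mirrors constants.Tolerance above; taskWatts and hostPower are
// hypothetical values chosen only to illustrate the arithmetic.
const Tolerance = 0.70

func main() {
	taskWatts := 10.0  // watts requested by a task
	hostPower := 100.0 // total power available on the host running it

	cappedWatts := taskWatts * Tolerance
	// Contribution of this task to the host's piston cap, expressed as a
	// percentage of the host's total power (mirrors the partialLoad updates).
	partialLoad := (cappedWatts / hostPower) * 100

	fmt.Printf("capped to %.1fW, partial load %.1f%%\n", cappedWatts, partialLoad)
}
```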

View file

@@ -23,7 +23,7 @@ type taskWrapper struct {
}
func (tw taskWrapper) Val() float64 {
- return tw.task.Watts * constants.CapMargin
+ return tw.task.Watts * constants.Tolerance
}
func (tw taskWrapper) ID() string {
@@ -121,7 +121,7 @@ func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64,
// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks
continue
}
- wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.CapMargin)
+ wattsUsages[host] = append(wattsUsages[host], float64(task.Watts)*constants.Tolerance)
}
}
@@ -202,7 +202,7 @@ func (capper ClusterwideCapper) NaiveRecap(totalPower map[string]float64,
// Not considering this task for the computation of totalAllocatedPower and totalRunningTasks
continue
}
- totalAllocatedPower += (float64(task.Watts) * constants.CapMargin)
+ totalAllocatedPower += (float64(task.Watts) * constants.Tolerance)
totalRunningTasks++
}
}

View file

@@ -12,26 +12,23 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
// Decides if to take an offer or not
func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool {
- cpus, mem, watts := offerUtils.OfferAgg(offer)
+ offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
- if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) {
+ if offerCPU >= task.CPU && offerRAM >= task.RAM && (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) {
return true
}
return false
}
@@ -145,8 +142,6 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr
tasks := []*mesos.TaskInfo{}
- offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer)
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
@@ -159,19 +154,14 @@
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {
// Does the task fit
- if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) &&
- (offer_cpu >= (totalCPU + task.CPU)) &&
- (offer_ram >= (totalRAM + task.RAM)) {
+ if s.takeOffer(offer, task) {
offerTaken = true
totalWatts += wattsConsideration

View file

@@ -15,7 +15,6 @@ import (
"log"
"math"
"os"
- "strings"
"sync"
"time"
)
@@ -257,12 +256,9 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doens't match our task's host requirement.
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {
@@ -289,7 +285,7 @@
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
// updating the cap value for offer.Hostname
- partialLoad += ((wattsConsideration * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100
+ partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100
if *task.Instances <= 0 {
// All instances of task have been scheduled. Remove it
@@ -379,7 +375,7 @@ func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, statu
}
// Need to update the cap values for host of the finishedTask
bpPistonMutex.Lock()
- bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100
+ bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 {
bpPistonCapValues[hostOfFinishedTask] = 100

View file

@@ -12,7 +12,6 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
@@ -31,7 +30,6 @@ func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) {
return true
}
return false
}
@@ -133,8 +131,6 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
tasks := []*mesos.TaskInfo{}
- offer_cpu, offer_ram, offer_watts := offerUtils.OfferAgg(offer)
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
@@ -147,19 +143,14 @@
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {
// Does the task fit
- if (!s.wattsAsAResource || (offer_watts >= (totalWatts + wattsConsideration))) &&
- (offer_cpu >= (totalCPU + task.CPU)) &&
- (offer_ram >= (totalRAM + task.RAM)) {
+ if s.takeOffer(offer, task) {
offerTaken = true
totalWatts += wattsConsideration

View file

@@ -26,6 +26,31 @@ BinPacking has the most effect when co-scheduling of tasks is increased. Large t
co-scheduling them has a great impact on the total power utilization.
*/
+ func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
+ wattsToConsider float64, task def.Task) bool {
+ offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
+ //TODO: Insert watts calculation here instead of taking them as a parameter
+ if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) &&
+ (offerCPU >= (totalCPU + task.CPU)) &&
+ (offerRAM >= (totalRAM + task.RAM)) {
+ return true
+ }
+ return false
+ }
+ func (s *BottomHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool {
+ offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
+ //TODO: Insert watts calculation here instead of taking them as a parameter
+ if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
+ (offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
+ return true
+ }
+ return false
+ }
// electronScheduler implements the Scheduler interface
type BottomHeavy struct {
base // Type embedded to inherit common functions
@@ -148,7 +173,6 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver)
}
tasks := []*mesos.TaskInfo{}
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@@ -165,9 +189,7 @@
// Does the task fit
// OR lazy evaluation. If ignore watts is set to true, second statement won't
// be evaluated.
- if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) &&
- (offerCPU >= (totalCPU + task.CPU)) &&
- (offerRAM >= (totalRAM + task.RAM)) {
+ if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) {
offerTaken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
@@ -213,7 +235,6 @@ func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver
}
tasks := []*mesos.TaskInfo{}
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
for i := 0; i < len(s.smallTasks); i++ {
task := s.smallTasks[i]
@@ -221,14 +242,10 @@
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
- } else {
- // Logging the watts consideration
- log.Printf("Watts Considered for host[%s], task[%s] = %f\n", *offer.Hostname, task.Name, wattsConsideration)
}
// Decision to take the offer or not
- if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
- (offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
+ if s.takeOfferFirstFit(offer, wattsConsideration, task) {
taken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())

View file

@@ -12,7 +12,6 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
@@ -31,7 +30,6 @@ func (s *BPSWMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) {
return true
}
return false
}
@@ -40,7 +38,7 @@
}
// New electron scheduler
- func NewBPMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts {
+ func NewBPSWMaxMinWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinWatts {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
@@ -120,7 +118,8 @@ func (s *BPSWMaxMinWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.Task
// Determine if the remaining space inside of the offer is enough for this
// the task we need to create. If it is, create a TaskInfo and return it.
- func (s *BPSWMaxMinWatts) CheckFit(i int,
+ func (s *BPSWMaxMinWatts) CheckFit(
+ i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
@@ -128,12 +127,8 @@ func (s *BPSWMaxMinWatts) CheckFit(i int,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// Does the task fit
- if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) &&
- (offerCPU >= (*totalCPU + task.CPU)) &&
- (offerRAM >= (*totalRAM + task.RAM)) {
+ if s.takeOffer(offer, task) {
*totalWatts += wattsConsideration
*totalCPU += task.CPU
@@ -197,12 +192,9 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// TODO: Fix this so index doesn't need to be passed
@@ -225,12 +217,9 @@
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {

View file

@@ -16,7 +16,6 @@ import (
"math"
"os"
"sort"
- "strings"
"sync"
"time"
)
@@ -36,7 +35,6 @@ func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) b
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) {
return true
}
return false
}
@@ -209,7 +207,8 @@ func (s *BPSWMaxMinPistonCapping) stopCapping() {
// Determine if the remaining sapce inside of the offer is enough for
// the task we need to create. If it is, create a TaskInfo and return it.
- func (s *BPSWMaxMinPistonCapping) CheckFit(i int,
+ func (s *BPSWMaxMinPistonCapping) CheckFit(
+ i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
@@ -218,12 +217,8 @@
totalWatts *float64,
partialLoad *float64) (bool, *mesos.TaskInfo) {
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// Does the task fit
- if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) &&
- (offerCPU >= (*totalCPU + task.CPU)) &&
- (offerRAM >= (*totalRAM + task.RAM)) {
+ if s.takeOffer(offer, task) {
// Start piston capping if haven't started yet
if !s.isCapping {
@@ -242,7 +237,7 @@ func (s *BPSWMaxMinPistonCapping) CheckFit(i int,
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
- *partialLoad += ((wattsConsideration * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100
+ *partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
@@ -297,12 +292,9 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// TODO: Fix this so index doesn't need to be passed
@@ -325,12 +317,9 @@
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {
@@ -422,7 +411,7 @@ func (s *BPSWMaxMinPistonCapping) StatusUpdate(driver sched.SchedulerDriver, sta
}
// Need to update the cap values for host of the finishedTask
bpMaxMinPistonCappingMutex.Lock()
- bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100
+ bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(bpMaxMinPistonCappingCapValues[hostOfFinishedTask]+0.5)) == 0 {
bpMaxMinPistonCappingCapValues[hostOfFinishedTask] = 100

View file

@@ -16,7 +16,6 @@ import (
"math"
"os"
"sort"
- "strings"
"sync"
"time"
)
@@ -35,7 +34,6 @@ func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool {
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || (watts >= wattsConsideration)) {
return true
}
return false
}
@@ -233,7 +231,8 @@ func (s *BPSWMaxMinProacCC) stopRecapping() {
// Determine if the remaining space inside of the offer is enough for
// the task we need to create. If it is, create TaskInfo and return it.
- func (s *BPSWMaxMinProacCC) CheckFit(i int,
+ func (s *BPSWMaxMinProacCC) CheckFit(
+ i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
@@ -241,12 +240,8 @@
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// Does the task fit
- if (!s.wattsAsAResource || (offerWatts >= (*totalWatts + wattsConsideration))) &&
- (offerCPU >= (*totalCPU + task.CPU)) &&
- (offerRAM >= (*totalRAM + task.RAM)) {
+ if s.takeOffer(offer, task) {
// Capping the cluster if haven't yet started
if !s.isCapping {
@@ -345,12 +340,9 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers
// Error in determining wattsConsideration
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// TODO: Fix this so index doesn't need to be passed
@@ -373,12 +365,9 @@
log.Fatal(err)
}
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
for *task.Instances > 0 {

View file

@@ -11,7 +11,6 @@ import (
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
- "strings"
"time"
)
@@ -138,12 +137,9 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// Decision to take the offer or not

View file

@@ -15,7 +15,6 @@ import (
"log"
"math"
"os"
- "strings"
"sync"
"time"
)
@@ -271,8 +270,8 @@ func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Don't take offer if it doesn't match our task's host requirement.
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}

View file

@@ -12,7 +12,6 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
@@ -150,12 +149,9 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// Decision to take the offer or not

View file

@@ -26,7 +26,6 @@ import (
"math"
"os"
"sort"
- "strings"
"sync"
"time"
)
@@ -287,9 +286,8 @@ func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Don't take offer if it doesn't match our task's host requirement.
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}

View file

@@ -12,7 +12,6 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
@@ -154,12 +153,9 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// Decision to take the offer or not

View file

@@ -12,7 +12,6 @@ import (
"log"
"os"
"sort"
- "strings"
"time"
)
@@ -141,12 +140,9 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// Decision to take the offer or not

View file

@@ -11,7 +11,6 @@ import (
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
- "strings"
"time"
)
@@ -131,12 +130,9 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
- // Check host if it exists
- if task.Host != "" {
- // Don't take offer if it doesn't match our task's host requirement
- if !strings.HasPrefix(*offer.Hostname, task.Host) {
- continue
- }
+ // Don't take offer if it doesn't match our task's host requirement
+ if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
+ continue
}
// Decision to take the offer or not

View file

@@ -1,9 +1,9 @@
package schedulers
import (
+ "bitbucket.org/sunybingcloud/electron/constants"
"fmt"
"log"
- "bitbucket.org/sunybingcloud/electron/constants"
)
func coLocated(tasks map[string]bool) {
@@ -24,4 +24,3 @@ func hostToPowerClass(hostName string) string {
}
return ""
}

View file

@@ -26,6 +26,30 @@ This was done to give a little more room for the large tasks (power intensive) f
starvation of power intensive tasks.
*/
+ func (s *TopHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
+ wattsToConsider float64, task def.Task) bool {
+ offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
+ //TODO: Insert watts calculation here instead of taking them as a parameter
+ if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) &&
+ (offerCPU >= (totalCPU + task.CPU)) &&
+ (offerRAM >= (totalRAM + task.RAM)) {
+ return true
+ }
+ return false
+ }
+ func (s *TopHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool {
+ offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
+ //TODO: Insert watts calculation here instead of taking them as a parameter
+ if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
+ (offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
+ return true
+ }
+ return false
+ }
// electronScheduler implements the Scheduler interface
type TopHeavy struct {
base // Type embedded to inherit common functions
@@ -148,7 +172,6 @@ func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
}
tasks := []*mesos.TaskInfo{}
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@@ -165,9 +188,7 @@
// Does the task fit
// OR lazy evaluation. If ignore watts is set to true, second statement won't
// be evaluated.
- if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) &&
- (offerCPU >= (totalCPU + task.CPU)) &&
- (offerRAM >= (totalRAM + task.RAM)) {
+ if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) {
taken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
@@ -213,7 +234,6 @@ func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
}
tasks := []*mesos.TaskInfo{}
- offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
@@ -224,8 +244,7 @@
}
// Decision to take the offer or not
- if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
- (offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
+ if s.takeOfferFirstFit(offer, wattsConsideration, task) {
offerTaken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())

View file

@@ -2,6 +2,7 @@ package offerUtils
import (
mesos "github.com/mesos/mesos-go/mesosproto"
+ "strings"
)
func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
@@ -32,6 +33,8 @@ func PowerClass(offer *mesos.Offer) string {
return powerClass
}
+ // Implements the sort.Sort interface to sort Offers based on CPU.
+ // TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts)
type OffersSorter []*mesos.Offer
func (offersSorter OffersSorter) Len() int {
@@ -49,3 +52,11 @@ func (offersSorter OffersSorter) Less(i, j int) bool {
cpu2, _, _ := OfferAgg(offersSorter[j])
return cpu1 <= cpu2
}
+ // Is there a mismatch between the task's host requirement and the host corresponding to the offer.
+ func HostMismatch(offerHost string, taskHost string) bool {
+ if taskHost != "" && !strings.HasPrefix(offerHost, taskHost) {
+ return true
+ }
+ return false
+ }
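A quick illustration of how the schedulers above use this new helper; the import path and hostnames below are assumptions for the sketch, not taken from the commit.

```go
package main

import (
	"fmt"

	offerUtils "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" // assumed import path
)

func main() {
	// A task's Host field may be a prefix of the offer's fully qualified hostname,
	// or empty to mean "run anywhere"; only a true result makes a scheduler skip the offer.
	fmt.Println(offerUtils.HostMismatch("stratos-005.cs.binghamton.edu", "stratos-005")) // false: prefix matches
	fmt.Println(offerUtils.HostMismatch("stratos-005.cs.binghamton.edu", "stratos-007")) // true: different host
	fmt.Println(offerUtils.HostMismatch("stratos-005.cs.binghamton.edu", ""))            // false: no host requirement
}
```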

View file

@@ -45,4 +45,3 @@ func OrderedKeys(plist PairList) ([]string, error) {
}
return orderedKeys, nil
}