retrofitted to use offerUtils.PowerClass(...) instead of inlining the code in every scheduler. Reduced redundant code. Changed name of newTaskClass in newTask(...) to powerClass.
This commit is contained in:
parent
04d722d20f
commit
3af1d561c2
5 changed files with 35 additions and 59 deletions
|
@ -78,7 +78,7 @@ func NewBPSWClassMapWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix s
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
|
func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
|
||||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||||
s.tasksCreated++
|
s.tasksCreated++
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskCl
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.ignoreWatts {
|
if !s.ignoreWatts {
|
||||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
|
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mesos.TaskInfo{
|
return &mesos.TaskInfo{
|
||||||
|
@ -159,27 +159,22 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
|
||||||
}
|
}
|
||||||
|
|
||||||
for *task.Instances > 0 {
|
for *task.Instances > 0 {
|
||||||
var nodeClass string
|
powerClass := offerUtils.PowerClass(offer)
|
||||||
for _, attr := range offer.GetAttributes() {
|
|
||||||
if attr.GetName() == "class" {
|
|
||||||
nodeClass = attr.GetText().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Does the task fit
|
// Does the task fit
|
||||||
// OR lazy evaluation. If ignore watts is set to true, second statement won't
|
// OR lazy evaluation. If ignore watts is set to true, second statement won't
|
||||||
// be evaluated.
|
// be evaluated.
|
||||||
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
|
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
|
||||||
(offerCPU >= (totalCPU + task.CPU)) &&
|
(offerCPU >= (totalCPU + task.CPU)) &&
|
||||||
(offerRAM >= (totalRAM + task.RAM)) {
|
(offerRAM >= (totalRAM + task.RAM)) {
|
||||||
|
|
||||||
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
|
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
|
||||||
taken = true
|
taken = true
|
||||||
totalWatts += task.ClassToWatts[nodeClass]
|
totalWatts += task.ClassToWatts[powerClass]
|
||||||
totalCPU += task.CPU
|
totalCPU += task.CPU
|
||||||
totalRAM += task.RAM
|
totalRAM += task.RAM
|
||||||
log.Println("Co-Located with: ")
|
log.Println("Co-Located with: ")
|
||||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||||
taskToSchedule := s.newTask(offer, task, nodeClass)
|
taskToSchedule := s.newTask(offer, task, powerClass)
|
||||||
tasks = append(tasks, taskToSchedule)
|
tasks = append(tasks, taskToSchedule)
|
||||||
|
|
||||||
fmt.Println("Inst: ", *task.Instances)
|
fmt.Println("Inst: ", *task.Instances)
|
||||||
|
|
|
@ -91,7 +91,7 @@ func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool, sched
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
|
func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
|
||||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||||
s.tasksCreated++
|
s.tasksCreated++
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Ta
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.ignoreWatts {
|
if !s.ignoreWatts {
|
||||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
|
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mesos.TaskInfo{
|
return &mesos.TaskInfo{
|
||||||
|
@ -260,16 +260,11 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
|
||||||
}
|
}
|
||||||
|
|
||||||
for *task.Instances > 0 {
|
for *task.Instances > 0 {
|
||||||
var nodeClass string
|
powerClass := offerUtils.PowerClass(offer)
|
||||||
for _, attr := range offer.GetAttributes() {
|
|
||||||
if attr.GetName() == "class" {
|
|
||||||
nodeClass = attr.GetText().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Does the task fit
|
// Does the task fit
|
||||||
// OR lazy evaluation. If ignoreWatts is set to true, second statement won't
|
// OR lazy evaluation. If ignoreWatts is set to true, second statement won't
|
||||||
// be evaluated
|
// be evaluated
|
||||||
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
|
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
|
||||||
(offerCPU >= (totalCPU + task.CPU)) &&
|
(offerCPU >= (totalCPU + task.CPU)) &&
|
||||||
(offerRAM >= (totalRAM + task.RAM)) {
|
(offerRAM >= (totalRAM + task.RAM)) {
|
||||||
|
|
||||||
|
@ -279,14 +274,14 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
|
||||||
s.startCapping()
|
s.startCapping()
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
|
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
|
||||||
taken = true
|
taken = true
|
||||||
totalWatts += task.ClassToWatts[nodeClass]
|
totalWatts += task.ClassToWatts[powerClass]
|
||||||
totalCPU += task.CPU
|
totalCPU += task.CPU
|
||||||
totalRAM += task.RAM
|
totalRAM += task.RAM
|
||||||
log.Println("Co-Located with: ")
|
log.Println("Co-Located with: ")
|
||||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||||
taskToSchedule := s.newTask(offer, task, nodeClass)
|
taskToSchedule := s.newTask(offer, task, powerClass)
|
||||||
tasks = append(tasks, taskToSchedule)
|
tasks = append(tasks, taskToSchedule)
|
||||||
|
|
||||||
fmt.Println("Inst: ", *task.Instances)
|
fmt.Println("Inst: ", *task.Instances)
|
||||||
|
|
|
@ -101,7 +101,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP
|
||||||
// mutex
|
// mutex
|
||||||
var bpswClassMapWattsProacCCMutex sync.Mutex
|
var bpswClassMapWattsProacCCMutex sync.Mutex
|
||||||
|
|
||||||
func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
|
func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
|
||||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||||
s.tasksCreated++
|
s.tasksCreated++
|
||||||
|
|
||||||
|
@ -133,7 +133,7 @@ func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, ne
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.ignoreWatts {
|
if !s.ignoreWatts {
|
||||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
|
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mesos.TaskInfo{
|
return &mesos.TaskInfo{
|
||||||
|
@ -295,16 +295,11 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
|
||||||
}
|
}
|
||||||
|
|
||||||
for *task.Instances > 0 {
|
for *task.Instances > 0 {
|
||||||
var nodeClass string
|
powerClass := offerUtils.PowerClass(offer)
|
||||||
for _, attr := range offer.GetAttributes() {
|
|
||||||
if attr.GetName() == "class" {
|
|
||||||
nodeClass = attr.GetText().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Does the task fit
|
// Does the task fit
|
||||||
// OR Lazy evaluation. If ignore watts is set to true, second statement won't
|
// OR Lazy evaluation. If ignore watts is set to true, second statement won't
|
||||||
// be evaluated.
|
// be evaluated.
|
||||||
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
|
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
|
||||||
(offerCPU >= (totalCPU + task.CPU)) &&
|
(offerCPU >= (totalCPU + task.CPU)) &&
|
||||||
(offerRAM >= (totalRAM + task.RAM)) {
|
(offerRAM >= (totalRAM + task.RAM)) {
|
||||||
|
|
||||||
|
@ -316,7 +311,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
|
||||||
s.startCapping()
|
s.startCapping()
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
|
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
|
||||||
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
|
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
bpswClassMapWattsProacCCMutex.Lock()
|
bpswClassMapWattsProacCCMutex.Lock()
|
||||||
|
@ -327,12 +322,12 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
taken = true
|
taken = true
|
||||||
totalWatts += task.ClassToWatts[nodeClass]
|
totalWatts += task.ClassToWatts[powerClass]
|
||||||
totalCPU += task.CPU
|
totalCPU += task.CPU
|
||||||
totalRAM += task.RAM
|
totalRAM += task.RAM
|
||||||
log.Println("Co-Located with: ")
|
log.Println("Co-Located with: ")
|
||||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||||
taskToSchedule := s.newTask(offer, task, nodeClass)
|
taskToSchedule := s.newTask(offer, task, powerClass)
|
||||||
tasks = append(tasks, taskToSchedule)
|
tasks = append(tasks, taskToSchedule)
|
||||||
|
|
||||||
fmt.Println("Inst: ", *task.Instances)
|
fmt.Println("Inst: ", *task.Instances)
|
||||||
|
|
|
@ -65,7 +65,7 @@ func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, sch
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
|
func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
|
||||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||||
s.tasksCreated++
|
s.tasksCreated++
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.ignoreWatts {
|
if !s.ignoreWatts {
|
||||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
|
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mesos.TaskInfo{
|
return &mesos.TaskInfo{
|
||||||
|
@ -140,21 +140,17 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieving the node class from the offer
|
// retrieving the powerClass from the offer
|
||||||
var nodeClass string
|
powerClass := offerUtils.PowerClass(offer)
|
||||||
for _, attr := range offer.GetAttributes() {
|
|
||||||
if attr.GetName() == "class" {
|
|
||||||
nodeClass = attr.GetText().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decision to take the offer or not
|
// Decision to take the offer or not
|
||||||
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) &&
|
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) &&
|
||||||
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
|
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
|
||||||
|
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
|
||||||
log.Println("Co-Located with: ")
|
log.Println("Co-Located with: ")
|
||||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||||
|
|
||||||
taskToSchedule := s.newTask(offer, task, nodeClass)
|
taskToSchedule := s.newTask(offer, task, powerClass)
|
||||||
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
|
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
|
||||||
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
||||||
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
|
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
|
||||||
|
|
|
@ -89,7 +89,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo
|
||||||
// mutex
|
// mutex
|
||||||
var ffswClassMapWattsProacCCMutex sync.Mutex
|
var ffswClassMapWattsProacCCMutex sync.Mutex
|
||||||
|
|
||||||
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
|
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
|
||||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||||
s.tasksCreated++
|
s.tasksCreated++
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, ta
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.ignoreWatts {
|
if !s.ignoreWatts {
|
||||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
|
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mesos.TaskInfo{
|
return &mesos.TaskInfo{
|
||||||
|
@ -278,16 +278,11 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieving the node class from the offer
|
// retrieving the powerClass for the offer
|
||||||
var nodeClass string
|
powerClass := offerUtils.PowerClass(offer)
|
||||||
for _, attr := range offer.GetAttributes() {
|
|
||||||
if attr.GetName() == "class" {
|
|
||||||
nodeClass = attr.GetText().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decision to take the offer or not
|
// Decision to take the offer or not
|
||||||
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) &&
|
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) &&
|
||||||
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
|
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
|
||||||
|
|
||||||
// Capping the cluster if haven't yet started
|
// Capping the cluster if haven't yet started
|
||||||
|
@ -298,7 +293,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
|
||||||
s.startCapping()
|
s.startCapping()
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
|
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
|
||||||
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
|
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ffswClassMapWattsProacCCMutex.Lock()
|
ffswClassMapWattsProacCCMutex.Lock()
|
||||||
|
@ -312,7 +307,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
|
||||||
log.Println("Co-Located with: ")
|
log.Println("Co-Located with: ")
|
||||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||||
|
|
||||||
taskToSchedule := s.newTask(offer, task, nodeClass)
|
taskToSchedule := s.newTask(offer, task, powerClass)
|
||||||
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
|
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
|
||||||
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
||||||
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
|
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
|
||||||
|
|
Reference in a new issue