retrofitted to use offerUtils.PowerClass(...) instead of inlining the code in every scheduler. Reduced redundant code.

This commit is contained in:
Pradyumna Kaushik 2017-01-31 15:33:31 -05:00
parent 04d722d20f
commit 84cdea08fc
5 changed files with 35 additions and 59 deletions

View file

@ -89,7 +89,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo
// mutex
var ffswClassMapWattsProacCCMutex sync.Mutex
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -121,7 +121,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, ta
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -278,16 +278,11 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
}
}
// Retrieving the node class from the offer
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
// retrieving the powerClass for the offer
powerClass := offerUtils.PowerClass(offer)
// Decision to take the offer or not
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) &&
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
// Capping the cluster if haven't yet started
@ -298,7 +293,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
s.startCapping()
}
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
ffswClassMapWattsProacCCMutex.Lock()
@ -312,7 +307,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)