Retrofitted all schedulers to call OfferAgg(...) and OffersSorter from utilities/offerUtils, and to use defaultFilter and longFilter from utilities/mesosUtils.

Pradyumna Kaushik 2017-01-28 19:40:39 -05:00
parent 354e89cac7
commit 8581749435
20 changed files with 182 additions and 143 deletions
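
The utilities/offerUtils and utilities/mesosUtils packages are not part of the hunks shown below, so the following is only a hedged sketch of what the shared offer helpers might look like. The mesos-go import path, the sort key used by OffersSorter, and the exact function bodies are assumptions, not the project's actual code.

    // Hypothetical sketch of utilities/offerUtils (names and import path assumed).
    package offerUtils

    import (
        mesos "github.com/mesos/mesos-go/mesosproto"
    )

    // OfferAgg sums the scalar cpus, mem, and watts resources advertised in an offer.
    func OfferAgg(offer *mesos.Offer) (cpus, mem, watts float64) {
        for _, resource := range offer.GetResources() {
            switch resource.GetName() {
            case "cpus":
                cpus += resource.GetScalar().GetValue()
            case "mem":
                mem += resource.GetScalar().GetValue()
            case "watts":
                watts += resource.GetScalar().GetValue()
            }
        }
        return
    }

    // OffersSorter implements sort.Interface so schedulers can order offers,
    // here by aggregate CPU as an example criterion (the real sort key may differ).
    type OffersSorter []*mesos.Offer

    func (s OffersSorter) Len() int      { return len(s) }
    func (s OffersSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
    func (s OffersSorter) Less(i, j int) bool {
        cpuI, _, _ := OfferAgg(s[i])
        cpuJ, _, _ := OfferAgg(s[j])
        return cpuI < cpuJ
    }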


@@ -16,11 +16,13 @@ import (
     "strings"
     "sync"
     "time"
+    "bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
+    "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
 )
 // Decides if to take an offer or not
 func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
-    offer_cpu, offer_mem, offer_watts := OfferAgg(offer)
+    offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer)
     if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts {
         return true
@@ -240,7 +242,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
     // retrieving the available power for all the hosts in the offers.
     for _, offer := range offers {
-        _, _, offer_watts := OfferAgg(offer)
+        _, _, offer_watts := offerUtils.OfferAgg(offer)
         s.availablePower[*offer.Hostname] = offer_watts
         // setting total power if the first time.
         if _, ok := s.totalPower[*offer.Hostname]; !ok {
@@ -256,7 +258,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
         select {
         case <-s.Shutdown:
             log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
-            driver.DeclineOffer(offer.Id, longFilter)
+            driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
             log.Println("Number of tasks still running: ", s.tasksRunning)
             continue
@@ -305,7 +307,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
                 log.Printf("Starting on [%s]\n", offer.GetHostname())
                 taskToSchedule := s.newTask(offer, task)
                 toSchedule := []*mesos.TaskInfo{taskToSchedule}
-                driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter)
+                driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter)
                 log.Printf("Inst: %d", *task.Instances)
                 s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
                 *task.Instances--
@@ -331,10 +333,10 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
         // If no task fit the offer, then declining the offer.
         if !taken {
             log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
-            cpus, mem, watts := OfferAgg(offer)
+            cpus, mem, watts := offerUtils.OfferAgg(offer)
             log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
-            driver.DeclineOffer(offer.Id, defaultFilter)
+            driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
         }
     }
 }
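
For context on the two filters the schedulers now pass to DeclineOffer and LaunchTasks, a minimal sketch of what utilities/mesosUtils might export is shown below. The refuse-seconds values are placeholders chosen for illustration, not the values used by the project.

    // Hypothetical sketch of utilities/mesosUtils (values are placeholders).
    package mesosUtils

    import (
        "github.com/golang/protobuf/proto"
        mesos "github.com/mesos/mesos-go/mesosproto"
    )

    var (
        // DefaultFilter: short refusal window used on the normal launch/decline path.
        DefaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
        // LongFilter: long refusal window used when the framework is shutting down
        // and no longer wants offers from the host.
        LongFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
    )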