From 046faac4b2099513f07415650ccbd4d6c9d9d137 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Sun, 16 Oct 2016 14:48:31 -0400 Subject: [PATCH] Binpacking algorithm based on 3 dimensions using a list of tasks sorted by watts --- ...{binpackwatts.go => binpacksortedwatts.go} | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) rename schedulers/{binpackwatts.go => binpacksortedwatts.go} (79%) diff --git a/schedulers/binpackwatts.go b/schedulers/binpacksortedwatts.go similarity index 79% rename from schedulers/binpackwatts.go rename to schedulers/binpacksortedwatts.go index 9656ddc..d73b64c 100644 --- a/schedulers/binpackwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -14,7 +14,7 @@ import ( ) // Decides if to take an offer or not -func (*BinPackWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { +func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { cpus, mem, watts := OfferAgg(offer) @@ -27,7 +27,7 @@ func (*BinPackWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { return false } -type BinPackWatts struct { +type BinPackSortedWatts struct { tasksCreated int tasksRunning int tasks []def.Task @@ -51,10 +51,10 @@ type BinPackWatts struct { } // New electron scheduler -func NewBinPackWatts(tasks []def.Task, ignoreWatts bool) *BinPackWatts { +func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool) *BinPackSortedWatts { sort.Sort(def.WattsSorter(tasks)) - s := &BinPackWatts{ + s := &BinPackSortedWatts{ tasks: tasks, ignoreWatts: ignoreWatts, Shutdown: make(chan struct{}), @@ -66,7 +66,7 @@ func NewBinPackWatts(tasks []def.Task, ignoreWatts bool) *BinPackWatts { return s } -func (s *BinPackWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { +func (s *BinPackSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -113,22 +113,22 @@ func (s *BinPackWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInf } } -func (s *BinPackWatts) Registered( +func (s *BinPackSortedWatts) Registered( _ sched.SchedulerDriver, frameworkID *mesos.FrameworkID, masterInfo *mesos.MasterInfo) { log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) } -func (s *BinPackWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { +func (s *BinPackSortedWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { log.Printf("Framework re-registered with master %s", masterInfo) } -func (s *BinPackWatts) Disconnected(sched.SchedulerDriver) { +func (s *BinPackSortedWatts) Disconnected(sched.SchedulerDriver) { log.Println("Framework disconnected with master") } -func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { +func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) for _, offer := range offers { @@ -144,10 +144,12 @@ func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*me tasks := []*mesos.TaskInfo{} - _, _, offer_watts := OfferAgg(offer) + offer_cpu, offer_ram, offer_watts := OfferAgg(offer) taken := false totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 for i, task := range s.tasks { // Check host if it exists @@ -160,10 +162,14 @@ func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*me for *task.Instances > 0 { // Does the task fit - if offer_watts >= (totalWatts + task.Watts) { + if (s.ignoreWatts || offer_watts >= (totalWatts+task.Watts)) && + (offer_cpu >= (totalCPU + task.CPU)) && + (offer_ram >= (totalRAM + task.RAM)) { taken = true totalWatts += task.Watts + totalCPU += task.CPU + totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) tasks = append(tasks, s.newTask(offer, task)) @@ -201,7 +207,7 @@ func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*me } } -func (s *BinPackWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { +func (s *BinPackSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) if *status.State == mesos.TaskState_TASK_RUNNING { @@ -220,7 +226,7 @@ func (s *BinPackWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos. log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *BinPackWatts) FrameworkMessage( +func (s *BinPackSortedWatts) FrameworkMessage( driver sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, @@ -230,16 +236,16 @@ func (s *BinPackWatts) FrameworkMessage( log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) } -func (s *BinPackWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { +func (s *BinPackSortedWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { log.Printf("Offer %s rescinded", offerID) } -func (s *BinPackWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { +func (s *BinPackSortedWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { log.Printf("Slave %s lost", slaveID) } -func (s *BinPackWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { +func (s *BinPackSortedWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { log.Printf("Executor %s on slave %s was lost", executorID, slaveID) } -func (s *BinPackWatts) Error(_ sched.SchedulerDriver, err string) { +func (s *BinPackSortedWatts) Error(_ sched.SchedulerDriver, err string) { log.Printf("Receiving an error: %s", err) }