diff --git a/scheduler.go b/scheduler.go index 81640fe..8f5feb3 100644 --- a/scheduler.go +++ b/scheduler.go @@ -18,11 +18,10 @@ const ( var ( defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} + longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} ) -// Decides if to take an offer or not -func offerDecision(offer *mesos.Offer, task Task) bool { - +func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { var cpus, mem, watts float64 for _, resource := range offer.Resources { @@ -36,10 +35,16 @@ func offerDecision(offer *mesos.Offer, task Task) bool { } } + return cpus, mem, watts +} + +// Decides if to take an offer or not +func TakeOffer(offer *mesos.Offer, task Task) bool { + + cpus, mem, watts := OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { return true } @@ -84,7 +89,7 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn taskID := s.tasksCreated s.tasksCreated++ return &mesos.TaskInfo{ - Name: proto.String("Electron_" + fmt.Sprintf("Electron-%d", taskID)), + Name: proto.String(fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)), TaskId: &mesos.TaskID{ Value: proto.String(fmt.Sprintf("Electron-%d", taskID)), }, @@ -129,12 +134,9 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.shutdown: log.Println("Shutting down: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, longFilter) - log.Println("Number of tasks running: ", s.tasksRunning) - if s.tasksRunning == 0 { - close(s.done) - } + log.Println("Number of tasks still running: ", s.tasksRunning) continue default: } @@ -147,8 +149,10 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers taken := false for i, task := range s.tasks { // Decision to take the offer or not - if offerDecision(offer, task) { + if TakeOffer(offer, task) { tasks = append(tasks, s.newTask(offer, task)) + + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) taken = true @@ -157,21 +161,28 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers *task.Instances-- if *task.Instances <= 0 { - fmt.Println("Tasks left: ", len(s.tasks)-1) - fmt.Println("Position: ", i) - // All instances of task have been scheduled + // All instances of task have been scheduled, remove it s.tasks[i] = s.tasks[len(s.tasks)-1] s.tasks = s.tasks[:len(s.tasks)-1] + + + if(len(s.tasks) <= 0) { + log.Println("Done scheduling all tasks") + close(s.shutdown) + } } - break + break // Offer taken, move on } } // If there was no match for the task if !taken { - fmt.Println("There is not enough resources to launch a task!") + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) driver.DeclineOffer(offer.Id, defaultFilter) } @@ -193,6 +204,7 @@ func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m } } } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } func (s *electronScheduler) FrameworkMessage( @@ -264,12 +276,9 @@ func main() { // Catch interrupt go func() { - for { - if (len(scheduler.tasks) <= 0) { - log.Println("Done with all tasks, shutting down") - close(scheduler.shutdown) - break - } + select { + case <-scheduler.shutdown: + // case <-time.After(shutdownTimeout): } select { diff --git a/task.go b/task.go index 2014932..9d4bd01 100644 --- a/task.go +++ b/task.go @@ -7,6 +7,7 @@ import ( ) type Task struct { + Name string `json:"name"` CPU float64 `json:"cpu"` RAM float64 `json:"ram"` Watts float64 `json:"watts"`