diff --git a/scheduler.go b/scheduler.go index 8f5feb3..4c0c14c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -21,6 +21,15 @@ var ( longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} ) +func CoLocated(tasks map[string]bool) { + + for task := range tasks { + log.Println(task) + } + + fmt.Println("---------------------") +} + func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { var cpus, mem, watts float64 @@ -58,8 +67,9 @@ type electronScheduler struct { tasksCreated int tasksRunning int tasks []Task + metrics map[string]Metric + running map[string]map[string]bool - dockerExecutor *mesos.ExecutorInfo // This channel is closed when the program receives an interrupt, // signalling that the program should shut down. @@ -73,25 +83,30 @@ type electronScheduler struct { func newElectronScheduler(tasks []Task) *electronScheduler { s := &electronScheduler{ - - dockerExecutor: &mesos.ExecutorInfo{ - ExecutorId: &mesos.ExecutorID{Value: proto.String("docker-runner")}, - Name: proto.String("Runner"), - }, tasks: tasks, shutdown: make(chan struct{}), done: make(chan struct{}), + running: make(map[string]map[string]bool), } return s } func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo { - taskID := s.tasksCreated + taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances) s.tasksCreated++ + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskID] = true + return &mesos.TaskInfo{ - Name: proto.String(fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)), + Name: proto.String(taskID), TaskId: &mesos.TaskID{ - Value: proto.String(fmt.Sprintf("Electron-%d", taskID)), + Value: proto.String(taskID), }, SlaveId: offer.SlaveId, Resources: []*mesos.Resource{ @@ -150,6 +165,10 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers for i, task := range s.tasks { // Decision to take the offer or not if TakeOffer(offer, task) { + + log.Println("Co-Located with: ") + CoLocated(s.running[offer.GetSlaveId().GoString()]) + tasks = append(tasks, s.newTask(offer, task)) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) @@ -195,6 +214,7 @@ func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m if *status.State == mesos.TaskState_TASK_RUNNING { s.tasksRunning++ } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()],*status.TaskId.Value) s.tasksRunning-- if s.tasksRunning == 0 { select { @@ -214,13 +234,7 @@ func (s *electronScheduler) FrameworkMessage( message string) { log.Println("Getting a framework message: ", message) - switch *executorID.Value { - case *s.dockerExecutor.ExecutorId.Value: - log.Print("Received framework message ", message) - - default: - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) - } + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) } func (s *electronScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {