From 3cb60d0ca27019bfcf9ec5038d9ad182e929d977 Mon Sep 17 00:00:00 2001
From: Renan DelValle <rdelval1@binghamton.edu>
Date: Thu, 22 Sep 2016 20:20:22 -0400
Subject: [PATCH] Detection of co-scheduled benchmarks is complete

---
 scheduler.go | 46 ++++++++++++++++++++++++++++++----------------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git a/scheduler.go b/scheduler.go
index 8f5feb3..4c0c14c 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -21,6 +21,15 @@ var (
 	longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
 )
 
+func CoLocated(tasks map[string]bool) {
+
+	for task := range tasks {
+		log.Println(task)
+	}
+
+	fmt.Println("---------------------")
+}
+
 func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
 	var cpus, mem, watts float64
 
@@ -58,8 +67,9 @@ type electronScheduler struct {
 	tasksCreated int
 	tasksRunning int
 	tasks []Task
+	metrics map[string]Metric
+	running map[string]map[string]bool
 
-	dockerExecutor *mesos.ExecutorInfo
 
 	// This channel is closed when the program receives an interrupt,
 	// signalling that the program should shut down.
@@ -73,25 +83,30 @@ type electronScheduler struct {
 func newElectronScheduler(tasks []Task) *electronScheduler {
 
 	s := &electronScheduler{
-
-		dockerExecutor: &mesos.ExecutorInfo{
-			ExecutorId: &mesos.ExecutorID{Value: proto.String("docker-runner")},
-			Name: proto.String("Runner"),
-		},
 		tasks: tasks,
 		shutdown: make(chan struct{}),
 		done:     make(chan struct{}),
+		running: make(map[string]map[string]bool),
 	}
 	return s
 }
 
 func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo {
-	taskID := s.tasksCreated
+	taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
 	s.tasksCreated++
+
+	// If this is our first time running into this Agent
+	if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
+		s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
+	}
+
+	// Add task to list of tasks running on node
+	s.running[offer.GetSlaveId().GoString()][taskID] = true
+
 	return &mesos.TaskInfo{
-		Name: proto.String(fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)),
+		Name: proto.String(taskID),
 		TaskId: &mesos.TaskID{
-			Value: proto.String(fmt.Sprintf("Electron-%d", taskID)),
+			Value: proto.String(taskID),
 		},
 		SlaveId: offer.SlaveId,
 		Resources: []*mesos.Resource{
@@ -150,6 +165,10 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
 		for i, task := range s.tasks {
 			// Decision to take the offer or not
 			if TakeOffer(offer, task) {
+
+				log.Println("Co-Located with: ")
+				CoLocated(s.running[offer.GetSlaveId().GoString()])
+
 				tasks = append(tasks, s.newTask(offer, task))
 
 				log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
@@ -195,6 +214,7 @@ func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m
 	if *status.State == mesos.TaskState_TASK_RUNNING {
 		s.tasksRunning++
 	} else if IsTerminal(status.State) {
+		delete(s.running[status.GetSlaveId().GoString()],*status.TaskId.Value)
 		s.tasksRunning--
 		if s.tasksRunning == 0 {
 			select {
@@ -214,13 +234,7 @@ func (s *electronScheduler) FrameworkMessage(
 	message string) {
 
 	log.Println("Getting a framework message: ", message)
-	switch *executorID.Value {
-	case *s.dockerExecutor.ExecutorId.Value:
-		log.Print("Received framework message ", message)
-
-	default:
-		log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
-	}
+	log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
 }
 
 func (s *electronScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {