From 3679510909849a29857fe99dc1792470378b00c2 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Sat, 17 Sep 2016 18:55:35 -0400 Subject: [PATCH] Electron now launches a series of benchmarks and then shuts down when everything has been sucessfully scheduled --- scheduler.go | 44 +++++++++++++++++++++++++------------------- task.go | 12 ++++++------ workload_1.json | 2 +- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/scheduler.go b/scheduler.go index 3dbddb7..36a9e55 100644 --- a/scheduler.go +++ b/scheduler.go @@ -9,7 +9,6 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "os/signal" "time" ) @@ -125,6 +124,7 @@ func (s *electronScheduler) Disconnected(sched.SchedulerDriver) { func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) + for _, offer := range offers { select { case <-s.shutdown: @@ -137,10 +137,6 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers default: } - if(len(s.tasks) <= 0) { - log.Println("Done with scheduling all tasks...") - os.Exit(0) - } tasks := []*mesos.TaskInfo{} @@ -153,17 +149,22 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers tasks = append(tasks, s.newTask(offer, task)) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) - // Delete scheduled task - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - taken = true + fmt.Println("Inst: ", *task.Instances) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + taken = true + } } } // If there was no match for the task if !taken { - fmt.Println("There is enough resources to launch a task!") + fmt.Println("There is not enough resources to launch a task!") driver.DeclineOffer(offer.Id, defaultFilter) } @@ -234,6 +235,11 @@ func main() { os.Exit(1) } + log.Println("Scheduling the following tasks:") + for _, task := range tasks { + fmt.Println(task) + } + scheduler := newElectronScheduler(tasks) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, @@ -250,23 +256,23 @@ func main() { // Catch interrupt go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - s := <-c - if s != os.Interrupt { - return - } - log.Println("Electron is shutting down") - close(scheduler.shutdown) + for { + if (len(scheduler.tasks) <= 0) { + log.Println("Done with all tasks, shutting down") + close(scheduler.shutdown) + break + } + } select { case <-scheduler.done: - case <-time.After(shutdownTimeout): +// case <-time.After(shutdownTimeout): } // Done shutting down driver.Stop(false) + }() log.Printf("Starting...") diff --git a/task.go b/task.go index 467379c..2014932 100644 --- a/task.go +++ b/task.go @@ -7,12 +7,12 @@ import ( ) type Task struct { - CPU float64 `json: "cpu"` - RAM float64 `json: "ram"` - Watts float64 `json: "watts"` - Image string `json: "image"` - CMD string `json: "cmd"` - Instances int `default 1, json: "inst"` + CPU float64 `json:"cpu"` + RAM float64 `json:"ram"` + Watts float64 `json:"watts"` + Image string `json:"image"` + CMD string `json:"cmd"` + Instances *int `json:"inst"` } func TasksFromJSON(uri string) ([]Task, error) { diff --git a/workload_1.json b/workload_1.json index db50981..e4bb9ed 100644 --- a/workload_1.json +++ b/workload_1.json @@ -5,7 +5,7 @@ "watts": 50, "image": "gouravr/minife:v5", "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100", - "inst": 1 + "inst": 10 } ]