Electron now launches a series of benchmarks and then shuts down when everything has been sucessfully scheduled

This commit is contained in:
Renan DelValle 2016-09-17 18:55:35 -04:00
parent 6a3716f4e4
commit 3679510909
3 changed files with 32 additions and 26 deletions

View file

@ -9,7 +9,6 @@ import (
sched "github.com/mesos/mesos-go/scheduler" sched "github.com/mesos/mesos-go/scheduler"
"log" "log"
"os" "os"
"os/signal"
"time" "time"
) )
@ -125,6 +124,7 @@ func (s *electronScheduler) Disconnected(sched.SchedulerDriver) {
func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers)) log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers { for _, offer := range offers {
select { select {
case <-s.shutdown: case <-s.shutdown:
@ -137,10 +137,6 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
default: default:
} }
if(len(s.tasks) <= 0) {
log.Println("Done with scheduling all tasks...")
os.Exit(0)
}
tasks := []*mesos.TaskInfo{} tasks := []*mesos.TaskInfo{}
@ -153,17 +149,22 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
tasks = append(tasks, s.newTask(offer, task)) tasks = append(tasks, s.newTask(offer, task))
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
// Delete scheduled task fmt.Println("Inst: ", *task.Instances)
s.tasks[i] = s.tasks[len(s.tasks)-1] *task.Instances--
s.tasks = s.tasks[:len(s.tasks)-1]
taken = true if *task.Instances <= 0 {
// All instances of task have been scheduled
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
taken = true
}
} }
} }
// If there was no match for the task // If there was no match for the task
if !taken { if !taken {
fmt.Println("There is enough resources to launch a task!") fmt.Println("There is not enough resources to launch a task!")
driver.DeclineOffer(offer.Id, defaultFilter) driver.DeclineOffer(offer.Id, defaultFilter)
} }
@ -234,6 +235,11 @@ func main() {
os.Exit(1) os.Exit(1)
} }
log.Println("Scheduling the following tasks:")
for _, task := range tasks {
fmt.Println(task)
}
scheduler := newElectronScheduler(tasks) scheduler := newElectronScheduler(tasks)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master, Master: *master,
@ -250,23 +256,23 @@ func main() {
// Catch interrupt // Catch interrupt
go func() { go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
s := <-c
if s != os.Interrupt {
return
}
log.Println("Electron is shutting down") for {
close(scheduler.shutdown) if (len(scheduler.tasks) <= 0) {
log.Println("Done with all tasks, shutting down")
close(scheduler.shutdown)
break
}
}
select { select {
case <-scheduler.done: case <-scheduler.done:
case <-time.After(shutdownTimeout): // case <-time.After(shutdownTimeout):
} }
// Done shutting down // Done shutting down
driver.Stop(false) driver.Stop(false)
}() }()
log.Printf("Starting...") log.Printf("Starting...")

12
task.go
View file

@ -7,12 +7,12 @@ import (
) )
type Task struct { type Task struct {
CPU float64 `json: "cpu"` CPU float64 `json:"cpu"`
RAM float64 `json: "ram"` RAM float64 `json:"ram"`
Watts float64 `json: "watts"` Watts float64 `json:"watts"`
Image string `json: "image"` Image string `json:"image"`
CMD string `json: "cmd"` CMD string `json:"cmd"`
Instances int `default 1, json: "inst"` Instances *int `json:"inst"`
} }
func TasksFromJSON(uri string) ([]Task, error) { func TasksFromJSON(uri string) ([]Task, error) {

View file

@ -5,7 +5,7 @@
"watts": 50, "watts": 50,
"image": "gouravr/minife:v5", "image": "gouravr/minife:v5",
"cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100", "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100",
"inst": 1 "inst": 10
} }
] ]