Fixed a deadlock issue that occurred when running a large number of benchmarks. Renamed tasks to indicate what they are running. Added a name field to the task schema so it can be appended to the task name more easily.

This commit is contained in:
Renan DelValle 2016-09-19 20:25:10 -04:00
parent 8ddfe8e585
commit e0dc0c7368
2 changed files with 32 additions and 22 deletions

View file

@ -18,11 +18,10 @@ const (
var ( var (
defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
) )
// Decides if to take an offer or not func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
func offerDecision(offer *mesos.Offer, task Task) bool {
var cpus, mem, watts float64 var cpus, mem, watts float64
for _, resource := range offer.Resources { for _, resource := range offer.Resources {
@ -36,10 +35,16 @@ func offerDecision(offer *mesos.Offer, task Task) bool {
} }
} }
return cpus, mem, watts
}
// Decides if to take an offer or not
func TakeOffer(offer *mesos.Offer, task Task) bool {
cpus, mem, watts := OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter //TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
return true return true
} }
@ -84,7 +89,7 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
taskID := s.tasksCreated taskID := s.tasksCreated
s.tasksCreated++ s.tasksCreated++
return &mesos.TaskInfo{ return &mesos.TaskInfo{
Name: proto.String("Electron_" + fmt.Sprintf("Electron-%d", taskID)), Name: proto.String(fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)),
TaskId: &mesos.TaskID{ TaskId: &mesos.TaskID{
Value: proto.String(fmt.Sprintf("Electron-%d", taskID)), Value: proto.String(fmt.Sprintf("Electron-%d", taskID)),
}, },
@ -129,12 +134,9 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
select { select {
case <-s.shutdown: case <-s.shutdown:
log.Println("Shutting down: declining offer on [", offer.GetHostname(), "]") log.Println("Shutting down: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, defaultFilter) driver.DeclineOffer(offer.Id, longFilter)
log.Println("Number of tasks running: ", s.tasksRunning) log.Println("Number of tasks still running: ", s.tasksRunning)
if s.tasksRunning == 0 {
close(s.done)
}
continue continue
default: default:
} }
@ -147,8 +149,10 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
taken := false taken := false
for i, task := range s.tasks { for i, task := range s.tasks {
// Decision to take the offer or not // Decision to take the offer or not
if offerDecision(offer, task) { if TakeOffer(offer, task) {
tasks = append(tasks, s.newTask(offer, task)) tasks = append(tasks, s.newTask(offer, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
taken = true taken = true
@ -157,21 +161,28 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
*task.Instances-- *task.Instances--
if *task.Instances <= 0 { if *task.Instances <= 0 {
fmt.Println("Tasks left: ", len(s.tasks)-1) // All instances of task have been scheduled, remove it
fmt.Println("Position: ", i)
// All instances of task have been scheduled
s.tasks[i] = s.tasks[len(s.tasks)-1] s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1] s.tasks = s.tasks[:len(s.tasks)-1]
if(len(s.tasks) <= 0) {
log.Println("Done scheduling all tasks")
close(s.shutdown)
}
} }
break break // Offer taken, move on
} }
} }
// If there was no match for the task // If there was no match for the task
if !taken { if !taken {
fmt.Println("There is not enough resources to launch a task!") fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter) driver.DeclineOffer(offer.Id, defaultFilter)
} }
@ -193,6 +204,7 @@ func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m
} }
} }
} }
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
} }
func (s *electronScheduler) FrameworkMessage( func (s *electronScheduler) FrameworkMessage(
@ -264,12 +276,9 @@ func main() {
// Catch interrupt // Catch interrupt
go func() { go func() {
for { select {
if (len(scheduler.tasks) <= 0) { case <-scheduler.shutdown:
log.Println("Done with all tasks, shutting down") // case <-time.After(shutdownTimeout):
close(scheduler.shutdown)
break
}
} }
select { select {

View file

@ -7,6 +7,7 @@ import (
) )
type Task struct { type Task struct {
Name string `json:"name"`
CPU float64 `json:"cpu"` CPU float64 `json:"cpu"`
RAM float64 `json:"ram"` RAM float64 `json:"ram"`
Watts float64 `json:"watts"` Watts float64 `json:"watts"`