diff --git a/pcp/pcp.go b/pcp/pcp.go index 0c8d0e7..4329477 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -6,11 +6,13 @@ import ( "os/exec" "time" "os" + "syscall" ) func Start(quit chan struct{}, logging *bool, prefix string) { const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} startTime := time.Now().Format("20060102150405") @@ -76,16 +78,16 @@ func Start(quit chan struct{}, logging *bool, prefix string) { log.Fatal(err) } + pgid, err := syscall.Getpgid(cmd.Process.Pid) + select{ case <- quit: log.Println("Stopping PCP logging in 5 seconds") time.Sleep(5 * time.Second) - cmd.Process.Kill() + + // http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly + // kill process and all children processes + syscall.Kill(-pgid, 15) return } - - /* - if err := cmd.Wait(); err != nil { - log.Fatal(err) - }*/ } diff --git a/scheduler.go b/scheduler.go index 742cc4a..c3a92eb 100644 --- a/scheduler.go +++ b/scheduler.go @@ -12,6 +12,7 @@ import ( "time" "bitbucket.org/bingcloud/electron/pcp" "strings" + "os/signal" ) const ( @@ -143,6 +144,7 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn Type: mesos.ContainerInfo_DOCKER.Enum(), Docker: &mesos.ContainerInfo_DockerInfo{ Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated }, }, @@ -170,7 +172,7 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers for _, offer := range offers { select { case <-s.shutdown: - log.Println("Shutting down: declining offer on [", offer.GetHostname(), "]") + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") driver.DeclineOffer(offer.Id, longFilter) log.Println("Number of tasks still running: ", s.tasksRunning) @@ -214,15 +216,12 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers s.tasks[i] = s.tasks[len(s.tasks)-1] s.tasks = s.tasks[:len(s.tasks)-1] - if(len(s.tasks) <= 0) { log.Println("Done scheduling all tasks") close(s.shutdown) } } - break // Offer taken, move on - } } @@ -332,7 +331,21 @@ func main() { go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP, *pcplogPrefix) time.Sleep(1 * time.Second) + // Attempt to handle signint to not leave pmdumptext running // Catch interrupt + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + s := <-c + if s != os.Interrupt { + close(scheduler.pcpLog) + return + } + + log.Printf("Received SIGINT...stopping") + close(scheduler.done) + }() + go func() { // Signals we have scheduled every task we have @@ -341,7 +354,7 @@ func main() { // case <-time.After(shutdownTimeout): } - // Signals all tasks have finished + // All tasks have finished select { case <-scheduler.done: close(scheduler.pcpLog)