PCP is now controlled by main scheduler. It will start recording upon the acceptance of the very first offer. pcp is now it's own package. README has been updated with instructions on how to create workloads and running instructions.

This commit is contained in:
Renan DelValle 2016-09-26 19:14:51 -04:00
parent 3cb60d0ca2
commit 2575c2a20b
3 changed files with 116 additions and 26 deletions

View file

@ -10,6 +10,7 @@ import (
"log"
"os"
"time"
"bitbucket.org/bingcloud/electron/pcp"
)
const (
@ -66,17 +67,26 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
type electronScheduler struct {
tasksCreated int
tasksRunning int
tasks []Task
metrics map[string]Metric
running map[string]map[string]bool
tasks []Task
metrics map[string]Metric
running map[string]map[string]bool
// First set of PCP values are garbage values, signal to logger to start recording after
// we actually schedule a task
recordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
shutdown chan struct{}
shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
done chan struct{}
done chan struct{}
// Controls when to shutdown pcp logging
pcpLog chan struct{}
}
// New electron scheduler
@ -86,7 +96,9 @@ func newElectronScheduler(tasks []Task) *electronScheduler {
tasks: tasks,
shutdown: make(chan struct{}),
done: make(chan struct{}),
pcpLog: make(chan struct{}),
running: make(map[string]map[string]bool),
recordPCP: false,
}
return s
}
@ -95,6 +107,12 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.recordPCP {
// Turn on logging
s.recordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
@ -253,7 +271,7 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) {
func main() {
master := flag.String("master", "xavier:5050", "Location of leading Mesos master")
tasksFile := flag.String("tasks", "", "JSON file containing task definitions")
tasksFile := flag.String("workload", "", "JSON file containing task definitions")
flag.Parse()
@ -287,16 +305,23 @@ func main() {
return
}
go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP)
time.Sleep(1 * time.Second)
// Catch interrupt
go func() {
// Signals we have scheduled every task we have
select {
case <-scheduler.shutdown:
// case <-time.After(shutdownTimeout):
}
// Signals all tasks have finished
select {
case <-scheduler.done:
close(scheduler.pcpLog)
time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds
// case <-time.After(shutdownTimeout):
}