From 2575c2a20b0500341d7eb20c31873779cf961d82 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Mon, 26 Sep 2016 19:14:51 -0400
Subject: [PATCH] PCP is now controlled by the main scheduler. It will start
 recording upon the acceptance of the very first offer. pcp is now its own
 package.

README has been updated with instructions on how to create workloads and how
to run Electron.
---
 README.md    | 41 ++++++++++++++++++++++++++++++---
 pcp/pcp.go   | 64 ++++++++++++++++++++++++++++++++++++++--------------
 scheduler.go | 37 +++++++++++++++++++++++++-----
 3 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index 801a7d4..b0154c7 100644
--- a/README.md
+++ b/README.md
@@ -3,8 +3,43 @@ Electron: A power budget manager
 
 To Do:
 
- * Define schema for what workload would look like
- * Add queue for jobs to be executed
  * Create metrics for each task launched [Time to schedule, run time, power used]
  * Have calibration phase?
- * Add ability to use constraints
+ * Add ability to use constraints
+ * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+
+
+
+** Requires Performance-Copilot tool pmdumptext **
+
+
+
+How to run:
+
+`./electron -workload <workload json file>`
+
+
+Workload schema:
+
+```
+[
+   {
+      "name": "minife",
+      "cpu": 3.0,
+      "ram": 4096,
+      "watts": 50,
+      "image": "gouravr/minife:v5",
+      "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100",
+      "inst": 9
+   },
+   {
+      "name": "dgemm",
+      "cpu": 3.0,
+      "ram": 4096,
+      "watts": 50,
+      "image": "gouravr/dgemm:v2",
+      "cmd": "/./mt-dgemm 1024",
+      "inst": 9
+   }
+]
+```
\ No newline at end of file
diff --git a/pcp/pcp.go b/pcp/pcp.go
index 4388ebe..b9743ab 100644
--- a/pcp/pcp.go
+++ b/pcp/pcp.go
@@ -1,61 +1,91 @@
-package main
+package pcp
 
 import (
     "bufio"
-    "fmt"
     "log"
     "os/exec"
-    "strings"
+    "time"
+    "os"
 )
 
-func main() {
-    const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" // We always want the most granular
+func Start(quit chan struct{}, logging *bool) {
+    const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
     cmd := exec.Command("sh", "-c", pcpCommand)
-    // time := time.Now().Format("200601021504")
+    startTime := time.Now().Format("20060102150405")
+
+
+    logFile, err := os.Create("./"+startTime+".pcplog")
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    defer logFile.Close()
 
-    // stdout, err := os.Create("./"+time+".txt")
     pipe, err := cmd.StdoutPipe()
-
+    if err != nil {
+        log.Fatal(err)
+    }
     //cmd.Stdout = stdout
 
     scanner := bufio.NewScanner(pipe)
 
-    go func() {
+    go func(logging *bool) {
         // Get names of the columns
         scanner.Scan()
 
+        // Write to logfile
+        logFile.WriteString(scanner.Text() + "\n")
+
+        /*
         headers := strings.Split(scanner.Text(), ",")
 
         for _, hostMetric := range headers {
             split := strings.Split(hostMetric, ":")
             fmt.Printf("Host %s: Metric: %s\n", split[0], split[1])
         }
+        */
 
         // Throw away first set of results
         scanner.Scan()
 
         seconds := 0
 
         for scanner.Scan() {
+
+
+            if *logging {
+                log.Println("Logging PCP...")
+                logFile.WriteString(scanner.Text() + "\n")
+            }
+
+            /*
             fmt.Printf("Second: %d\n", seconds)
             for i, val := range strings.Split(scanner.Text(), ",") {
                 fmt.Printf("host metric: %s val: %s\n", headers[i], val)
-            }
+            }*/
 
             seconds++
 
-            fmt.Println("--------------------------------")
+            // fmt.Println("--------------------------------")
         }
-    }()
+    }(logging)
+
+    log.Println("PCP logging started")
 
-    fmt.Println("PCP started: ")
-
-    if err != nil {
-        log.Fatal(err)
-    }
 
     if err := cmd.Start(); err != nil {
         log.Fatal(err)
     }
+
+    select {
+    case <-quit:
+        log.Println("Stopping PCP logging in 5 seconds")
+        time.Sleep(5 * time.Second)
+        cmd.Process.Kill()
+        return
+    }
+
+    /*
     if err := cmd.Wait(); err != nil {
         log.Fatal(err)
-    }
+    }*/
 }
diff --git a/scheduler.go b/scheduler.go
index 4c0c14c..5b0e080 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -10,6 +10,7 @@ import (
     "log"
     "os"
     "time"
+    "bitbucket.org/bingcloud/electron/pcp"
 )
 
 const (
@@ -66,17 +67,26 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
 type electronScheduler struct {
     tasksCreated int
     tasksRunning int
-    tasks []Task
-    metrics map[string]Metric
-    running map[string]map[string]bool
+    tasks     []Task
+    metrics   map[string]Metric
+    running   map[string]map[string]bool
+
+
+    // First set of PCP values are garbage values; signal the logger to start recording only
+    // after we actually schedule a task
+    recordPCP bool
 
     // This channel is closed when the program receives an interrupt,
     // signalling that the program should shut down.
-    shutdown chan struct{}
+    shutdown  chan struct{}
 
     // This channel is closed after shutdown is closed, and only when all
     // outstanding tasks have been cleaned up
-    done chan struct{}
+    done      chan struct{}
+
+
+    // Controls when to shut down PCP logging
+    pcpLog    chan struct{}
 }
 
 // New electron scheduler
@@ -86,7 +96,9 @@ func newElectronScheduler(tasks []Task) *electronScheduler {
         tasks:      tasks,
         shutdown:   make(chan struct{}),
         done:       make(chan struct{}),
+        pcpLog:     make(chan struct{}),
         running:    make(map[string]map[string]bool),
+        recordPCP:  false,
     }
     return s
 }
@@ -95,6 +107,12 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
     taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
     s.tasksCreated++
 
+    if !s.recordPCP {
+        // Turn on logging
+        s.recordPCP = true
+        time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
+    }
+
     // If this is our first time running into this Agent
     if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
         s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
@@ -253,7 +271,7 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) {
 
 func main() {
     master := flag.String("master", "xavier:5050", "Location of leading Mesos master")
-    tasksFile := flag.String("tasks", "", "JSON file containing task definitions")
+    tasksFile := flag.String("workload", "", "JSON file containing task definitions")
 
     flag.Parse()
 
@@ -287,16 +305,23 @@ func main() {
         return
     }
 
+    go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP)
+    time.Sleep(1 * time.Second)
+
     // Catch interrupt
     go func() {
+        // Signals we have scheduled every task we have
         select {
        case <-scheduler.shutdown:
            // case <-time.After(shutdownTimeout):
        }
 
+        // Signals all tasks have finished
         select {
        case <-scheduler.done:
+            close(scheduler.pcpLog)
+            time.Sleep(5 * time.Second) // Wait for PCP to log a few more seconds
            // case <-time.After(shutdownTimeout):
        }
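
Note: the `Task` type consumed by `newElectronScheduler(tasks []Task)` and filled from the `-workload` file is defined elsewhere in the repository and is not part of this patch. As a rough sketch only — the field names, types, and helper below are illustrative assumptions, not the project's actual definitions — a struct matching the workload schema shown in the README could look like this:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
)

// Task mirrors one entry of the workload JSON shown in the README.
// This is a hypothetical definition for illustration; the real Task type
// lives elsewhere in the repository. Instances is modeled as a pointer
// because scheduler.go dereferences *task.Instances when building task IDs.
type Task struct {
	Name      string  `json:"name"`
	CPU       float64 `json:"cpu"`
	RAM       float64 `json:"ram"`
	Watts     float64 `json:"watts"`
	Image     string  `json:"image"`
	CMD       string  `json:"cmd"`
	Instances *int    `json:"inst"`
}

// TasksFromJSON reads a workload file (the argument passed to -workload)
// and decodes it into a slice of tasks.
func TasksFromJSON(path string) ([]Task, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var tasks []Task
	if err := json.NewDecoder(f).Decode(&tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}

func main() {
	// "workload.json" is a placeholder file name following the README schema.
	tasks, err := TasksFromJSON("workload.json")
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range tasks {
		inst := 0
		if t.Instances != nil {
			inst = *t.Instances
		}
		fmt.Printf("%s: cpu=%.1f ram=%.0f watts=%.0f instances=%d\n",
			t.Name, t.CPU, t.RAM, t.Watts, inst)
	}
}
```

The scheduler's main() presumably performs an equivalent decode of the file named by the `-workload` flag before handing the slice to newElectronScheduler; the helper above only illustrates how the README schema maps onto Go fields.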