diff --git a/README.md b/README.md index 801a7d4..b0154c7 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,43 @@ Electron: A power budget manager To Do: - * Define schema for what workload would look like - * Add queue for jobs to be executed * Create metrics for each task launched [Time to schedule, run time, power used] * Have calibration phase? - * Add ability to use constraints + * Add ability to use constraints + * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + + + +** Requires Performance-Copilot tool pmdumptext ** + + + +How to run: + +`./electron -workload ` + + +Workload schema: + +``` +[ + { + "name": "minife", + "cpu": 3.0, + "ram": 4096, + "watts": 50, + "image": "gouravr/minife:v5", + "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100", + "inst": 9 + }, + { + "name": "dgemm", + "cpu": 3.0, + "ram": 4096, + "watts": 50, + "image": "gouravr/dgemm:v2", + "cmd": "/./mt-dgemm 1024", + "inst": 9 + } +] +``` \ No newline at end of file diff --git a/pcp/pcp.go b/pcp/pcp.go index 4388ebe..b9743ab 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -1,61 +1,91 @@ -package main +package pcp import ( "bufio" - "fmt" "log" "os/exec" - "strings" + "time" + "os" ) -func main() { - const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" // We always want the most granular +func Start(quit chan struct{}, logging *bool) { + const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) - // time := time.Now().Format("200601021504") + startTime := time.Now().Format("20060102150405") + + + logFile, err := os.Create("./"+startTime+".pcplog") + if err != nil { + log.Fatal(err) + } + + defer logFile.Close() - // stdout, err := os.Create("./"+time+".txt") pipe, err := cmd.StdoutPipe() - + if err != nil { + log.Fatal(err) + } //cmd.Stdout = stdout scanner := bufio.NewScanner(pipe) - go func() { + go func(logging *bool) { // Get names of the columns scanner.Scan() + // Write to logfile + logFile.WriteString(scanner.Text() + "\n") + + /* headers := strings.Split(scanner.Text(), ",") for _, hostMetric := range headers { split := strings.Split(hostMetric, ":") fmt.Printf("Host %s: Metric: %s\n", split[0], split[1]) } + */ // Throw away first set of results scanner.Scan() seconds := 0 for scanner.Scan() { + + + if(*logging) { + log.Println("Logging PCP...") + logFile.WriteString(scanner.Text() + "\n") + } + + /* fmt.Printf("Second: %d\n", seconds) for i, val := range strings.Split(scanner.Text(), ",") { fmt.Printf("host metric: %s val: %s\n", headers[i], val) - } + }*/ seconds++ - fmt.Println("--------------------------------") + // fmt.Println("--------------------------------") } - }() + }(logging) + + log.Println("PCP logging started") - fmt.Println("PCP started: ") - if err != nil { - log.Fatal(err) - } if err := cmd.Start(); err != nil { log.Fatal(err) } + + select{ + case <- quit: + log.Println("Stopping PCP logging in 5 seconds") + time.Sleep(5 * time.Second) + cmd.Process.Kill() + return + } + + /* if err := cmd.Wait(); err != nil { log.Fatal(err) - } + }*/ } diff --git a/scheduler.go b/scheduler.go index 4c0c14c..5b0e080 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,6 +10,7 @@ import ( "log" "os" "time" + "bitbucket.org/bingcloud/electron/pcp" ) const ( @@ -66,17 +67,26 @@ func TakeOffer(offer *mesos.Offer, task Task) bool { type electronScheduler struct { tasksCreated int tasksRunning int - tasks []Task - metrics map[string]Metric - running map[string]map[string]bool + tasks []Task + metrics map[string]Metric + running map[string]map[string]bool + + + // First set of PCP values are garbage values, signal to logger to start recording after + // we actually schedule a task + recordPCP bool // This channel is closed when the program receives an interrupt, // signalling that the program should shut down. - shutdown chan struct{} + shutdown chan struct{} // This channel is closed after shutdown is closed, and only when all // outstanding tasks have been cleaned up - done chan struct{} + done chan struct{} + + + // Controls when to shutdown pcp logging + pcpLog chan struct{} } // New electron scheduler @@ -86,7 +96,9 @@ func newElectronScheduler(tasks []Task) *electronScheduler { tasks: tasks, shutdown: make(chan struct{}), done: make(chan struct{}), + pcpLog: make(chan struct{}), running: make(map[string]map[string]bool), + recordPCP: false, } return s } @@ -95,6 +107,12 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances) s.tasksCreated++ + if !s.recordPCP { + // Turn on logging + s.recordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + // If this is our first time running into this Agent if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) @@ -253,7 +271,7 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) { func main() { master := flag.String("master", "xavier:5050", "Location of leading Mesos master") - tasksFile := flag.String("tasks", "", "JSON file containing task definitions") + tasksFile := flag.String("workload", "", "JSON file containing task definitions") flag.Parse() @@ -287,16 +305,23 @@ func main() { return } + go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP) + time.Sleep(1 * time.Second) + // Catch interrupt go func() { + // Signals we have scheduled every task we have select { case <-scheduler.shutdown: // case <-time.After(shutdownTimeout): } + // Signals all tasks have finished select { case <-scheduler.done: + close(scheduler.pcpLog) + time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds // case <-time.After(shutdownTimeout): }