From cd430eede00c0e7d71bcf673ee7c17de8c62bec2 Mon Sep 17 00:00:00 2001
From: Renan DelValle <rdelval1@binghamton.edu>
Date: Mon, 26 Sep 2016 19:14:51 -0400
Subject: [PATCH] PCP is now controlled by the main scheduler. It starts
 recording upon acceptance of the very first offer. pcp is now its own
 package. The README has been updated with instructions on how to create
 workloads and how to run Electron.

---
 README.md    | 41 ++++++++++++++++++++++++++++++---
 pcp/pcp.go   | 64 ++++++++++++++++++++++++++++++++++++++--------------
 scheduler.go | 37 +++++++++++++++++++++++++-----
 3 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index 801a7d4..b0154c7 100644
--- a/README.md
+++ b/README.md
@@ -3,8 +3,43 @@ Electron: A power budget manager
 
 To Do:
 
- * Define schema for what workload would look like
- * Add queue for jobs to be executed
  * Create metrics for each task launched [Time to schedule, run time, power used]
  * Have calibration phase?
  * Add ability to use constraints
+ * Implement running average calculations (see the sketch below): https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
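+
+The running-average item above refers to an exponential moving average,
+which can be updated incrementally as each PCP sample arrives. A minimal
+sketch in Go (the `ema` helper and `alpha` smoothing factor are
+illustrative, not part of Electron):
+
+```
+// ema returns the updated exponential moving average given the previous
+// average, a new sample, and a smoothing factor alpha in (0, 1].
+func ema(prev, sample, alpha float64) float64 {
+	return alpha*sample + (1-alpha)*prev
+}
+```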
+
+**Requires the Performance Co-Pilot (PCP) tool `pmdumptext`.**
+
+How to run:
+
+`./electron -workload <workload.json>`
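+
+The scheduler also takes a `-master` flag giving the location of the leading
+Mesos master (it defaults to `xavier:5050` in the code):
+
+`./electron -master <host:port> -workload <workload.json>`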
+
+Workload schema:
+
+```
+[
+   {
+      "name": "minife",
+      "cpu": 3.0,
+      "ram": 4096,
+      "watts": 50,
+      "image": "gouravr/minife:v5",
+      "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100",
+      "inst": 9
+   },
+   {
+      "name": "dgemm",
+      "cpu": 3.0,
+      "ram": 4096,
+      "watts": 50,
+      "image": "gouravr/dgemm:v2",
+      "cmd": "/./mt-dgemm 1024",
+      "inst": 9
+   }
+]
+```
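+
+For reference, a sketch of how these fields might map onto a Go struct for
+decoding (field names are inferred from the JSON above; Electron's actual
+Task type may differ):
+
+```
+type Task struct {
+	Name      string  `json:"name"`
+	CPU       float64 `json:"cpu"`
+	RAM       float64 `json:"ram"`
+	Watts     float64 `json:"watts"`
+	Image     string  `json:"image"`
+	CMD       string  `json:"cmd"`
+	Instances *int    `json:"inst"`
+}
+```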
\ No newline at end of file
diff --git a/pcp/pcp.go b/pcp/pcp.go
index 4388ebe..b9743ab 100644
--- a/pcp/pcp.go
+++ b/pcp/pcp.go
@@ -1,61 +1,91 @@
-package main
+package pcp
 
 import (
 	"bufio"
-	"fmt"
 	"log"
 	"os/exec"
-	"strings"
+	"time"
+	"os"
 )
 
-func main() {
-	const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" // We always want the most granular
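+// Start launches pmdumptext and writes its CSV output to a timestamped
+// .pcplog file. Lines are only recorded while *logging is true (the
+// scheduler flips it to true once the first task is scheduled); closing
+// quit stops the logger after a short grace period.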
+func Start(quit chan struct{}, logging *bool) {
+	const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
 	cmd := exec.Command("sh", "-c", pcpCommand)
-	//	time := time.Now().Format("200601021504")
+	startTime := time.Now().Format("20060102150405")
+
+	logFile, err := os.Create("./" + startTime + ".pcplog")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer logFile.Close()
 
-	//	stdout, err := os.Create("./"+time+".txt")
 	pipe, err := cmd.StdoutPipe()
-
+	if err != nil {
+		log.Fatal(err)
+	}
 	//cmd.Stdout = stdout
 
 	scanner := bufio.NewScanner(pipe)
 
-	go func() {
+	go func(logging *bool) {
 		// Get names of the columns
 		scanner.Scan()
 
+		// Write to logfile
+		logFile.WriteString(scanner.Text() + "\n")
+
+		/*
 		headers := strings.Split(scanner.Text(), ",")
 
 		for _, hostMetric := range headers {
 			split := strings.Split(hostMetric, ":")
 			fmt.Printf("Host %s: Metric: %s\n", split[0], split[1])
 		}
+		*/
 
 		// Throw away first set of results
 		scanner.Scan()
 
 		seconds := 0
 		for scanner.Scan() {
+			if *logging {
+				log.Println("Logging PCP...")
+				logFile.WriteString(scanner.Text() + "\n")
+			}
+
+			/*
 			fmt.Printf("Second: %d\n", seconds)
 			for i, val := range strings.Split(scanner.Text(), ",") {
 				fmt.Printf("host metric: %s val: %s\n", headers[i], val)
-			}
+			}*/
 
 			seconds++
 
-			fmt.Println("--------------------------------")
+			// fmt.Println("--------------------------------")
 		}
-	}()
+	}(logging)
+
+	log.Println("PCP logging started")
 
-	fmt.Println("PCP started: ")
 
-	if err != nil {
-		log.Fatal(err)
-	}
 	if err := cmd.Start(); err != nil {
 		log.Fatal(err)
 	}
+
+	// Block until the scheduler signals shutdown, then give PCP a few
+	// seconds to capture the tail end of the run before killing it.
+	<-quit
+	log.Println("Stopping PCP logging in 5 seconds")
+	time.Sleep(5 * time.Second)
+	cmd.Process.Kill()
-	if err := cmd.Wait(); err != nil {
-		log.Fatal(err)
-	}
 }
diff --git a/scheduler.go b/scheduler.go
index 4c0c14c..5b0e080 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -10,6 +10,7 @@ import (
 	"log"
 	"os"
 	"time"
+	"bitbucket.org/bingcloud/electron/pcp"
 )
 
 const (
@@ -66,17 +67,26 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
 type electronScheduler struct {
 	tasksCreated int
 	tasksRunning int
-	tasks []Task
-	metrics map[string]Metric
-	running map[string]map[string]bool
+	tasks        []Task
+	metrics      map[string]Metric
+	running      map[string]map[string]bool
+
+	// The first set of PCP values is garbage, so tell the logger to start
+	// recording only once we have actually scheduled a task.
+	recordPCP    bool
 
 
 	// This channel is closed when the program receives an interrupt,
 	// signalling that the program should shut down.
-	shutdown chan struct{}
+	shutdown     chan struct{}
 	// This channel is closed after shutdown is closed, and only when all
 	// outstanding tasks have been cleaned up
-	done chan struct{}
+	done         chan struct{}
+
+	// Closed to signal that PCP logging should shut down.
+	pcpLog       chan struct{}
 }
 
 // New electron scheduler
@@ -86,7 +96,9 @@ func newElectronScheduler(tasks []Task) *electronScheduler {
 		tasks: tasks,
 		shutdown: make(chan struct{}),
 		done:     make(chan struct{}),
+		pcpLog: make(chan struct{}),
 		running: make(map[string]map[string]bool),
+		recordPCP: false,
 	}
 	return s
 }
@@ -95,6 +107,12 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
 	taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
 	s.tasksCreated++
 
+	if !s.recordPCP {
+		// Turn on logging
+		s.recordPCP = true
+		time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
+	}
+
 	// If this is our first time running into this Agent
 	if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
 		s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
@@ -253,7 +271,7 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) {
 
 func main() {
 	master := flag.String("master", "xavier:5050", "Location of leading Mesos master")
-	tasksFile := flag.String("tasks", "", "JSON file containing task definitions")
+	tasksFile := flag.String("workload", "", "JSON file containing task definitions")
 	flag.Parse()
 
 
@@ -287,16 +305,23 @@ func main() {
 		return
 	}
 
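+	// Start the PCP logger in the background; the short sleep gives
+	// pmdumptext a moment to come up before the first offers arrive.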
+	go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP)
+	time.Sleep(1 * time.Second)
+
 	// Catch interrupt
 	go func() {
 
+		// Wait until every task in the workload has been scheduled
 		select {
 		case <-scheduler.shutdown:
 		//			case <-time.After(shutdownTimeout):
 		}
 
+		// Wait until all outstanding tasks have finished
 		select {
 		case <-scheduler.done:
+			close(scheduler.pcpLog)
+			time.Sleep(5 * time.Second) // Wait for PCP to log for a few more seconds
 //			case <-time.After(shutdownTimeout):
 		}