This repository has been archived on 2024-04-10. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
elektron/pcp/pcp.go
Pradyumna Kaushik 1e9828c35b Provide pcp config file from commandline.
Taking pcp config file name (or path) from commandline.
In scheduler.go, also checking to see if provided pcp config file
exists. If not, then exiting.
2018-10-04 13:57:35 -04:00

113 lines
3.1 KiB
Go

package pcp
import (
"bufio"
"fmt"
"log"
"os/exec"
"syscall"
"time"
"github.com/mesos/mesos-go/api/v0/scheduler"
"github.com/montanaflynn/stats"
elekLogDef "gitlab.com/spdf/elektron/logging/def"
"gitlab.com/spdf/elektron/schedulers"
)
// Start runs pmdumptext with the given PCP config file and forwards its output
// to the logging channels until quit is signalled.
//
// The first line of output (the column names) is always logged as PCP, followed
// by a DEG_COL header. The second line is discarded. For each subsequent line,
// the raw text is logged as PCP when *logging is true, and the variance of CPU
// and memory utilization (plus their per-running-task shares) across nodes is
// logged as DEG_COL.
//
// Parameters:
//   - quit: receiving on this channel stops PCP logging; the pmdumptext process
//     group is sent SIGTERM five seconds later and Start returns.
//   - logging: toggles whether raw pmdumptext lines are logged.
//   - logMType, logMsg: paired channels; a message type is sent on logMType
//     immediately before the corresponding text on logMsg.
//   - pcpConfigFile: name or path of the pmdumptext config file.
//   - s: must hold a *schedulers.BaseScheduler; any other value panics on the
//     type assertion.
//
// Start blocks until quit is signalled. Failure to create the stdout pipe or to
// start the command terminates the program via log.Fatal.
func Start(quit chan struct{}, logging *bool, logMType chan elekLogDef.LogMessageType,
	logMsg chan string, pcpConfigFile string, s scheduler.Scheduler) {
	// Upper bound on the number of hosts inspected; matches the hard-coded
	// stratos-001 .. stratos-008 host names below.
	const maxNodes = 8

	baseSchedRef := s.(*schedulers.BaseScheduler)

	// pcpConfigFile is only known at run time, so this has to be a variable:
	// a constant expression may not reference a function parameter.
	// NOTE(review): the path is interpolated into a shell command unquoted, so
	// paths containing spaces or shell metacharacters are not passed through intact.
	pcpCommand := "pmdumptext -m -l -f '' -t 1.0 -d , -c " + pcpConfigFile
	cmd := exec.Command("sh", "-c", pcpCommand)
	// Put the shell in its own process group so that the shell and its
	// children can be signalled together on shutdown.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	pipe, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	//cmd.Stdout = stdout

	scanner := bufio.NewScanner(pipe)

	go func(logging *bool) {
		// Get names of the columns.
		scanner.Scan()

		// Write to logfile
		logMType <- elekLogDef.PCP
		logMsg <- scanner.Text()

		logMType <- elekLogDef.DEG_COL
		logMsg <- "CPU Variance, CPU Task Share Variance, Memory Variance, Memory Task Share Variance"

		// Throw away first set of results
		scanner.Scan()

		seconds := 0

		for scanner.Scan() {
			text := scanner.Text()

			if *logging {
				logMType <- elekLogDef.PCP
				logMsg <- text
			}

			seconds++

			memUtils := memUtilPerNode(text)
			memTaskShares := make([]float64, len(memUtils))

			cpuUtils := cpuUtilPerNode(text)
			cpuTaskShares := make([]float64, len(cpuUtils))

			// Never index past what the parsers actually returned.
			nodes := maxNodes
			if len(cpuUtils) < nodes {
				nodes = len(cpuUtils)
			}
			if len(memUtils) < nodes {
				nodes = len(memUtils)
			}

			for i := 0; i < nodes; i++ {
				host := fmt.Sprintf("stratos-00%d.cs.binghamton.edu", i+1)
				slaveID, ok := baseSchedRef.HostNameToSlaveID[host]
				if !ok {
					continue
				}

				baseSchedRef.TasksRunningMutex.Lock()
				tasksRunning := len(baseSchedRef.Running[slaveID])
				baseSchedRef.TasksRunningMutex.Unlock()

				// Hosts without running tasks keep a task share of zero.
				if tasksRunning > 0 {
					cpuTaskShares[i] = cpuUtils[i] / float64(tasksRunning)
					memTaskShares[i] = memUtils[i] / float64(tasksRunning)
				}
			}

			// Variance in resource utilization shows how the current workload has been distributed.
			// However, if the number of tasks running are not equally distributed, utilization variance figures become
			// less relevant as they do not express the distribution of CPU intensive tasks.
			// We thus also calculate `task share variance`, which basically signifies how the workload is distributed
			// across each node per share.
			// NOTE(review): errors from stats.Variance are discarded and the
			// returned value is logged as-is.
			cpuVariance, _ := stats.Variance(cpuUtils)
			cpuTaskSharesVariance, _ := stats.Variance(cpuTaskShares)
			memVariance, _ := stats.Variance(memUtils)
			memTaskSharesVariance, _ := stats.Variance(memTaskShares)

			logMType <- elekLogDef.DEG_COL
			logMsg <- fmt.Sprintf("%f, %f, %f, %f", cpuVariance, cpuTaskSharesVariance, memVariance, memTaskSharesVariance)
		}

		// Scan returns false on both EOF and read errors; surface the latter.
		if err := scanner.Err(); err != nil {
			log.Printf("pcp: reading pmdumptext output: %v", err)
		}
	}(logging)

	logMType <- elekLogDef.GENERAL
	logMsg <- "PCP logging started"

	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	pgid, err := syscall.Getpgid(cmd.Process.Pid)
	if err != nil {
		// On failure pgid is not a usable group id and must not be negated and
		// passed to Kill. With Setpgid set and Pgid left at zero the child's
		// process group id is its own pid, so fall back to that.
		log.Printf("pcp: getpgid(%d): %v; using pid as process group id", cmd.Process.Pid, err)
		pgid = cmd.Process.Pid
	}

	<-quit

	logMType <- elekLogDef.GENERAL
	logMsg <- "Stopping PCP logging in 5 seconds"
	time.Sleep(5 * time.Second)

	// http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly
	// Kill process and all children processes.
	if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
		log.Printf("pcp: sending SIGTERM to process group %d: %v", pgid, err)
	}
}