diff --git a/logging/def/degColLogger.go b/logging/def/degColLogger.go deleted file mode 100644 index 9b9d0c7..0000000 --- a/logging/def/degColLogger.go +++ /dev/null @@ -1,9 +0,0 @@ -package logging - -type DegColLogger struct { - loggerObserverImpl -} - -func (pl *DegColLogger) Log(message string) { - pl.logObserverSpecifics[degColLogger].logFile.Println(message) -} diff --git a/logging/def/logType.go b/logging/def/logType.go index 891b06e..59efc06 100644 --- a/logging/def/logType.go +++ b/logging/def/logType.go @@ -13,7 +13,6 @@ var ( SUCCESS = messageNametoMessageType("SUCCESS") SCHED_TRACE = messageNametoMessageType("SCHED_TRACE") PCP = messageNametoMessageType("PCP") - DEG_COL = messageNametoMessageType("DEG_COL") SPS = messageNametoMessageType("SPS") CLSFN_TASKDIST_OVERHEAD = messageNametoMessageType("CLSFN_TASKDIST_OVERHEAD") SCHED_WINDOW = messageNametoMessageType("SCHED_WINDOW") diff --git a/logging/def/logger.go b/logging/def/logger.go index 42e946a..f55aa86 100644 --- a/logging/def/logger.go +++ b/logging/def/logger.go @@ -18,7 +18,6 @@ func newLogger() *LoggerDriver { SCHED_TRACE: true, SUCCESS: true, PCP: true, - DEG_COL: true, SPS: true, CLSFN_TASKDIST_OVERHEAD: true, SCHED_WINDOW: true, diff --git a/logging/def/loggerFactory.go b/logging/def/loggerFactory.go index 321128e..a83b434 100644 --- a/logging/def/loggerFactory.go +++ b/logging/def/loggerFactory.go @@ -12,7 +12,6 @@ const ( conLogger = "console-logger" schedTraceLogger = "schedTrace-logger" pcpLogger = "pcp-logger" - degColLogger = "degCol-logger" spsLogger = "schedPolicySwitch-logger" clsfnTaskDistOverheadLogger = "classificationOverhead-logger" schedWindowLogger = "schedWindow-logger" @@ -23,7 +22,6 @@ var Loggers map[string]loggerObserver = map[string]loggerObserver{ conLogger: nil, schedTraceLogger: nil, pcpLogger: nil, - degColLogger: nil, spsLogger: nil, clsfnTaskDistOverheadLogger: nil, schedWindowLogger: nil, @@ -46,7 +44,6 @@ func withLoggerSpecifics(prefix string) loggerOption { conLogger: &specifics{}, schedTraceLogger: &specifics{}, pcpLogger: &specifics{}, - degColLogger: &specifics{}, spsLogger: &specifics{}, clsfnTaskDistOverheadLogger: &specifics{}, schedWindowLogger: &specifics{}, @@ -71,9 +68,6 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) { Loggers[pcpLogger] = &PCPLogger{ loggerObserverImpl: *loi, } - Loggers[degColLogger] = &DegColLogger{ - loggerObserverImpl: *loi, - } Loggers[spsLogger] = &SchedPolicySwitchLogger{ loggerObserverImpl: *loi, } @@ -98,8 +92,6 @@ func attachAllLoggers(lg *LoggerDriver, startTime time.Time, prefix string) { lg.attach(SUCCESS, Loggers[conLogger]) case PCP.String(): lg.attach(PCP, Loggers[pcpLogger]) - case DEG_COL.String(): - lg.attach(DEG_COL, Loggers[degColLogger]) case SPS.String(): lg.attach(SPS, Loggers[spsLogger]) case CLSFN_TASKDIST_OVERHEAD.String(): diff --git a/logging/def/loggerObservers.go b/logging/def/loggerObservers.go index 87f5b46..e0395b2 100644 --- a/logging/def/loggerObservers.go +++ b/logging/def/loggerObservers.go @@ -71,13 +71,6 @@ func (loi *loggerObserverImpl) setLogFilePrefix(prefix string) { } loi.logObserverSpecifics[schedTraceLogger].logFilePrefix = schedTraceLogFilePrefix - // Setting logFilePrefix for degCol logger - degColLogFilePrefix := prefix + "_degCol.log" - if loi.logDirectory != "" { - degColLogFilePrefix = loi.logDirectory + "/" + degColLogFilePrefix - } - loi.logObserverSpecifics[degColLogger].logFilePrefix = degColLogFilePrefix - // Setting logFilePrefix for schedulingPolicySwitch logger schedPolicySwitchLogFilePrefix := prefix + "_schedPolicySwitch.log" if loi.logDirectory != "" { diff --git a/pcp/pcp.go b/pcp/pcp.go index 407376b..7c48839 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -2,21 +2,16 @@ package pcp import ( "bufio" - "fmt" "log" "os/exec" "syscall" "time" - "github.com/mesos/mesos-go/api/v0/scheduler" - "github.com/montanaflynn/stats" elekLogDef "gitlab.com/spdf/elektron/logging/def" - "gitlab.com/spdf/elektron/schedulers" ) func Start(quit chan struct{}, logging *bool, logMType chan elekLogDef.LogMessageType, - logMsg chan string, pcpConfigFile string, s scheduler.Scheduler) { - baseSchedRef := s.(*schedulers.BaseScheduler) + logMsg chan string, pcpConfigFile string) { var pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c " + pcpConfigFile cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -37,9 +32,6 @@ func Start(quit chan struct{}, logging *bool, logMType chan elekLogDef.LogMessag logMType <- elekLogDef.PCP logMsg <- scanner.Text() - logMType <- elekLogDef.DEG_COL - logMsg <- "CPU Variance, CPU Task Share Variance, Memory Variance, Memory Task Share Variance" - // Throw away first set of results scanner.Scan() @@ -54,39 +46,6 @@ func Start(quit chan struct{}, logging *bool, logMType chan elekLogDef.LogMessag } seconds++ - - memUtils := MemUtilPerNode(text) - memTaskShares := make([]float64, len(memUtils)) - - cpuUtils := CpuUtilPerNode(text) - cpuTaskShares := make([]float64, len(cpuUtils)) - - for i := 0; i < 8; i++ { - host := fmt.Sprintf("stratos-00%d.cs.binghamton.edu", i+1) - if slaveID, ok := baseSchedRef.HostNameToSlaveID[host]; ok { - baseSchedRef.TasksRunningMutex.Lock() - tasksRunning := len(baseSchedRef.Running[slaveID]) - baseSchedRef.TasksRunningMutex.Unlock() - if tasksRunning > 0 { - cpuTaskShares[i] = cpuUtils[i] / float64(tasksRunning) - memTaskShares[i] = memUtils[i] / float64(tasksRunning) - } - } - } - - // Variance in resource utilization shows how the current workload has been distributed. - // However, if the number of tasks running are not equally distributed, utilization variance figures become - // less relevant as they do not express the distribution of CPU intensive tasks. - // We thus also calculate `task share variance`, which basically signifies how the workload is distributed - // across each node per share. - - cpuVariance, _ := stats.Variance(cpuUtils) - cpuTaskSharesVariance, _ := stats.Variance(cpuTaskShares) - memVariance, _ := stats.Variance(memUtils) - memTaskSharesVariance, _ := stats.Variance(memTaskShares) - - logMType <- elekLogDef.DEG_COL - logMsg <- fmt.Sprintf("%f, %f, %f, %f", cpuVariance, cpuTaskSharesVariance, memVariance, memTaskSharesVariance) } }(logging) diff --git a/pcp/utils.go b/pcp/utils.go index 60b4839..ff2ef55 100644 --- a/pcp/utils.go +++ b/pcp/utils.go @@ -3,8 +3,6 @@ package pcp import ( "container/ring" "math" - "strconv" - "strings" ) var RAPLUnits = math.Pow(2, -32) @@ -49,27 +47,3 @@ func AverageClusterPowerHistory(history *ring.Ring) float64 { return (total / count) } - -func utilization(used string, free string) float64 { - u, _ := strconv.ParseFloat(used, 64) - f, _ := strconv.ParseFloat(free, 64) - return u / (u + f) -} - -func CpuUtilPerNode(text string) []float64 { - tokenSlice := strings.Split(text, ",") - cpuUtil := make([]float64, 8) - for i := 0; i < 8; i++ { - cpuUtil[i] = utilization(tokenSlice[8+i], tokenSlice[24+i]) - } - return cpuUtil -} - -func MemUtilPerNode(text string) []float64 { - tokenSlice := strings.Split(text, ",") - memUtil := make([]float64, 8) - for i := 0; i < 8; i++ { - memUtil[i] = utilization(tokenSlice[40+i], tokenSlice[32+i]) - } - return memUtil -} diff --git a/powerCap/extrema.go b/powerCap/extrema.go index d732f24..8b7a2d4 100644 --- a/powerCap/extrema.go +++ b/powerCap/extrema.go @@ -12,18 +12,14 @@ import ( "syscall" "time" - "github.com/mesos/mesos-go/api/v0/scheduler" - "github.com/montanaflynn/stats" elekLogDef "gitlab.com/spdf/elektron/logging/def" "gitlab.com/spdf/elektron/pcp" "gitlab.com/spdf/elektron/rapl" - "gitlab.com/spdf/elektron/schedulers" ) func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64, - logMType chan elekLogDef.LogMessageType, logMsg chan string, pcpConfigFile string, s scheduler.Scheduler) { + logMType chan elekLogDef.LogMessageType, logMsg chan string, pcpConfigFile string) { - baseSchedRef := s.(*schedulers.BaseScheduler) var pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c " + pcpConfigFile cmd := exec.Command("sh", "-c", pcpCommand, pcpConfigFile) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -50,9 +46,6 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThresh headers := strings.Split(scanner.Text(), ",") - logMType <- elekLogDef.DEG_COL - logMsg <- "CPU Variance, CPU Task Share Variance, Memory Variance, Memory Task Share Variance" - powerIndexes := make([]int, 0, 0) powerHistories := make(map[string]*ring.Ring) indexToHost := make(map[int]string) @@ -91,39 +84,6 @@ func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, hiThresh logMType <- elekLogDef.PCP logMsg <- text - memUtils := pcp.MemUtilPerNode(text) - memTaskShares := make([]float64, len(memUtils)) - - cpuUtils := pcp.CpuUtilPerNode(text) - cpuTaskShares := make([]float64, len(cpuUtils)) - - for i := 0; i < 8; i++ { - host := fmt.Sprintf("stratos-00%d.cs.binghamton.edu", i+1) - if slaveID, ok := baseSchedRef.HostNameToSlaveID[host]; ok { - baseSchedRef.TasksRunningMutex.Lock() - tasksRunning := len(baseSchedRef.Running[slaveID]) - baseSchedRef.TasksRunningMutex.Unlock() - if tasksRunning > 0 { - cpuTaskShares[i] = cpuUtils[i] / float64(tasksRunning) - memTaskShares[i] = memUtils[i] / float64(tasksRunning) - } - } - } - - // Variance in resource utilization shows how the current workload has been distributed. - // However, if the number of tasks running are not equally distributed, utilization variance figures become - // less relevant as they do not express the distribution of CPU intensive tasks. - // We thus also calculate `task share variance`, which basically signifies how the workload is distributed - // across each node per share. - - cpuVariance, _ := stats.Variance(cpuUtils) - cpuTaskSharesVariance, _ := stats.Variance(cpuTaskShares) - memVariance, _ := stats.Variance(memUtils) - memTaskSharesVariance, _ := stats.Variance(memTaskShares) - - logMType <- elekLogDef.DEG_COL - logMsg <- fmt.Sprintf("%f, %f, %f, %f", cpuVariance, cpuTaskSharesVariance, memVariance, memTaskSharesVariance) - totalPower := 0.0 for _, powerIndex := range powerIndexes { power, _ := strconv.ParseFloat(split[powerIndex], 64) diff --git a/powerCap/progressiveExtrema.go b/powerCap/progressiveExtrema.go index da4a0ba..e8bc5e4 100644 --- a/powerCap/progressiveExtrema.go +++ b/powerCap/progressiveExtrema.go @@ -13,13 +13,10 @@ import ( "syscall" "time" - "github.com/mesos/mesos-go/api/v0/scheduler" - "github.com/montanaflynn/stats" "gitlab.com/spdf/elektron/constants" elekLogDef "gitlab.com/spdf/elektron/logging/def" "gitlab.com/spdf/elektron/pcp" "gitlab.com/spdf/elektron/rapl" - "gitlab.com/spdf/elektron/schedulers" "gitlab.com/spdf/elektron/utilities" ) @@ -34,9 +31,8 @@ func getNextCapValue(curCapValue float64, precision int) float64 { } func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, hiThreshold, loThreshold float64, - logMType chan elekLogDef.LogMessageType, logMsg chan string, pcpConfigFile string, s scheduler.Scheduler) { + logMType chan elekLogDef.LogMessageType, logMsg chan string, pcpConfigFile string) { - baseSchedRef := s.(*schedulers.BaseScheduler) var pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c " + pcpConfigFile cmd := exec.Command("sh", "-c", pcpCommand, pcpConfigFile) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} @@ -64,9 +60,6 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, hiTh headers := strings.Split(scanner.Text(), ",") - logMType <- elekLogDef.DEG_COL - logMsg <- "CPU Variance, CPU Task Share Variance, Memory Variance, Memory Task Share Variance" - powerIndexes := make([]int, 0, 0) powerHistories := make(map[string]*ring.Ring) indexToHost := make(map[int]string) @@ -110,39 +103,6 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, hiTh logMType <- elekLogDef.PCP logMsg <- text - memUtils := pcp.MemUtilPerNode(text) - memTaskShares := make([]float64, len(memUtils)) - - cpuUtils := pcp.CpuUtilPerNode(text) - cpuTaskShares := make([]float64, len(cpuUtils)) - - for i := 0; i < 8; i++ { - host := fmt.Sprintf("stratos-00%d.cs.binghamton.edu", i+1) - if slaveID, ok := baseSchedRef.HostNameToSlaveID[host]; ok { - baseSchedRef.TasksRunningMutex.Lock() - tasksRunning := len(baseSchedRef.Running[slaveID]) - baseSchedRef.TasksRunningMutex.Unlock() - if tasksRunning > 0 { - cpuTaskShares[i] = cpuUtils[i] / float64(tasksRunning) - memTaskShares[i] = memUtils[i] / float64(tasksRunning) - } - } - } - - // Variance in resource utilization shows how the current workload has been distributed. - // However, if the number of tasks running are not equally distributed, utilization variance figures become - // less relevant as they do not express the distribution of CPU intensive tasks. - // We thus also calculate `task share variance`, which basically signifies how the workload is distributed - // across each node per share. - - cpuVariance, _ := stats.Variance(cpuUtils) - cpuTaskSharesVariance, _ := stats.Variance(cpuTaskShares) - memVariance, _ := stats.Variance(memUtils) - memTaskSharesVariance, _ := stats.Variance(memTaskShares) - - logMType <- elekLogDef.DEG_COL - logMsg <- fmt.Sprintf("%f, %f, %f, %f", cpuVariance, cpuTaskSharesVariance, memVariance, memTaskSharesVariance) - totalPower := 0.0 for _, powerIndex := range powerIndexes { power, _ := strconv.ParseFloat(split[powerIndex], 64) diff --git a/scheduler.go b/scheduler.go index 2353ead..f7aa7f8 100644 --- a/scheduler.go +++ b/scheduler.go @@ -232,13 +232,13 @@ func main() { // Starting PCP logging. if noPowercap { - go pcp.Start(pcpLog, &recordPCP, logMType, logMsg, *pcpConfigFile, scheduler) + go pcp.Start(pcpLog, &recordPCP, logMType, logMsg, *pcpConfigFile) } else if extrema { go powerCap.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, *hiThreshold, - *loThreshold, logMType, logMsg, *pcpConfigFile, scheduler) + *loThreshold, logMType, logMsg, *pcpConfigFile) } else if progExtrema { go powerCap.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, *hiThreshold, - *loThreshold, logMType, logMsg, *pcpConfigFile, scheduler) + *loThreshold, logMType, logMsg, *pcpConfigFile) } // Take a second between starting PCP log and continuing.