diff --git a/pcp/pcp.go b/pcp/pcp.go index b9743ab..0c8d0e7 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -8,13 +8,13 @@ import ( "os" ) -func Start(quit chan struct{}, logging *bool) { +func Start(quit chan struct{}, logging *bool, prefix string) { const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) startTime := time.Now().Format("20060102150405") - logFile, err := os.Create("./"+startTime+".pcplog") + logFile, err := os.Create("./"+prefix+startTime+".pcplog") if err != nil { log.Fatal(err) } diff --git a/scheduler.go b/scheduler.go index cb5ce2e..742cc4a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -55,10 +55,6 @@ func TakeOffer(offer *mesos.Offer, task Task) bool { cpus, mem, watts := OfferAgg(offer) - if(IGNORE_WATTS) { - task.Watts = 0.0 // Don't take any watts in the offer - } - //TODO: Insert watts calculation here instead of taking them as a parameter if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { @@ -68,8 +64,7 @@ func TakeOffer(offer *mesos.Offer, task Task) bool { return false } -// rendlerScheduler implements the Scheduler interface and stores -// the state needed for Rendler to function. +// electronScheduler implements the Scheduler interface type electronScheduler struct { tasksCreated int tasksRunning int @@ -77,12 +72,10 @@ type electronScheduler struct { metrics map[string]Metric running map[string]map[string]bool - - // First set of PCP values are garbage values, signal to logger to start recording after - // we actually schedule a task + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task recordPCP bool - // This channel is closed when the program receives an interrupt, // signalling that the program should shut down. shutdown chan struct{} @@ -110,7 +103,7 @@ func newElectronScheduler(tasks []Task) *electronScheduler { } func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo { - taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances) + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ if !s.recordPCP { @@ -125,19 +118,24 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn } // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskID] = true + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if(!IGNORE_WATTS) { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } return &mesos.TaskInfo{ - Name: proto.String(taskID), + Name: proto.String(taskName), TaskId: &mesos.TaskID{ - Value: proto.String(taskID), + Value: proto.String("electron-" + taskName), }, SlaveId: offer.SlaveId, - Resources: []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - mesosutil.NewScalarResource("watts", task.Watts), - }, + Resources: resources, Command: &mesos.CommandInfo{ Value: proto.String(task.CMD), }, diff --git a/states.go b/states.go index d3b8afa..69a227b 100644 --- a/states.go +++ b/states.go @@ -2,6 +2,7 @@ package main import ( mesos "github.com/mesos/mesos-go/mesosproto" + "fmt" ) // NameFor returns the string name for a TaskState. @@ -21,8 +22,10 @@ func NameFor(state *mesos.TaskState) string { return "TASK_KILLED" // TERMINAL case mesos.TaskState_TASK_LOST: return "TASK_LOST" // TERMINAL + case mesos.TaskState_TASK_ERROR: + return "TASK_ERROR" default: - return "UNKNOWN" + return fmt.Sprintf("UNKNOWN: %d", *state) } } @@ -33,7 +36,8 @@ func IsTerminal(state *mesos.TaskState) bool { case mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_KILLED, - mesos.TaskState_TASK_LOST: + mesos.TaskState_TASK_LOST, + mesos.TaskState_TASK_ERROR: return true default: return false