Added Error state. Fixed bug with tasks returning error because 0.0 watts was requested as a resource. Changed name to be more readable by eliminating electron. PCP logs can now have a prefix.

This commit is contained in:
Renan DelValle 2016-10-07 19:29:36 -04:00
parent 52d012a7ee
commit 5dd64f1e16
3 changed files with 25 additions and 23 deletions

View file

@ -8,13 +8,13 @@ import (
"os"
)
func Start(quit chan struct{}, logging *bool) {
func Start(quit chan struct{}, logging *bool, prefix string) {
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
cmd := exec.Command("sh", "-c", pcpCommand)
startTime := time.Now().Format("20060102150405")
logFile, err := os.Create("./"+startTime+".pcplog")
logFile, err := os.Create("./"+prefix+startTime+".pcplog")
if err != nil {
log.Fatal(err)
}

View file

@ -55,10 +55,6 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
cpus, mem, watts := OfferAgg(offer)
if(IGNORE_WATTS) {
task.Watts = 0.0 // Don't take any watts in the offer
}
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
@ -68,8 +64,7 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
return false
}
// rendlerScheduler implements the Scheduler interface and stores
// the state needed for Rendler to function.
// electronScheduler implements the Scheduler interface
type electronScheduler struct {
tasksCreated int
tasksRunning int
@ -77,12 +72,10 @@ type electronScheduler struct {
metrics map[string]Metric
running map[string]map[string]bool
// First set of PCP values are garbage values, signal to logger to start recording after
// we actually schedule a task
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
recordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
shutdown chan struct{}
@ -110,7 +103,7 @@ func newElectronScheduler(tasks []Task) *electronScheduler {
}
func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo {
taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.recordPCP {
@ -125,19 +118,24 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskID] = true
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if(!IGNORE_WATTS) {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskID),
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String(taskID),
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
mesosutil.NewScalarResource("watts", task.Watts),
},
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},

View file

@ -2,6 +2,7 @@ package main
import (
mesos "github.com/mesos/mesos-go/mesosproto"
"fmt"
)
// NameFor returns the string name for a TaskState.
@ -21,8 +22,10 @@ func NameFor(state *mesos.TaskState) string {
return "TASK_KILLED" // TERMINAL
case mesos.TaskState_TASK_LOST:
return "TASK_LOST" // TERMINAL
case mesos.TaskState_TASK_ERROR:
return "TASK_ERROR"
default:
return "UNKNOWN"
return fmt.Sprintf("UNKNOWN: %d", *state)
}
}
@ -33,7 +36,8 @@ func IsTerminal(state *mesos.TaskState) bool {
case mesos.TaskState_TASK_FINISHED,
mesos.TaskState_TASK_FAILED,
mesos.TaskState_TASK_KILLED,
mesos.TaskState_TASK_LOST:
mesos.TaskState_TASK_LOST,
mesos.TaskState_TASK_ERROR:
return true
default:
return false