Added Error state. Fixed bug with tasks returning error because 0.0 watts was requested as a resource. Changed name to be more readable by eliminating electron. PCP logs can now have a prefix.
This commit is contained in:
parent
6878bf55ec
commit
086b06256d
3 changed files with 25 additions and 23 deletions
|
@ -8,13 +8,13 @@ import (
|
|||
"os"
|
||||
)
|
||||
|
||||
func Start(quit chan struct{}, logging *bool) {
|
||||
func Start(quit chan struct{}, logging *bool, prefix string) {
|
||||
const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config"
|
||||
cmd := exec.Command("sh", "-c", pcpCommand)
|
||||
startTime := time.Now().Format("20060102150405")
|
||||
|
||||
|
||||
logFile, err := os.Create("./"+startTime+".pcplog")
|
||||
logFile, err := os.Create("./"+prefix+startTime+".pcplog")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
36
scheduler.go
36
scheduler.go
|
@ -55,10 +55,6 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
|
|||
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
if(IGNORE_WATTS) {
|
||||
task.Watts = 0.0 // Don't take any watts in the offer
|
||||
}
|
||||
|
||||
//TODO: Insert watts calculation here instead of taking them as a parameter
|
||||
|
||||
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
|
||||
|
@ -68,8 +64,7 @@ func TakeOffer(offer *mesos.Offer, task Task) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// rendlerScheduler implements the Scheduler interface and stores
|
||||
// the state needed for Rendler to function.
|
||||
// electronScheduler implements the Scheduler interface
|
||||
type electronScheduler struct {
|
||||
tasksCreated int
|
||||
tasksRunning int
|
||||
|
@ -77,12 +72,10 @@ type electronScheduler struct {
|
|||
metrics map[string]Metric
|
||||
running map[string]map[string]bool
|
||||
|
||||
|
||||
// First set of PCP values are garbage values, signal to logger to start recording after
|
||||
// we actually schedule a task
|
||||
// First set of PCP values are garbage values, signal to logger to start recording when we're
|
||||
// about to schedule a new task
|
||||
recordPCP bool
|
||||
|
||||
|
||||
// This channel is closed when the program receives an interrupt,
|
||||
// signalling that the program should shut down.
|
||||
shutdown chan struct{}
|
||||
|
@ -110,7 +103,7 @@ func newElectronScheduler(tasks []Task) *electronScheduler {
|
|||
}
|
||||
|
||||
func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo {
|
||||
taskID := fmt.Sprintf("Electron-%s-%d", task.Name, *task.Instances)
|
||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||
s.tasksCreated++
|
||||
|
||||
if !s.recordPCP {
|
||||
|
@ -125,19 +118,24 @@ func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskIn
|
|||
}
|
||||
|
||||
// Add task to list of tasks running on node
|
||||
s.running[offer.GetSlaveId().GoString()][taskID] = true
|
||||
s.running[offer.GetSlaveId().GoString()][taskName] = true
|
||||
|
||||
resources := []*mesos.Resource{
|
||||
mesosutil.NewScalarResource("cpus", task.CPU),
|
||||
mesosutil.NewScalarResource("mem", task.RAM),
|
||||
}
|
||||
|
||||
if(!IGNORE_WATTS) {
|
||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
|
||||
}
|
||||
|
||||
return &mesos.TaskInfo{
|
||||
Name: proto.String(taskID),
|
||||
Name: proto.String(taskName),
|
||||
TaskId: &mesos.TaskID{
|
||||
Value: proto.String(taskID),
|
||||
Value: proto.String("electron-" + taskName),
|
||||
},
|
||||
SlaveId: offer.SlaveId,
|
||||
Resources: []*mesos.Resource{
|
||||
mesosutil.NewScalarResource("cpus", task.CPU),
|
||||
mesosutil.NewScalarResource("mem", task.RAM),
|
||||
mesosutil.NewScalarResource("watts", task.Watts),
|
||||
},
|
||||
Resources: resources,
|
||||
Command: &mesos.CommandInfo{
|
||||
Value: proto.String(task.CMD),
|
||||
},
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// NameFor returns the string name for a TaskState.
|
||||
|
@ -21,8 +22,10 @@ func NameFor(state *mesos.TaskState) string {
|
|||
return "TASK_KILLED" // TERMINAL
|
||||
case mesos.TaskState_TASK_LOST:
|
||||
return "TASK_LOST" // TERMINAL
|
||||
case mesos.TaskState_TASK_ERROR:
|
||||
return "TASK_ERROR"
|
||||
default:
|
||||
return "UNKNOWN"
|
||||
return fmt.Sprintf("UNKNOWN: %d", *state)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,7 +36,8 @@ func IsTerminal(state *mesos.TaskState) bool {
|
|||
case mesos.TaskState_TASK_FINISHED,
|
||||
mesos.TaskState_TASK_FAILED,
|
||||
mesos.TaskState_TASK_KILLED,
|
||||
mesos.TaskState_TASK_LOST:
|
||||
mesos.TaskState_TASK_LOST,
|
||||
mesos.TaskState_TASK_ERROR:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
|
Reference in a new issue