diff --git a/README.md b/README.md index 4d5c250..801a7d4 100644 --- a/README.md +++ b/README.md @@ -7,5 +7,4 @@ To Do: * Add queue for jobs to be executed * Create metrics for each task launched [Time to schedule, run time, power used] * Have calibration phase? - - + * Add ability to use constraints diff --git a/pcp.go b/pcp.go index 95e907e..872a38a 100644 --- a/pcp.go +++ b/pcp.go @@ -7,7 +7,7 @@ import ( "os/exec" ) -func main() { +func PCP() { cmd := exec.Command("sh", "-c", "pmdumptext -m -l -o -d , -c config") stdout, err := os.Create("./output.txt") cmd.Stdout = stdout diff --git a/scheduler.go b/scheduler.go index 5dae4c7..3dbddb7 100644 --- a/scheduler.go +++ b/scheduler.go @@ -17,59 +17,12 @@ const ( shutdownTimeout = time.Duration(30) * time.Second ) -const ( - dockerCommand = "echo Hello_World!" -) - var ( defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} ) -type Task struct { - cpu float64 - mem float64 - watts float64 - image string -} - -// NameFor returns the string name for a TaskState. -func NameFor(state *mesos.TaskState) string { - switch *state { - case mesos.TaskState_TASK_STAGING: - return "TASK_STAGING" - case mesos.TaskState_TASK_STARTING: - return "TASK_STARTING" - case mesos.TaskState_TASK_RUNNING: - return "TASK_RUNNING" - case mesos.TaskState_TASK_FINISHED: - return "TASK_FINISHED" // TERMINAL - case mesos.TaskState_TASK_FAILED: - return "TASK_FAILED" // TERMINAL - case mesos.TaskState_TASK_KILLED: - return "TASK_KILLED" // TERMINAL - case mesos.TaskState_TASK_LOST: - return "TASK_LOST" // TERMINAL - default: - return "UNKNOWN" - } -} - -// IsTerminal determines if a TaskState is a terminal state, i.e. if it singals -// that the task has stopped running. -func IsTerminal(state *mesos.TaskState) bool { - switch *state { - case mesos.TaskState_TASK_FINISHED, - mesos.TaskState_TASK_FAILED, - mesos.TaskState_TASK_KILLED, - mesos.TaskState_TASK_LOST: - return true - default: - return false - } -} - // Decides if to take an offer or not -func offerDecision(offer *mesos.Offer) bool { +func offerDecision(offer *mesos.Offer, task Task) bool { var cpus, mem, watts float64 @@ -81,18 +34,14 @@ func offerDecision(offer *mesos.Offer) bool { mem += *resource.GetScalar().Value case "watts": watts += *resource.GetScalar().Value - fmt.Println("Got watts!: ", *resource.GetScalar().Value) } } - var taskCPUs, taskMem, taskWatts float64 - // Insert calculation here - taskWatts = 50 - taskMem = 4096 - taskCPUs = 3.0 + //TODO: Insert watts calculation here instead of taking them as a parameter - if cpus >= taskCPUs && mem >= taskMem && watts >= taskWatts { + + if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { return true } @@ -104,7 +53,7 @@ func offerDecision(offer *mesos.Offer) bool { type electronScheduler struct { tasksCreated int tasksRunning int - taskQueue []Task //FIFO + tasks []Task dockerExecutor *mesos.ExecutorInfo @@ -117,56 +66,48 @@ type electronScheduler struct { } // New electron scheduler -func newElectronScheduler() *electronScheduler { +func newElectronScheduler(tasks []Task) *electronScheduler { s := &electronScheduler{ dockerExecutor: &mesos.ExecutorInfo{ ExecutorId: &mesos.ExecutorID{Value: proto.String("docker-runner")}, - Command: &mesos.CommandInfo{ - Value: proto.String(dockerCommand), - }, Name: proto.String("Runner"), }, - + tasks: tasks, shutdown: make(chan struct{}), done: make(chan struct{}), } return s } -func (s *electronScheduler) newTask(offer *mesos.Offer, taskCPUs, taskMem, taskWatts float64) *mesos.TaskInfo { +func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo { taskID := s.tasksCreated s.tasksCreated++ return &mesos.TaskInfo{ + Name: proto.String("Electron_" + fmt.Sprintf("Electron-%d", taskID)), TaskId: &mesos.TaskID{ Value: proto.String(fmt.Sprintf("Electron-%d", taskID)), }, SlaveId: offer.SlaveId, Resources: []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", taskCPUs), - mesosutil.NewScalarResource("mem", taskMem), - mesosutil.NewScalarResource("watts", taskWatts), + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + mesosutil.NewScalarResource("watts", task.Watts), + }, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), }, Container: &mesos.ContainerInfo{ Type: mesos.ContainerInfo_DOCKER.Enum(), Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String("gouravr/minife:v5"), + Image: proto.String(task.Image), }, }, } } -func (s *electronScheduler) newDockerTask(offer *mesos.Offer, taskCPUs, taskMem, taskWatts float64) *mesos.TaskInfo { - task := s.newTask(offer, taskCPUs, taskMem, taskWatts) - task.Name = proto.String("Electron_" + *task.TaskId.Value) - task.Command = &mesos.CommandInfo{ - Value: proto.String("cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100"), - } - return task -} - func (s *electronScheduler) Registered( _ sched.SchedulerDriver, frameworkID *mesos.FrameworkID, @@ -196,15 +137,36 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers default: } + if(len(s.tasks) <= 0) { + log.Println("Done with scheduling all tasks...") + os.Exit(0) + } + tasks := []*mesos.TaskInfo{} - if offerDecision(offer) { - tasks = append(tasks, s.newDockerTask(offer, 3.0, 4096, 50)) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) - } else { + // First fit strategy + + taken := false + for i, task := range s.tasks { + // Decision to take the offer or not + if offerDecision(offer, task) { + tasks = append(tasks, s.newTask(offer, task)) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + + // Delete scheduled task + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + taken = true + + } + } + + // If there was no match for the task + if !taken { fmt.Println("There is enough resources to launch a task!") driver.DeclineOffer(offer.Id, defaultFilter) } + } } @@ -257,9 +219,22 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) { func main() { master := flag.String("master", "xavier:5050", "Location of leading Mesos master") + tasksFile := flag.String("tasks", "", "JSON file containing task definitions") flag.Parse() - scheduler := newElectronScheduler() + + if *tasksFile == "" { + fmt.Println("No file containing tasks specifiction provided.") + os.Exit(1) + } + + tasks, err := TasksFromJSON(*tasksFile) + if(err != nil || len(tasks) == 0) { + fmt.Println("Invalid tasks specification file provided") + os.Exit(1) + } + + scheduler := newElectronScheduler(tasks) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ diff --git a/states.go b/states.go new file mode 100644 index 0000000..d3b8afa --- /dev/null +++ b/states.go @@ -0,0 +1,41 @@ +package main + +import ( + mesos "github.com/mesos/mesos-go/mesosproto" +) + +// NameFor returns the string name for a TaskState. +func NameFor(state *mesos.TaskState) string { + switch *state { + case mesos.TaskState_TASK_STAGING: + return "TASK_STAGING" + case mesos.TaskState_TASK_STARTING: + return "TASK_STARTING" + case mesos.TaskState_TASK_RUNNING: + return "TASK_RUNNING" + case mesos.TaskState_TASK_FINISHED: + return "TASK_FINISHED" // TERMINAL + case mesos.TaskState_TASK_FAILED: + return "TASK_FAILED" // TERMINAL + case mesos.TaskState_TASK_KILLED: + return "TASK_KILLED" // TERMINAL + case mesos.TaskState_TASK_LOST: + return "TASK_LOST" // TERMINAL + default: + return "UNKNOWN" + } +} + +// IsTerminal determines if a TaskState is a terminal state, i.e. if it singals +// that the task has stopped running. +func IsTerminal(state *mesos.TaskState) bool { + switch *state { + case mesos.TaskState_TASK_FINISHED, + mesos.TaskState_TASK_FAILED, + mesos.TaskState_TASK_KILLED, + mesos.TaskState_TASK_LOST: + return true + default: + return false + } +} diff --git a/task.go b/task.go new file mode 100644 index 0000000..467379c --- /dev/null +++ b/task.go @@ -0,0 +1,33 @@ +package main + +import ( + "encoding/json" + "os" + "github.com/pkg/errors" +) + +type Task struct { + CPU float64 `json: "cpu"` + RAM float64 `json: "ram"` + Watts float64 `json: "watts"` + Image string `json: "image"` + CMD string `json: "cmd"` + Instances int `default 1, json: "inst"` +} + +func TasksFromJSON(uri string) ([]Task, error) { + + var tasks []Task + + file, err := os.Open(uri) + if err != nil { + return nil, errors.Wrap(err, "Error opening file") + } + + err = json.NewDecoder(file).Decode(&tasks) + if err != nil { + return nil, errors.Wrap(err, "Error unmarshalling") + } + + return tasks, nil +} \ No newline at end of file diff --git a/workload_1.json b/workload_1.json new file mode 100644 index 0000000..db50981 --- /dev/null +++ b/workload_1.json @@ -0,0 +1,11 @@ +[ + { + "cpu": 3.0, + "ram": 4096, + "watts": 50, + "image": "gouravr/minife:v5", + "cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100", + "inst": 1 + } + +]