diff --git a/scheduler.go b/scheduler.go index 2ce4531..bef2610 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,13 +10,10 @@ import ( "log" "os" "os/signal" - "path/filepath" "time" ) const ( - taskCPUs = 0.1 - taskMem = 32.0 shutdownTimeout = time.Duration(30) * time.Second ) @@ -28,9 +25,51 @@ var ( defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} ) -// maxTasksForOffer computes how many tasks can be launched using a given offer -func maxTasksForOffer(offer *mesos.Offer) int { - count := 0 +type Task struct { + cpu float64 + mem float64 + watts float64 + image string +} + +// NameFor returns the string name for a TaskState. +func NameFor(state *mesos.TaskState) string { + switch *state { + case mesos.TaskState_TASK_STAGING: + return "TASK_STAGING" + case mesos.TaskState_TASK_STARTING: + return "TASK_STARTING" + case mesos.TaskState_TASK_RUNNING: + return "TASK_RUNNING" + case mesos.TaskState_TASK_FINISHED: + return "TASK_FINISHED" // TERMINAL + case mesos.TaskState_TASK_FAILED: + return "TASK_FAILED" // TERMINAL + case mesos.TaskState_TASK_KILLED: + return "TASK_KILLED" // TERMINAL + case mesos.TaskState_TASK_LOST: + return "TASK_LOST" // TERMINAL + default: + return "UNKNOWN" + } +} + +// IsTerminal determines if a TaskState is a terminal state, i.e. if it singals +// that the task has stopped running. +func IsTerminal(state *mesos.TaskState) bool { + switch *state { + case mesos.TaskState_TASK_FINISHED, + mesos.TaskState_TASK_FAILED, + mesos.TaskState_TASK_KILLED, + mesos.TaskState_TASK_LOST: + return true + default: + return false + } +} + +// Decides if to take an offer or not +func offerDecision(offer *mesos.Offer) bool { var cpus, mem, watts float64 @@ -46,13 +85,18 @@ func maxTasksForOffer(offer *mesos.Offer) int { } } - for cpus >= taskCPUs && mem >= taskMem { - count++ - cpus -= taskCPUs - mem -= taskMem + var taskCPUs, taskMem, taskWatts float64 + + // Insert calculation here + taskWatts = 50 + taskMem = 4096 + taskCPUs = 3.0 + + if cpus >= taskCPUs && mem >= taskMem && watts >= taskWatts { + return true } - return count + return false } // rendlerScheduler implements the Scheduler interface and stores @@ -60,9 +104,9 @@ func maxTasksForOffer(offer *mesos.Offer) int { type electronScheduler struct { tasksCreated int tasksRunning int + taskQueue []Task //FIFO dockerExecutor *mesos.ExecutorInfo - renderExecutor *mesos.ExecutorInfo // This channel is closed when the program receives an interrupt, // signalling that the program should shut down. @@ -74,17 +118,15 @@ type electronScheduler struct { // New electron scheduler func newElectronScheduler() *electronScheduler { - rendlerArtifacts := executorURIs() s := &electronScheduler{ dockerExecutor: &mesos.ExecutorInfo{ - ExecutorId: &mesos.ExecutorID{Value: proto.String("crawl-executor")}, + ExecutorId: &mesos.ExecutorID{Value: proto.String("docker-runner")}, Command: &mesos.CommandInfo{ Value: proto.String(dockerCommand), - Uris: rendlerArtifacts, }, - Name: proto.String("Crawler"), + Name: proto.String("Runner"), }, shutdown: make(chan struct{}), @@ -93,26 +135,35 @@ func newElectronScheduler() *electronScheduler { return s } -func (s *electronScheduler) newTaskPrototype(offer *mesos.Offer) *mesos.TaskInfo { +func (s *electronScheduler) newTask(offer *mesos.Offer, taskCPUs, taskMem, taskWatts float64) *mesos.TaskInfo { taskID := s.tasksCreated s.tasksCreated++ return &mesos.TaskInfo{ TaskId: &mesos.TaskID{ - Value: proto.String(fmt.Sprintf("RENDLER-%d", taskID)), + Value: proto.String(fmt.Sprintf("Electron-%d", taskID)), }, SlaveId: offer.SlaveId, Resources: []*mesos.Resource{ mesosutil.NewScalarResource("cpus", taskCPUs), mesosutil.NewScalarResource("mem", taskMem), + mesosutil.NewScalarResource("watts", taskWatts), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String("gouravr/minife:v5"), + }, + }, } } -func (s *electronScheduler) newCrawlTask(url string, offer *mesos.Offer) *mesos.TaskInfo { - task := s.newTaskPrototype(offer) +func (s *electronScheduler) newDockerTask(offer *mesos.Offer, taskCPUs, taskMem, taskWatts float64) *mesos.TaskInfo { + task := s.newTask(offer, taskCPUs, taskMem, taskWatts) task.Name = proto.String("Electron_" + *task.TaskId.Value) - task.Executor = s.dockerExecutor - task.Data = []byte(url) + task.Command = &mesos.CommandInfo{ + Value: proto.String("cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100"), + } return task } @@ -146,15 +197,15 @@ func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers } tasks := []*mesos.TaskInfo{} - tasksToLaunch := maxTasksForOffer(offer) - for tasksToLaunch > 0 { - fmt.Println("There is enough resources to launch a task!") - } - if len(tasks) == 0 { - driver.DeclineOffer(offer.Id, defaultFilter) - } else { + if offerDecision(offer) { + tasks = append(tasks, s.newDockerTask(offer, 3.0, 4096, 50)) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + time.Sleep(15 * time.Minute) + } else { + fmt.Println("There is enough resources to launch a task!") + driver.DeclineOffer(offer.Id, defaultFilter) + time.Sleep(15 * time.Minute) } } } @@ -206,39 +257,15 @@ func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) { log.Printf("Receiving an error: %s", err) } -func executorURIs() []*mesos.CommandInfo_URI { - basePath, err := filepath.Abs(filepath.Dir(os.Args[0]) + "/../..") - if err != nil { - log.Fatal("Failed to find the path to RENDLER") - } - baseURI := fmt.Sprintf("%s/", basePath) - - pathToURI := func(path string, extract bool) *mesos.CommandInfo_URI { - return &mesos.CommandInfo_URI{ - Value: &path, - Extract: &extract, - } - } - - return []*mesos.CommandInfo_URI{ - pathToURI(baseURI+"render.js", false), - pathToURI(baseURI+"python/crawl_executor.py", false), - pathToURI(baseURI+"python/render_executor.py", false), - pathToURI(baseURI+"python/results.py", false), - pathToURI(baseURI+"python/task_state.py", false), - } -} - func main() { - master := flag.String("master", "127.0.1.1:5050", "Location of leading Mesos master") - + master := flag.String("master", "xavier:5050", "Location of leading Mesos master") flag.Parse() scheduler := newElectronScheduler() driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ - Name: proto.String("RENDLER"), + Name: proto.String("Electron"), User: proto.String(""), }, Scheduler: scheduler, @@ -269,6 +296,7 @@ func main() { driver.Stop(false) }() + log.Printf("Starting...") if status, err := driver.Run(); err != nil { log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error()) }