From fce62981da23b4f1b67fe7e3f193f158c70da2ce Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 13 Oct 2016 17:15:09 -0400 Subject: [PATCH] Moved schedulers from the main programs to schedulers package. Can now choose different scheduelrs to use. Work on code sharing between schedulers remains to be done. --- def/metric.go | 8 + task.go => def/task.go | 22 +-- metrics.go | 8 - pcp/pcp.go | 33 ++-- scheduler.go | 301 ++---------------------------- schedulers/binpackwatts.go | 241 ++++++++++++++++++++++++ schedulers/firstfit.go | 242 ++++++++++++++++++++++++ schedulers/helpers.go | 39 ++++ states.go => schedulers/states.go | 4 +- 9 files changed, 575 insertions(+), 323 deletions(-) create mode 100644 def/metric.go rename task.go => def/task.go (58%) delete mode 100644 metrics.go create mode 100644 schedulers/binpackwatts.go create mode 100644 schedulers/firstfit.go create mode 100644 schedulers/helpers.go rename states.go => schedulers/states.go (98%) diff --git a/def/metric.go b/def/metric.go new file mode 100644 index 0000000..6278a31 --- /dev/null +++ b/def/metric.go @@ -0,0 +1,8 @@ +package def + +type Metric struct { + Name string `json:"name"` + CPU float64 `json:"cpu"` + RAM float64 `json:"ram"` + Watts float64 `json:"watts"` +} diff --git a/task.go b/def/task.go similarity index 58% rename from task.go rename to def/task.go index cbc0e5a..94629d1 100644 --- a/task.go +++ b/def/task.go @@ -1,20 +1,20 @@ -package main +package def import ( "encoding/json" - "os" "github.com/pkg/errors" + "os" ) type Task struct { - Name string `json:"name"` - CPU float64 `json:"cpu"` - RAM float64 `json:"ram"` - Watts float64 `json:"watts"` - Image string `json:"image"` - CMD string `json:"cmd"` - Instances *int `json:"inst"` - Host string `json:"host"` + Name string `json:"name"` + CPU float64 `json:"cpu"` + RAM float64 `json:"ram"` + Watts float64 `json:"watts"` + Image string `json:"image"` + CMD string `json:"cmd"` + Instances *int `json:"inst"` + Host string `json:"host"` } func TasksFromJSON(uri string) ([]Task, error) { @@ -32,4 +32,4 @@ func TasksFromJSON(uri string) ([]Task, error) { } return tasks, nil -} \ No newline at end of file +} diff --git a/metrics.go b/metrics.go deleted file mode 100644 index 2041007..0000000 --- a/metrics.go +++ /dev/null @@ -1,8 +0,0 @@ -package main - -type Metric struct{ - Name string `json:"name"` - CPU float64 `json:"cpu"` - RAM float64 `json:"ram"` - Watts float64 `json:"watts"` -} diff --git a/pcp/pcp.go b/pcp/pcp.go index 4329477..8a1d46d 100644 --- a/pcp/pcp.go +++ b/pcp/pcp.go @@ -3,10 +3,10 @@ package pcp import ( "bufio" "log" - "os/exec" - "time" "os" + "os/exec" "syscall" + "time" ) func Start(quit chan struct{}, logging *bool, prefix string) { @@ -15,8 +15,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} startTime := time.Now().Format("20060102150405") - - logFile, err := os.Create("./"+prefix+startTime+".pcplog") + logFile, err := os.Create("./" + prefix + startTime + ".pcplog") if err != nil { log.Fatal(err) } @@ -39,12 +38,12 @@ func Start(quit chan struct{}, logging *bool, prefix string) { logFile.WriteString(scanner.Text() + "\n") /* - headers := strings.Split(scanner.Text(), ",") + headers := strings.Split(scanner.Text(), ",") - for _, hostMetric := range headers { - split := strings.Split(hostMetric, ":") - fmt.Printf("Host %s: Metric: %s\n", split[0], split[1]) - } + for _, hostMetric := range headers { + split := strings.Split(hostMetric, ":") + fmt.Printf("Host 
%s: Metric: %s\n", split[0], split[1]) + } */ // Throw away first set of results @@ -53,17 +52,16 @@ func Start(quit chan struct{}, logging *bool, prefix string) { seconds := 0 for scanner.Scan() { - - if(*logging) { + if *logging { log.Println("Logging PCP...") logFile.WriteString(scanner.Text() + "\n") } /* - fmt.Printf("Second: %d\n", seconds) - for i, val := range strings.Split(scanner.Text(), ",") { - fmt.Printf("host metric: %s val: %s\n", headers[i], val) - }*/ + fmt.Printf("Second: %d\n", seconds) + for i, val := range strings.Split(scanner.Text(), ",") { + fmt.Printf("host metric: %s val: %s\n", headers[i], val) + }*/ seconds++ @@ -73,15 +71,14 @@ func Start(quit chan struct{}, logging *bool, prefix string) { log.Println("PCP logging started") - if err := cmd.Start(); err != nil { log.Fatal(err) } pgid, err := syscall.Getpgid(cmd.Process.Pid) - select{ - case <- quit: + select { + case <-quit: log.Println("Stopping PCP logging in 5 seconds") time.Sleep(5 * time.Second) diff --git a/scheduler.go b/scheduler.go index c3a92eb..6b11506 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1,310 +1,43 @@ package main import ( + "bitbucket.org/bingcloud/electron/def" + "bitbucket.org/bingcloud/electron/pcp" + "bitbucket.org/bingcloud/electron/schedulers" "flag" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "time" - "bitbucket.org/bingcloud/electron/pcp" - "strings" "os/signal" + "time" ) -const ( - shutdownTimeout = time.Duration(30) * time.Second -) - -var ( - defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} - longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} - IGNORE_WATTS = false -) - -func CoLocated(tasks map[string]bool) { - - for task := range tasks { - log.Println(task) - } - - fmt.Println("---------------------") -} - -func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { - var cpus, mem, watts float64 - - for _, resource := range offer.Resources { - switch resource.GetName() { - case "cpus": - cpus += *resource.GetScalar().Value - case "mem": - mem += *resource.GetScalar().Value - case "watts": - watts += *resource.GetScalar().Value - } - } - - return cpus, mem, watts -} - -// Decides if to take an offer or not -func TakeOffer(offer *mesos.Offer, task Task) bool { - - cpus, mem, watts := OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { - return true - } - - return false -} - -// electronScheduler implements the Scheduler interface -type electronScheduler struct { - tasksCreated int - tasksRunning int - tasks []Task - metrics map[string]Metric - running map[string]map[string]bool - - // First set of PCP values are garbage values, signal to logger to start recording when we're - // about to schedule a new task - recordPCP bool - - // This channel is closed when the program receives an interrupt, - // signalling that the program should shut down. 
- shutdown chan struct{} - // This channel is closed after shutdown is closed, and only when all - // outstanding tasks have been cleaned up - done chan struct{} - - - // Controls when to shutdown pcp logging - pcpLog chan struct{} -} - -// New electron scheduler -func newElectronScheduler(tasks []Task) *electronScheduler { - - s := &electronScheduler{ - tasks: tasks, - shutdown: make(chan struct{}), - done: make(chan struct{}), - pcpLog: make(chan struct{}), - running: make(map[string]map[string]bool), - recordPCP: false, - } - return s -} - -func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.recordPCP { - // Turn on logging - s.recordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if(!IGNORE_WATTS) { - resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - - }, - } -} - -func (s *electronScheduler) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *electronScheduler) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *electronScheduler) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - -func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - select { - case <-s.shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - // First fit strategy - - taken := false - for i, task := range s.tasks { - - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } - } - - // Decision to take the offer or not - if TakeOffer(offer, task) { - - log.Println("Co-Located with: ") - CoLocated(s.running[offer.GetSlaveId().GoString()]) - - tasks = append(tasks, s.newTask(offer, task)) - - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, 
defaultFilter) - - taken = true - - fmt.Println("Inst: ", *task.Instances) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - - if(len(s.tasks) <= 0) { - log.Println("Done scheduling all tasks") - close(s.shutdown) - } - } - break // Offer taken, move on - } - } - - // If there was no match for the task - if !taken { - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) - } - - } -} - -func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()],*status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.shutdown: - close(s.done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} - -func (s *electronScheduler) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *electronScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *electronScheduler) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *electronScheduler) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} - var master = flag.String("master", "xavier:5050", "Location of leading Mesos master") var tasksFile = flag.String("workload", "", "JSON file containing task definitions") var ignoreWatts = flag.Bool("ignoreWatts", false, "Ignore watts in offers") var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog") // Short hand args -func init(){ +func init() { flag.StringVar(master, "m", "xavier:5050", "Location of leading Mesos master (shorthand)") flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)") flag.BoolVar(ignoreWatts, "i", false, "Ignore watts in offers (shorthand)") - flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog") + flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)") } func main() { flag.Parse() - IGNORE_WATTS = *ignoreWatts - if *tasksFile == "" { fmt.Println("No file containing tasks specifiction provided.") os.Exit(1) } - tasks, err := TasksFromJSON(*tasksFile) - if(err != nil || len(tasks) == 0) { + tasks, err := def.TasksFromJSON(*tasksFile) + if err != nil || len(tasks) == 0 { fmt.Println("Invalid tasks specification file provided") os.Exit(1) } @@ -314,7 +47,7 @@ func main() { fmt.Println(task) } - scheduler := newElectronScheduler(tasks) + scheduler := schedulers.NewFirstFit(tasks, *ignoreWatts) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ 
Master: *master, Framework: &mesos.FrameworkInfo{ @@ -328,7 +61,7 @@ func main() { return } - go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP, *pcplogPrefix) + go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix) time.Sleep(1 * time.Second) // Attempt to handle signint to not leave pmdumptext running @@ -338,28 +71,28 @@ func main() { signal.Notify(c, os.Interrupt, os.Kill) s := <-c if s != os.Interrupt { - close(scheduler.pcpLog) + close(scheduler.PCPLog) return } log.Printf("Received SIGINT...stopping") - close(scheduler.done) + close(scheduler.Done) }() go func() { // Signals we have scheduled every task we have select { - case <-scheduler.shutdown: - // case <-time.After(shutdownTimeout): + case <-scheduler.Shutdown: + // case <-time.After(shutdownTimeout): } // All tasks have finished select { - case <-scheduler.done: - close(scheduler.pcpLog) + case <-scheduler.Done: + close(scheduler.PCPLog) time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds -// case <-time.After(shutdownTimeout): + // case <-time.After(shutdownTimeout): } // Done shutting down diff --git a/schedulers/binpackwatts.go b/schedulers/binpackwatts.go new file mode 100644 index 0000000..690e793 --- /dev/null +++ b/schedulers/binpackwatts.go @@ -0,0 +1,241 @@ +package schedulers + +import ( + "bitbucket.org/bingcloud/electron/def" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "strings" + "time" +) + +// Decides if to take an offer or not +func (*BinPackWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { + + cpus, mem, watts := OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + return true + } + + return false +} + +type BinPackWatts struct { + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. 
+ Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} +} + +// New electron scheduler +func NewBinPackWatts(tasks []def.Task, ignoreWatts bool) *BinPackWatts { + + s := &BinPackWatts{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + } + return s +} + +func (s *BinPackWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *BinPackWatts) Registered( + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) +} + +func (s *BinPackWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework re-registered with master %s", masterInfo) +} + +func (s *BinPackWatts) Disconnected(sched.SchedulerDriver) { + log.Println("Framework disconnected with master") +} + +func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + // First fit strategy + + taken := false + for i, task := range s.tasks { + + // Check host if it exists + if task.Host != "" { + // Don't take offer if it doesn't match our task's host requirement + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + } + + // Decision to take the offer or not + if s.takeOffer(offer, task) { + + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + tasks = append(tasks, s.newTask(offer, task)) + + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + 
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + + taken = true + + fmt.Println("Inst: ", *task.Instances) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !taken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + + } +} + +func (s *BinPackWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} + +func (s *BinPackWatts) FrameworkMessage( + driver sched.SchedulerDriver, + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { + + log.Println("Getting a framework message: ", message) + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) +} + +func (s *BinPackWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + log.Printf("Offer %s rescinded", offerID) +} +func (s *BinPackWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + log.Printf("Slave %s lost", slaveID) +} +func (s *BinPackWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { + log.Printf("Executor %s on slave %s was lost", executorID, slaveID) +} + +func (s *BinPackWatts) Error(_ sched.SchedulerDriver, err string) { + log.Printf("Receiving an error: %s", err) +} diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go new file mode 100644 index 0000000..91a68f1 --- /dev/null +++ b/schedulers/firstfit.go @@ -0,0 +1,242 @@ +package schedulers + +import ( + "bitbucket.org/bingcloud/electron/def" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "strings" + "time" +) + +// Decides if to take an offer or not +func (*FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { + + cpus, mem, watts := OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + return true + } + + return false +} + +// electronScheduler implements the Scheduler interface +type FirstFit struct { + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. 
+ Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} +} + +// New electron scheduler +func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit { + + s := &FirstFit{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + } + return s +} + +func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *FirstFit) Registered( + _ sched.SchedulerDriver, + frameworkID *mesos.FrameworkID, + masterInfo *mesos.MasterInfo) { + log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) +} + +func (s *FirstFit) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework re-registered with master %s", masterInfo) +} + +func (s *FirstFit) Disconnected(sched.SchedulerDriver) { + log.Println("Framework disconnected with master") +} + +func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + // First fit strategy + + taken := false + for i, task := range s.tasks { + + // Check host if it exists + if task.Host != "" { + // Don't take offer if it doesn't match our task's host requirement + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + } + + // Decision to take the offer or not + if s.takeOffer(offer, task) { + + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + tasks = append(tasks, s.newTask(offer, task)) + + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + + 
taken = true + + fmt.Println("Inst: ", *task.Instances) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !taken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + + } +} + +func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} + +func (s *FirstFit) FrameworkMessage( + driver sched.SchedulerDriver, + executorID *mesos.ExecutorID, + slaveID *mesos.SlaveID, + message string) { + + log.Println("Getting a framework message: ", message) + log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) +} + +func (s *FirstFit) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { + log.Printf("Offer %s rescinded", offerID) +} +func (s *FirstFit) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { + log.Printf("Slave %s lost", slaveID) +} +func (s *FirstFit) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { + log.Printf("Executor %s on slave %s was lost", executorID, slaveID) +} + +func (s *FirstFit) Error(_ sched.SchedulerDriver, err string) { + log.Printf("Receiving an error: %s", err) +} diff --git a/schedulers/helpers.go b/schedulers/helpers.go new file mode 100644 index 0000000..2c6ffd2 --- /dev/null +++ b/schedulers/helpers.go @@ -0,0 +1,39 @@ +package schedulers + +import ( + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "log" +) + +var ( + defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} + longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} +) + +func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { + var cpus, mem, watts float64 + + for _, resource := range offer.Resources { + switch resource.GetName() { + case "cpus": + cpus += *resource.GetScalar().Value + case "mem": + mem += *resource.GetScalar().Value + case "watts": + watts += *resource.GetScalar().Value + } + } + + return cpus, mem, watts +} + +func coLocated(tasks map[string]bool) { + + for task := range tasks { + log.Println(task) + } + + fmt.Println("---------------------") +} diff --git a/states.go b/schedulers/states.go similarity index 98% rename from states.go rename to schedulers/states.go index 69a227b..8aa775e 100644 --- a/states.go +++ b/schedulers/states.go @@ -1,8 +1,8 @@ -package main +package schedulers import ( - mesos "github.com/mesos/mesos-go/mesosproto" "fmt" + mesos "github.com/mesos/mesos-go/mesosproto" ) // NameFor returns the string name for a TaskState.
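
Editor's note — an illustrative sketch, not part of the patch. The commit message says the refactor means one "can now choose different schedulers to use"; the committed main() still hard-codes schedulers.NewFirstFit. The sketch below shows one way a caller could make that choice, using only constructors and import paths that appear in this diff (def.TasksFromJSON, schedulers.NewFirstFit, schedulers.NewBinPackWatts). The -schedulerName flag and the pickScheduler helper are hypothetical glue added for illustration.

// Sketch only: selecting between the two schedulers introduced by this commit.
package main

import (
	"flag"
	"log"

	"bitbucket.org/bingcloud/electron/def"
	"bitbucket.org/bingcloud/electron/schedulers"
	sched "github.com/mesos/mesos-go/scheduler"
)

// Both FirstFit and BinPackWatts implement the mesos-go scheduler callbacks,
// so either can be returned behind the upstream sched.Scheduler interface.
// Doing so hides the exported PCPLog/RecordPCP/Shutdown/Done fields, which is
// part of the code sharing the commit message says remains to be done.
func pickScheduler(name string, tasks []def.Task, ignoreWatts bool) sched.Scheduler {
	switch name {
	case "binpackwatts":
		return schedulers.NewBinPackWatts(tasks, ignoreWatts)
	default: // the committed main() uses first-fit
		return schedulers.NewFirstFit(tasks, ignoreWatts)
	}
}

func main() {
	schedName := flag.String("schedulerName", "firstfit", "firstfit or binpackwatts (hypothetical flag)")
	workload := flag.String("workload", "", "JSON file containing task definitions")
	ignoreWatts := flag.Bool("ignoreWatts", false, "Ignore watts in offers")
	flag.Parse()

	tasks, err := def.TasksFromJSON(*workload)
	if err != nil || len(tasks) == 0 {
		log.Fatal("Invalid tasks specification file provided")
	}

	s := pickScheduler(*schedName, tasks, *ignoreWatts)
	log.Printf("Selected scheduler: %T", s)

	// From here the wiring would follow the patched main(): build the
	// sched.DriverConfig, start pcp.Start(...), and handle the Shutdown,
	// Done, and PCPLog channels — which still requires the concrete
	// scheduler type until a shared interface exposes those fields.
}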