Moved schedulers from the main program into a schedulers package. Different schedulers can now be chosen. Work on code sharing between schedulers remains to be done.
This commit is contained in:
parent 407c350d3c
commit c2e2b7e554
9 changed files with 575 additions and 323 deletions
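The commit message says different schedulers can now be chosen. As a rough sketch only: the "-schedType" flag and the pickScheduler helper below are hypothetical (this commit's main still hard-codes schedulers.NewFirstFit), while NewFirstFit and NewBinPackWatts are the constructors added here, and both types are assumed to satisfy mesos-go's sched.Scheduler interface since they implement the full set of callbacks shown in the diff.

package main

import (
	"flag"
	"log"

	"bitbucket.org/bingcloud/electron/def"
	"bitbucket.org/bingcloud/electron/schedulers"
	sched "github.com/mesos/mesos-go/scheduler"
)

// Hypothetical flag; not part of this commit.
var schedType = flag.String("schedType", "first-fit", "scheduler to use: first-fit or bin-pack-watts")

// pickScheduler maps a name to one of the constructors added in this commit.
// Both *FirstFit and *BinPackWatts implement the mesos-go scheduler callbacks,
// so they can be returned behind the sched.Scheduler interface.
func pickScheduler(name string, tasks []def.Task, ignoreWatts bool) sched.Scheduler {
	switch name {
	case "bin-pack-watts":
		return schedulers.NewBinPackWatts(tasks, ignoreWatts)
	default:
		return schedulers.NewFirstFit(tasks, ignoreWatts)
	}
}

func main() {
	flag.Parse()
	s := pickScheduler(*schedType, nil, false)
	log.Printf("selected scheduler: %T", s)
}

Note that the actual main also uses the exported Shutdown, Done, PCPLog, and RecordPCP fields, so a real implementation would likely keep the concrete type or define a small shared interface; that is part of the code sharing the commit message leaves as future work.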
8
def/metric.go
Normal file
|
@@ -0,0 +1,8 @@
|
|||
package def
|
||||
|
||||
type Metric struct {
|
||||
Name string `json:"name"`
|
||||
CPU float64 `json:"cpu"`
|
||||
RAM float64 `json:"ram"`
|
||||
Watts float64 `json:"watts"`
|
||||
}
|
|
@@ -1,20 +1,20 @@
|
|||
package main
|
||||
package def
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"github.com/pkg/errors"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
Name string `json:"name"`
|
||||
CPU float64 `json:"cpu"`
|
||||
RAM float64 `json:"ram"`
|
||||
Watts float64 `json:"watts"`
|
||||
Image string `json:"image"`
|
||||
CMD string `json:"cmd"`
|
||||
Instances *int `json:"inst"`
|
||||
Host string `json:"host"`
|
||||
Name string `json:"name"`
|
||||
CPU float64 `json:"cpu"`
|
||||
RAM float64 `json:"ram"`
|
||||
Watts float64 `json:"watts"`
|
||||
Image string `json:"image"`
|
||||
CMD string `json:"cmd"`
|
||||
Instances *int `json:"inst"`
|
||||
Host string `json:"host"`
|
||||
}
|
||||
|
||||
func TasksFromJSON(uri string) ([]Task, error) {
|
||||
|
@@ -32,4 +32,4 @@ func TasksFromJSON(uri string) ([]Task, error) {
|
|||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
}
|
|
@@ -1,8 +0,0 @@
|
|||
package main
|
||||
|
||||
type Metric struct{
|
||||
Name string `json:"name"`
|
||||
CPU float64 `json:"cpu"`
|
||||
RAM float64 `json:"ram"`
|
||||
Watts float64 `json:"watts"`
|
||||
}
|
33
pcp/pcp.go
|
@@ -3,10 +3,10 @@ package pcp
|
|||
import (
|
||||
"bufio"
|
||||
"log"
|
||||
"os/exec"
|
||||
"time"
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Start(quit chan struct{}, logging *bool, prefix string) {
|
||||
|
@@ -15,8 +15,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
|
|||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
startTime := time.Now().Format("20060102150405")
|
||||
|
||||
|
||||
logFile, err := os.Create("./"+prefix+startTime+".pcplog")
|
||||
logFile, err := os.Create("./" + prefix + startTime + ".pcplog")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@@ -39,12 +38,12 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
|
|||
logFile.WriteString(scanner.Text() + "\n")
|
||||
|
||||
/*
|
||||
headers := strings.Split(scanner.Text(), ",")
|
||||
headers := strings.Split(scanner.Text(), ",")
|
||||
|
||||
for _, hostMetric := range headers {
|
||||
split := strings.Split(hostMetric, ":")
|
||||
fmt.Printf("Host %s: Metric: %s\n", split[0], split[1])
|
||||
}
|
||||
for _, hostMetric := range headers {
|
||||
split := strings.Split(hostMetric, ":")
|
||||
fmt.Printf("Host %s: Metric: %s\n", split[0], split[1])
|
||||
}
|
||||
*/
|
||||
|
||||
// Throw away first set of results
|
||||
|
@@ -53,17 +52,16 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
|
|||
seconds := 0
|
||||
for scanner.Scan() {
|
||||
|
||||
|
||||
if(*logging) {
|
||||
if *logging {
|
||||
log.Println("Logging PCP...")
|
||||
logFile.WriteString(scanner.Text() + "\n")
|
||||
}
|
||||
|
||||
/*
|
||||
fmt.Printf("Second: %d\n", seconds)
|
||||
for i, val := range strings.Split(scanner.Text(), ",") {
|
||||
fmt.Printf("host metric: %s val: %s\n", headers[i], val)
|
||||
}*/
|
||||
fmt.Printf("Second: %d\n", seconds)
|
||||
for i, val := range strings.Split(scanner.Text(), ",") {
|
||||
fmt.Printf("host metric: %s val: %s\n", headers[i], val)
|
||||
}*/
|
||||
|
||||
seconds++
|
||||
|
||||
|
@@ -73,15 +71,14 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
|
|||
|
||||
log.Println("PCP logging started")
|
||||
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
pgid, err := syscall.Getpgid(cmd.Process.Pid)
|
||||
|
||||
select{
|
||||
case <- quit:
|
||||
select {
|
||||
case <-quit:
|
||||
log.Println("Stopping PCP logging in 5 seconds")
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
|
|
301
scheduler.go
|
@@ -1,310 +1,43 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bitbucket.org/bingcloud/electron/def"
|
||||
"bitbucket.org/bingcloud/electron/pcp"
|
||||
"bitbucket.org/bingcloud/electron/schedulers"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"github.com/mesos/mesos-go/mesosutil"
|
||||
sched "github.com/mesos/mesos-go/scheduler"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
"bitbucket.org/bingcloud/electron/pcp"
|
||||
"strings"
|
||||
"os/signal"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
shutdownTimeout = time.Duration(30) * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
|
||||
longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
|
||||
IGNORE_WATTS = false
|
||||
)
|
||||
|
||||
func CoLocated(tasks map[string]bool) {
|
||||
|
||||
for task := range tasks {
|
||||
log.Println(task)
|
||||
}
|
||||
|
||||
fmt.Println("---------------------")
|
||||
}
|
||||
|
||||
func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
|
||||
var cpus, mem, watts float64
|
||||
|
||||
for _, resource := range offer.Resources {
|
||||
switch resource.GetName() {
|
||||
case "cpus":
|
||||
cpus += *resource.GetScalar().Value
|
||||
case "mem":
|
||||
mem += *resource.GetScalar().Value
|
||||
case "watts":
|
||||
watts += *resource.GetScalar().Value
|
||||
}
|
||||
}
|
||||
|
||||
return cpus, mem, watts
|
||||
}
|
||||
|
||||
// Decides if to take an offer or not
|
||||
func TakeOffer(offer *mesos.Offer, task Task) bool {
|
||||
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
//TODO: Insert watts calculation here instead of taking them as a parameter
|
||||
|
||||
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// electronScheduler implements the Scheduler interface
|
||||
type electronScheduler struct {
|
||||
tasksCreated int
|
||||
tasksRunning int
|
||||
tasks []Task
|
||||
metrics map[string]Metric
|
||||
running map[string]map[string]bool
|
||||
|
||||
// First set of PCP values are garbage values, signal to logger to start recording when we're
|
||||
// about to schedule a new task
|
||||
recordPCP bool
|
||||
|
||||
// This channel is closed when the program receives an interrupt,
|
||||
// signalling that the program should shut down.
|
||||
shutdown chan struct{}
|
||||
// This channel is closed after shutdown is closed, and only when all
|
||||
// outstanding tasks have been cleaned up
|
||||
done chan struct{}
|
||||
|
||||
|
||||
// Controls when to shutdown pcp logging
|
||||
pcpLog chan struct{}
|
||||
}
|
||||
|
||||
// New electron scheduler
|
||||
func newElectronScheduler(tasks []Task) *electronScheduler {
|
||||
|
||||
s := &electronScheduler{
|
||||
tasks: tasks,
|
||||
shutdown: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
pcpLog: make(chan struct{}),
|
||||
running: make(map[string]map[string]bool),
|
||||
recordPCP: false,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *electronScheduler) newTask(offer *mesos.Offer, task Task) *mesos.TaskInfo {
|
||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||
s.tasksCreated++
|
||||
|
||||
if !s.recordPCP {
|
||||
// Turn on logging
|
||||
s.recordPCP = true
|
||||
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
|
||||
}
|
||||
|
||||
// If this is our first time running into this Agent
|
||||
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
|
||||
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
|
||||
}
|
||||
|
||||
// Add task to list of tasks running on node
|
||||
s.running[offer.GetSlaveId().GoString()][taskName] = true
|
||||
|
||||
resources := []*mesos.Resource{
|
||||
mesosutil.NewScalarResource("cpus", task.CPU),
|
||||
mesosutil.NewScalarResource("mem", task.RAM),
|
||||
}
|
||||
|
||||
if(!IGNORE_WATTS) {
|
||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
|
||||
}
|
||||
|
||||
return &mesos.TaskInfo{
|
||||
Name: proto.String(taskName),
|
||||
TaskId: &mesos.TaskID{
|
||||
Value: proto.String("electron-" + taskName),
|
||||
},
|
||||
SlaveId: offer.SlaveId,
|
||||
Resources: resources,
|
||||
Command: &mesos.CommandInfo{
|
||||
Value: proto.String(task.CMD),
|
||||
},
|
||||
Container: &mesos.ContainerInfo{
|
||||
Type: mesos.ContainerInfo_DOCKER.Enum(),
|
||||
Docker: &mesos.ContainerInfo_DockerInfo{
|
||||
Image: proto.String(task.Image),
|
||||
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
|
||||
},
|
||||
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *electronScheduler) Registered(
|
||||
_ sched.SchedulerDriver,
|
||||
frameworkID *mesos.FrameworkID,
|
||||
masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
|
||||
}
|
||||
|
||||
func (s *electronScheduler) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework re-registered with master %s", masterInfo)
|
||||
}
|
||||
|
||||
func (s *electronScheduler) Disconnected(sched.SchedulerDriver) {
|
||||
log.Println("Framework disconnected with master")
|
||||
}
|
||||
|
||||
func (s *electronScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||
log.Printf("Received %d resource offers", len(offers))
|
||||
|
||||
for _, offer := range offers {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
|
||||
driver.DeclineOffer(offer.Id, longFilter)
|
||||
|
||||
log.Println("Number of tasks still running: ", s.tasksRunning)
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
tasks := []*mesos.TaskInfo{}
|
||||
|
||||
// First fit strategy
|
||||
|
||||
taken := false
|
||||
for i, task := range s.tasks {
|
||||
|
||||
// Check host if it exists
|
||||
if task.Host != "" {
|
||||
// Don't take offer if it doesn't match our task's host requirement
|
||||
if !strings.HasPrefix(*offer.Hostname, task.Host) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Decision to take the offer or not
|
||||
if TakeOffer(offer, task) {
|
||||
|
||||
log.Println("Co-Located with: ")
|
||||
CoLocated(s.running[offer.GetSlaveId().GoString()])
|
||||
|
||||
tasks = append(tasks, s.newTask(offer, task))
|
||||
|
||||
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
||||
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
|
||||
|
||||
taken = true
|
||||
|
||||
fmt.Println("Inst: ", *task.Instances)
|
||||
*task.Instances--
|
||||
|
||||
if *task.Instances <= 0 {
|
||||
// All instances of task have been scheduled, remove it
|
||||
s.tasks[i] = s.tasks[len(s.tasks)-1]
|
||||
s.tasks = s.tasks[:len(s.tasks)-1]
|
||||
|
||||
if(len(s.tasks) <= 0) {
|
||||
log.Println("Done scheduling all tasks")
|
||||
close(s.shutdown)
|
||||
}
|
||||
}
|
||||
break // Offer taken, move on
|
||||
}
|
||||
}
|
||||
|
||||
// If there was no match for the task
|
||||
if !taken {
|
||||
fmt.Println("There is not enough resources to launch a task:")
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
|
||||
driver.DeclineOffer(offer.Id, defaultFilter)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (s *electronScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
|
||||
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
|
||||
if *status.State == mesos.TaskState_TASK_RUNNING {
|
||||
s.tasksRunning++
|
||||
} else if IsTerminal(status.State) {
|
||||
delete(s.running[status.GetSlaveId().GoString()],*status.TaskId.Value)
|
||||
s.tasksRunning--
|
||||
if s.tasksRunning == 0 {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
close(s.done)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
}
|
||||
|
||||
func (s *electronScheduler) FrameworkMessage(
|
||||
driver sched.SchedulerDriver,
|
||||
executorID *mesos.ExecutorID,
|
||||
slaveID *mesos.SlaveID,
|
||||
message string) {
|
||||
|
||||
log.Println("Getting a framework message: ", message)
|
||||
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
|
||||
}
|
||||
|
||||
func (s *electronScheduler) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
|
||||
log.Printf("Offer %s rescinded", offerID)
|
||||
}
|
||||
func (s *electronScheduler) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
|
||||
log.Printf("Slave %s lost", slaveID)
|
||||
}
|
||||
func (s *electronScheduler) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
|
||||
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
|
||||
}
|
||||
|
||||
func (s *electronScheduler) Error(_ sched.SchedulerDriver, err string) {
|
||||
log.Printf("Receiving an error: %s", err)
|
||||
}
|
||||
|
||||
var master = flag.String("master", "xavier:5050", "Location of leading Mesos master")
|
||||
var tasksFile = flag.String("workload", "", "JSON file containing task definitions")
|
||||
var ignoreWatts = flag.Bool("ignoreWatts", false, "Ignore watts in offers")
|
||||
var pcplogPrefix = flag.String("logPrefix", "", "Prefix for pcplog")
|
||||
|
||||
// Short hand args
|
||||
func init(){
|
||||
func init() {
|
||||
flag.StringVar(master, "m", "xavier:5050", "Location of leading Mesos master (shorthand)")
|
||||
flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)")
|
||||
flag.BoolVar(ignoreWatts, "i", false, "Ignore watts in offers (shorthand)")
|
||||
flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog")
|
||||
flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)")
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
IGNORE_WATTS = *ignoreWatts
|
||||
|
||||
if *tasksFile == "" {
|
||||
fmt.Println("No file containing tasks specifiction provided.")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
tasks, err := TasksFromJSON(*tasksFile)
|
||||
if(err != nil || len(tasks) == 0) {
|
||||
tasks, err := def.TasksFromJSON(*tasksFile)
|
||||
if err != nil || len(tasks) == 0 {
|
||||
fmt.Println("Invalid tasks specification file provided")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
@@ -314,7 +47,7 @@ func main() {
|
|||
fmt.Println(task)
|
||||
}
|
||||
|
||||
scheduler := newElectronScheduler(tasks)
|
||||
scheduler := schedulers.NewFirstFit(tasks, *ignoreWatts)
|
||||
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
|
||||
Master: *master,
|
||||
Framework: &mesos.FrameworkInfo{
|
||||
|
@@ -328,7 +61,7 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
go pcp.Start(scheduler.pcpLog, &scheduler.recordPCP, *pcplogPrefix)
|
||||
go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, *pcplogPrefix)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Attempt to handle SIGINT so as not to leave pmdumptext running
|
||||
|
@@ -338,28 +71,28 @@ func main() {
|
|||
signal.Notify(c, os.Interrupt, os.Kill)
|
||||
s := <-c
|
||||
if s != os.Interrupt {
|
||||
close(scheduler.pcpLog)
|
||||
close(scheduler.PCPLog)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Received SIGINT...stopping")
|
||||
close(scheduler.done)
|
||||
close(scheduler.Done)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
|
||||
// Signals we have scheduled every task we have
|
||||
select {
|
||||
case <-scheduler.shutdown:
|
||||
// case <-time.After(shutdownTimeout):
|
||||
case <-scheduler.Shutdown:
|
||||
// case <-time.After(shutdownTimeout):
|
||||
}
|
||||
|
||||
// All tasks have finished
|
||||
select {
|
||||
case <-scheduler.done:
|
||||
close(scheduler.pcpLog)
|
||||
case <-scheduler.Done:
|
||||
close(scheduler.PCPLog)
|
||||
time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds
|
||||
// case <-time.After(shutdownTimeout):
|
||||
// case <-time.After(shutdownTimeout):
|
||||
}
|
||||
|
||||
// Done shutting down
|
||||
|
|
241
schedulers/binpackwatts.go
Normal file
|
@@ -0,0 +1,241 @@
|
|||
package schedulers
|
||||
|
||||
import (
|
||||
"bitbucket.org/bingcloud/electron/def"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"github.com/mesos/mesos-go/mesosutil"
|
||||
sched "github.com/mesos/mesos-go/scheduler"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Decides whether to take an offer or not
|
||||
func (*BinPackWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
|
||||
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
//TODO: Insert watts calculation here instead of taking them as a parameter
|
||||
|
||||
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
type BinPackWatts struct {
|
||||
tasksCreated int
|
||||
tasksRunning int
|
||||
tasks []def.Task
|
||||
metrics map[string]def.Metric
|
||||
running map[string]map[string]bool
|
||||
ignoreWatts bool
|
||||
|
||||
// First set of PCP values are garbage values, signal to logger to start recording when we're
|
||||
// about to schedule a new task
|
||||
RecordPCP bool
|
||||
|
||||
// This channel is closed when the program receives an interrupt,
|
||||
// signalling that the program should shut down.
|
||||
Shutdown chan struct{}
|
||||
// This channel is closed after shutdown is closed, and only when all
|
||||
// outstanding tasks have been cleaned up
|
||||
Done chan struct{}
|
||||
|
||||
// Controls when to shutdown pcp logging
|
||||
PCPLog chan struct{}
|
||||
}
|
||||
|
||||
// New electron scheduler
|
||||
func NewBinPackWatts(tasks []def.Task, ignoreWatts bool) *BinPackWatts {
|
||||
|
||||
s := &BinPackWatts{
|
||||
tasks: tasks,
|
||||
ignoreWatts: ignoreWatts,
|
||||
Shutdown: make(chan struct{}),
|
||||
Done: make(chan struct{}),
|
||||
PCPLog: make(chan struct{}),
|
||||
running: make(map[string]map[string]bool),
|
||||
RecordPCP: false,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
|
||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||
s.tasksCreated++
|
||||
|
||||
if !s.RecordPCP {
|
||||
// Turn on logging
|
||||
s.RecordPCP = true
|
||||
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
|
||||
}
|
||||
|
||||
// If this is our first time running into this Agent
|
||||
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
|
||||
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
|
||||
}
|
||||
|
||||
// Add task to list of tasks running on node
|
||||
s.running[offer.GetSlaveId().GoString()][taskName] = true
|
||||
|
||||
resources := []*mesos.Resource{
|
||||
mesosutil.NewScalarResource("cpus", task.CPU),
|
||||
mesosutil.NewScalarResource("mem", task.RAM),
|
||||
}
|
||||
|
||||
if !s.ignoreWatts {
|
||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
|
||||
}
|
||||
|
||||
return &mesos.TaskInfo{
|
||||
Name: proto.String(taskName),
|
||||
TaskId: &mesos.TaskID{
|
||||
Value: proto.String("electron-" + taskName),
|
||||
},
|
||||
SlaveId: offer.SlaveId,
|
||||
Resources: resources,
|
||||
Command: &mesos.CommandInfo{
|
||||
Value: proto.String(task.CMD),
|
||||
},
|
||||
Container: &mesos.ContainerInfo{
|
||||
Type: mesos.ContainerInfo_DOCKER.Enum(),
|
||||
Docker: &mesos.ContainerInfo_DockerInfo{
|
||||
Image: proto.String(task.Image),
|
||||
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) Registered(
|
||||
_ sched.SchedulerDriver,
|
||||
frameworkID *mesos.FrameworkID,
|
||||
masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework re-registered with master %s", masterInfo)
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) Disconnected(sched.SchedulerDriver) {
|
||||
log.Println("Framework disconnected with master")
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||
log.Printf("Received %d resource offers", len(offers))
|
||||
|
||||
for _, offer := range offers {
|
||||
select {
|
||||
case <-s.Shutdown:
|
||||
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
|
||||
driver.DeclineOffer(offer.Id, longFilter)
|
||||
|
||||
log.Println("Number of tasks still running: ", s.tasksRunning)
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
tasks := []*mesos.TaskInfo{}
|
||||
|
||||
// First fit strategy
|
||||
|
||||
taken := false
|
||||
for i, task := range s.tasks {
|
||||
|
||||
// Check host if it exists
|
||||
if task.Host != "" {
|
||||
// Don't take offer if it doesn't match our task's host requirement
|
||||
if !strings.HasPrefix(*offer.Hostname, task.Host) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Decision to take the offer or not
|
||||
if s.takeOffer(offer, task) {
|
||||
|
||||
log.Println("Co-Located with: ")
|
||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||
|
||||
tasks = append(tasks, s.newTask(offer, task))
|
||||
|
||||
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
||||
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
|
||||
|
||||
taken = true
|
||||
|
||||
fmt.Println("Inst: ", *task.Instances)
|
||||
*task.Instances--
|
||||
|
||||
if *task.Instances <= 0 {
|
||||
// All instances of task have been scheduled, remove it
|
||||
s.tasks[i] = s.tasks[len(s.tasks)-1]
|
||||
s.tasks = s.tasks[:len(s.tasks)-1]
|
||||
|
||||
if len(s.tasks) <= 0 {
|
||||
log.Println("Done scheduling all tasks")
|
||||
close(s.Shutdown)
|
||||
}
|
||||
}
|
||||
break // Offer taken, move on
|
||||
}
|
||||
}
|
||||
|
||||
// If there was no match for the task
|
||||
if !taken {
|
||||
fmt.Println("There is not enough resources to launch a task:")
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
|
||||
driver.DeclineOffer(offer.Id, defaultFilter)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
|
||||
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
|
||||
if *status.State == mesos.TaskState_TASK_RUNNING {
|
||||
s.tasksRunning++
|
||||
} else if IsTerminal(status.State) {
|
||||
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
|
||||
s.tasksRunning--
|
||||
if s.tasksRunning == 0 {
|
||||
select {
|
||||
case <-s.Shutdown:
|
||||
close(s.Done)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) FrameworkMessage(
|
||||
driver sched.SchedulerDriver,
|
||||
executorID *mesos.ExecutorID,
|
||||
slaveID *mesos.SlaveID,
|
||||
message string) {
|
||||
|
||||
log.Println("Getting a framework message: ", message)
|
||||
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
|
||||
log.Printf("Offer %s rescinded", offerID)
|
||||
}
|
||||
func (s *BinPackWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
|
||||
log.Printf("Slave %s lost", slaveID)
|
||||
}
|
||||
func (s *BinPackWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
|
||||
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
|
||||
}
|
||||
|
||||
func (s *BinPackWatts) Error(_ sched.SchedulerDriver, err string) {
|
||||
log.Printf("Receiving an error: %s", err)
|
||||
}
|
242
schedulers/firstfit.go
Normal file
|
@@ -0,0 +1,242 @@
|
|||
package schedulers
|
||||
|
||||
import (
|
||||
"bitbucket.org/bingcloud/electron/def"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"github.com/mesos/mesos-go/mesosutil"
|
||||
sched "github.com/mesos/mesos-go/scheduler"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Decides whether to take an offer or not
|
||||
func (*FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
|
||||
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
//TODO: Insert watts calculation here instead of taking them as a parameter
|
||||
|
||||
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// electronScheduler implements the Scheduler interface
|
||||
type FirstFit struct {
|
||||
tasksCreated int
|
||||
tasksRunning int
|
||||
tasks []def.Task
|
||||
metrics map[string]def.Metric
|
||||
running map[string]map[string]bool
|
||||
ignoreWatts bool
|
||||
|
||||
// First set of PCP values are garbage values, signal to logger to start recording when we're
|
||||
// about to schedule a new task
|
||||
RecordPCP bool
|
||||
|
||||
// This channel is closed when the program receives an interrupt,
|
||||
// signalling that the program should shut down.
|
||||
Shutdown chan struct{}
|
||||
// This channel is closed after shutdown is closed, and only when all
|
||||
// outstanding tasks have been cleaned up
|
||||
Done chan struct{}
|
||||
|
||||
// Controls when to shutdown pcp logging
|
||||
PCPLog chan struct{}
|
||||
}
|
||||
|
||||
// New electron scheduler
|
||||
func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit {
|
||||
|
||||
s := &FirstFit{
|
||||
tasks: tasks,
|
||||
ignoreWatts: ignoreWatts,
|
||||
Shutdown: make(chan struct{}),
|
||||
Done: make(chan struct{}),
|
||||
PCPLog: make(chan struct{}),
|
||||
running: make(map[string]map[string]bool),
|
||||
RecordPCP: false,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
|
||||
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
|
||||
s.tasksCreated++
|
||||
|
||||
if !s.RecordPCP {
|
||||
// Turn on logging
|
||||
s.RecordPCP = true
|
||||
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
|
||||
}
|
||||
|
||||
// If this is our first time running into this Agent
|
||||
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
|
||||
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
|
||||
}
|
||||
|
||||
// Add task to list of tasks running on node
|
||||
s.running[offer.GetSlaveId().GoString()][taskName] = true
|
||||
|
||||
resources := []*mesos.Resource{
|
||||
mesosutil.NewScalarResource("cpus", task.CPU),
|
||||
mesosutil.NewScalarResource("mem", task.RAM),
|
||||
}
|
||||
|
||||
if !s.ignoreWatts {
|
||||
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
|
||||
}
|
||||
|
||||
return &mesos.TaskInfo{
|
||||
Name: proto.String(taskName),
|
||||
TaskId: &mesos.TaskID{
|
||||
Value: proto.String("electron-" + taskName),
|
||||
},
|
||||
SlaveId: offer.SlaveId,
|
||||
Resources: resources,
|
||||
Command: &mesos.CommandInfo{
|
||||
Value: proto.String(task.CMD),
|
||||
},
|
||||
Container: &mesos.ContainerInfo{
|
||||
Type: mesos.ContainerInfo_DOCKER.Enum(),
|
||||
Docker: &mesos.ContainerInfo_DockerInfo{
|
||||
Image: proto.String(task.Image),
|
||||
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *FirstFit) Registered(
|
||||
_ sched.SchedulerDriver,
|
||||
frameworkID *mesos.FrameworkID,
|
||||
masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
|
||||
}
|
||||
|
||||
func (s *FirstFit) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
|
||||
log.Printf("Framework re-registered with master %s", masterInfo)
|
||||
}
|
||||
|
||||
func (s *FirstFit) Disconnected(sched.SchedulerDriver) {
|
||||
log.Println("Framework disconnected with master")
|
||||
}
|
||||
|
||||
func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
|
||||
log.Printf("Received %d resource offers", len(offers))
|
||||
|
||||
for _, offer := range offers {
|
||||
select {
|
||||
case <-s.Shutdown:
|
||||
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
|
||||
driver.DeclineOffer(offer.Id, longFilter)
|
||||
|
||||
log.Println("Number of tasks still running: ", s.tasksRunning)
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
tasks := []*mesos.TaskInfo{}
|
||||
|
||||
// First fit strategy
|
||||
|
||||
taken := false
|
||||
for i, task := range s.tasks {
|
||||
|
||||
// Check host if it exists
|
||||
if task.Host != "" {
|
||||
// Don't take offer if it doesn't match our task's host requirement
|
||||
if !strings.HasPrefix(*offer.Hostname, task.Host) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Decision to take the offer or not
|
||||
if s.takeOffer(offer, task) {
|
||||
|
||||
log.Println("Co-Located with: ")
|
||||
coLocated(s.running[offer.GetSlaveId().GoString()])
|
||||
|
||||
tasks = append(tasks, s.newTask(offer, task))
|
||||
|
||||
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
|
||||
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
|
||||
|
||||
taken = true
|
||||
|
||||
fmt.Println("Inst: ", *task.Instances)
|
||||
*task.Instances--
|
||||
|
||||
if *task.Instances <= 0 {
|
||||
// All instances of task have been scheduled, remove it
|
||||
s.tasks[i] = s.tasks[len(s.tasks)-1]
|
||||
s.tasks = s.tasks[:len(s.tasks)-1]
|
||||
|
||||
if len(s.tasks) <= 0 {
|
||||
log.Println("Done scheduling all tasks")
|
||||
close(s.Shutdown)
|
||||
}
|
||||
}
|
||||
break // Offer taken, move on
|
||||
}
|
||||
}
|
||||
|
||||
// If there was no match for the task
|
||||
if !taken {
|
||||
fmt.Println("There is not enough resources to launch a task:")
|
||||
cpus, mem, watts := OfferAgg(offer)
|
||||
|
||||
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
|
||||
driver.DeclineOffer(offer.Id, defaultFilter)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
|
||||
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
|
||||
if *status.State == mesos.TaskState_TASK_RUNNING {
|
||||
s.tasksRunning++
|
||||
} else if IsTerminal(status.State) {
|
||||
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
|
||||
s.tasksRunning--
|
||||
if s.tasksRunning == 0 {
|
||||
select {
|
||||
case <-s.Shutdown:
|
||||
close(s.Done)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
|
||||
}
|
||||
|
||||
func (s *FirstFit) FrameworkMessage(
|
||||
driver sched.SchedulerDriver,
|
||||
executorID *mesos.ExecutorID,
|
||||
slaveID *mesos.SlaveID,
|
||||
message string) {
|
||||
|
||||
log.Println("Getting a framework message: ", message)
|
||||
log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
|
||||
}
|
||||
|
||||
func (s *FirstFit) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
|
||||
log.Printf("Offer %s rescinded", offerID)
|
||||
}
|
||||
func (s *FirstFit) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
|
||||
log.Printf("Slave %s lost", slaveID)
|
||||
}
|
||||
func (s *FirstFit) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
|
||||
log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
|
||||
}
|
||||
|
||||
func (s *FirstFit) Error(_ sched.SchedulerDriver, err string) {
|
||||
log.Printf("Receiving an error: %s", err)
|
||||
}
|
39
schedulers/helpers.go
Normal file
|
@@ -0,0 +1,39 @@
|
|||
package schedulers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"log"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
|
||||
longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
|
||||
)
|
||||
|
||||
func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
|
||||
var cpus, mem, watts float64
|
||||
|
||||
for _, resource := range offer.Resources {
|
||||
switch resource.GetName() {
|
||||
case "cpus":
|
||||
cpus += *resource.GetScalar().Value
|
||||
case "mem":
|
||||
mem += *resource.GetScalar().Value
|
||||
case "watts":
|
||||
watts += *resource.GetScalar().Value
|
||||
}
|
||||
}
|
||||
|
||||
return cpus, mem, watts
|
||||
}
|
||||
|
||||
func coLocated(tasks map[string]bool) {
|
||||
|
||||
for task := range tasks {
|
||||
log.Println(task)
|
||||
}
|
||||
|
||||
fmt.Println("---------------------")
|
||||
}
|
|
@@ -1,8 +1,8 @@
|
|||
package main
|
||||
package schedulers
|
||||
|
||||
import (
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
"fmt"
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
)
|
||||
|
||||
// NameFor returns the string name for a TaskState.
|