2016-09-08 02:06:24 -04:00
package main
import (
	"flag"
	"fmt"
	"log"
	"os"
	"sort"
	"time"

	"github.com/golang/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
	"github.com/mesos/mesos-go/mesosutil"
	sched "github.com/mesos/mesos-go/scheduler"
)
// shutdownTimeout is the grace period intended for framework shutdown.
// The only references to it in main are currently commented out.
const shutdownTimeout = 30 * time.Second
var (
defaultFilter = & mesos . Filters { RefuseSeconds : proto . Float64 ( 1 ) }
2016-09-19 20:25:10 -04:00
longFilter = & mesos . Filters { RefuseSeconds : proto . Float64 ( 1000 ) }
2016-09-08 02:06:24 -04:00
)
2016-09-22 20:20:22 -04:00
// CoLocated logs every key of tasks, one per line in ascending order,
// followed by a separator line.
//
// The keys are sorted before printing because Go randomises map iteration
// order, which would otherwise make the log differ between runs. The
// separator is written through the log package as well, so it ends up on the
// same stream as the task names instead of on stdout.
//
// A nil or empty map produces only the separator.
func CoLocated(tasks map[string]bool) {
	names := make([]string, 0, len(tasks))
	for name := range tasks {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		log.Println(name)
	}
	log.Println("---------------------")
}
2016-09-19 20:25:10 -04:00
func OfferAgg ( offer * mesos . Offer ) ( float64 , float64 , float64 ) {
2016-09-08 02:06:24 -04:00
var cpus , mem , watts float64
for _ , resource := range offer . Resources {
switch resource . GetName ( ) {
case "cpus" :
cpus += * resource . GetScalar ( ) . Value
case "mem" :
mem += * resource . GetScalar ( ) . Value
case "watts" :
watts += * resource . GetScalar ( ) . Value
}
}
2016-09-19 20:25:10 -04:00
return cpus , mem , watts
}
2016-09-15 15:53:56 -04:00
2016-09-19 20:25:10 -04:00
// Decides if to take an offer or not
func TakeOffer ( offer * mesos . Offer , task Task ) bool {
cpus , mem , watts := OfferAgg ( offer )
2016-09-15 15:53:56 -04:00
2016-09-19 20:25:10 -04:00
//TODO: Insert watts calculation here instead of taking them as a parameter
2016-09-16 19:06:53 -04:00
if cpus >= task . CPU && mem >= task . RAM && watts >= task . Watts {
2016-09-15 15:53:56 -04:00
return true
2016-09-08 02:06:24 -04:00
}
2016-09-15 15:53:56 -04:00
return false
2016-09-08 02:06:24 -04:00
}
// rendlerScheduler implements the Scheduler interface and stores
// the state needed for Rendler to function.
type electronScheduler struct {
tasksCreated int
tasksRunning int
2016-09-16 19:06:53 -04:00
tasks [ ] Task
2016-09-22 20:20:22 -04:00
metrics map [ string ] Metric
running map [ string ] map [ string ] bool
2016-09-08 02:06:24 -04:00
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
shutdown chan struct { }
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
done chan struct { }
}
// New electron scheduler
2016-09-16 19:06:53 -04:00
func newElectronScheduler ( tasks [ ] Task ) * electronScheduler {
2016-09-08 02:06:24 -04:00
s := & electronScheduler {
2016-09-16 19:06:53 -04:00
tasks : tasks ,
2016-09-08 02:06:24 -04:00
shutdown : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
2016-09-22 20:20:22 -04:00
running : make ( map [ string ] map [ string ] bool ) ,
2016-09-08 02:06:24 -04:00
}
return s
}
2016-09-16 19:06:53 -04:00
func ( s * electronScheduler ) newTask ( offer * mesos . Offer , task Task ) * mesos . TaskInfo {
2016-09-22 20:20:22 -04:00
taskID := fmt . Sprintf ( "Electron-%s-%d" , task . Name , * task . Instances )
2016-09-08 02:06:24 -04:00
s . tasksCreated ++
2016-09-22 20:20:22 -04:00
// If this is our first time running into this Agent
if _ , ok := s . running [ offer . GetSlaveId ( ) . GoString ( ) ] ; ! ok {
s . running [ offer . GetSlaveId ( ) . GoString ( ) ] = make ( map [ string ] bool )
}
// Add task to list of tasks running on node
s . running [ offer . GetSlaveId ( ) . GoString ( ) ] [ taskID ] = true
2016-09-08 02:06:24 -04:00
return & mesos . TaskInfo {
2016-09-22 20:20:22 -04:00
Name : proto . String ( taskID ) ,
2016-09-08 02:06:24 -04:00
TaskId : & mesos . TaskID {
2016-09-22 20:20:22 -04:00
Value : proto . String ( taskID ) ,
2016-09-08 02:06:24 -04:00
} ,
SlaveId : offer . SlaveId ,
Resources : [ ] * mesos . Resource {
2016-09-16 19:06:53 -04:00
mesosutil . NewScalarResource ( "cpus" , task . CPU ) ,
mesosutil . NewScalarResource ( "mem" , task . RAM ) ,
mesosutil . NewScalarResource ( "watts" , task . Watts ) ,
} ,
Command : & mesos . CommandInfo {
Value : proto . String ( task . CMD ) ,
2016-09-15 15:53:56 -04:00
} ,
Container : & mesos . ContainerInfo {
Type : mesos . ContainerInfo_DOCKER . Enum ( ) ,
Docker : & mesos . ContainerInfo_DockerInfo {
2016-09-16 19:06:53 -04:00
Image : proto . String ( task . Image ) ,
2016-09-15 15:53:56 -04:00
} ,
2016-09-08 02:06:24 -04:00
} ,
}
}
// Registered logs the framework ID and the master the framework registered
// with. It keeps no state.
func (s *electronScheduler) Registered(_ sched.SchedulerDriver, id *mesos.FrameworkID, master *mesos.MasterInfo) {
	log.Printf("Framework %s registered with master %s", id, master)
}
// Reregistered logs the master the framework has re-registered with.
func (s *electronScheduler) Reregistered(
	_ sched.SchedulerDriver,
	master *mesos.MasterInfo,
) {
	log.Printf("Framework re-registered with master %s", master)
}
// Disconnected logs that the framework lost its connection to the master.
func (s *electronScheduler) Disconnected(_ sched.SchedulerDriver) {
	log.Println("Framework disconnected with master")
}
func ( s * electronScheduler ) ResourceOffers ( driver sched . SchedulerDriver , offers [ ] * mesos . Offer ) {
log . Printf ( "Received %d resource offers" , len ( offers ) )
2016-09-17 18:55:35 -04:00
2016-09-08 02:06:24 -04:00
for _ , offer := range offers {
select {
case <- s . shutdown :
2016-09-19 17:51:32 -04:00
log . Println ( "Shutting down: declining offer on [" , offer . GetHostname ( ) , "]" )
2016-09-19 20:25:10 -04:00
driver . DeclineOffer ( offer . Id , longFilter )
2016-09-19 17:51:32 -04:00
2016-09-19 20:25:10 -04:00
log . Println ( "Number of tasks still running: " , s . tasksRunning )
2016-09-08 02:06:24 -04:00
continue
default :
}
2016-09-16 19:06:53 -04:00
2016-09-08 02:06:24 -04:00
tasks := [ ] * mesos . TaskInfo { }
2016-09-16 19:06:53 -04:00
// First fit strategy
taken := false
for i , task := range s . tasks {
// Decision to take the offer or not
2016-09-19 20:25:10 -04:00
if TakeOffer ( offer , task ) {
2016-09-22 20:20:22 -04:00
log . Println ( "Co-Located with: " )
CoLocated ( s . running [ offer . GetSlaveId ( ) . GoString ( ) ] )
2016-09-16 19:06:53 -04:00
tasks = append ( tasks , s . newTask ( offer , task ) )
2016-09-19 20:25:10 -04:00
log . Printf ( "Starting %s on [%s]\n" , task . Name , offer . GetHostname ( ) )
2016-09-16 19:06:53 -04:00
driver . LaunchTasks ( [ ] * mesos . OfferID { offer . Id } , tasks , defaultFilter )
2016-09-19 17:51:32 -04:00
taken = true
2016-09-17 18:55:35 -04:00
fmt . Println ( "Inst: " , * task . Instances )
* task . Instances --
if * task . Instances <= 0 {
2016-09-19 20:25:10 -04:00
// All instances of task have been scheduled, remove it
2016-09-17 18:55:35 -04:00
s . tasks [ i ] = s . tasks [ len ( s . tasks ) - 1 ]
s . tasks = s . tasks [ : len ( s . tasks ) - 1 ]
2016-09-19 20:25:10 -04:00
if ( len ( s . tasks ) <= 0 ) {
log . Println ( "Done scheduling all tasks" )
close ( s . shutdown )
}
2016-09-17 18:55:35 -04:00
}
2016-09-16 19:06:53 -04:00
2016-09-19 20:25:10 -04:00
break // Offer taken, move on
2016-09-19 17:51:32 -04:00
2016-09-16 19:06:53 -04:00
}
}
// If there was no match for the task
if ! taken {
2016-09-19 20:25:10 -04:00
fmt . Println ( "There is not enough resources to launch a task:" )
cpus , mem , watts := OfferAgg ( offer )
log . Printf ( "<CPU: %f, RAM: %f, Watts: %f>\n" , cpus , mem , watts )
2016-09-15 15:53:56 -04:00
driver . DeclineOffer ( offer . Id , defaultFilter )
2016-09-08 02:06:24 -04:00
}
2016-09-16 19:06:53 -04:00
2016-09-08 02:06:24 -04:00
}
}
func ( s * electronScheduler ) StatusUpdate ( driver sched . SchedulerDriver , status * mesos . TaskStatus ) {
2016-09-10 18:40:56 -04:00
log . Printf ( "Received task status [%s] for task [%s]" , NameFor ( status . State ) , * status . TaskId . Value )
2016-09-08 02:06:24 -04:00
if * status . State == mesos . TaskState_TASK_RUNNING {
s . tasksRunning ++
2016-09-10 18:40:56 -04:00
} else if IsTerminal ( status . State ) {
2016-09-22 20:20:22 -04:00
delete ( s . running [ status . GetSlaveId ( ) . GoString ( ) ] , * status . TaskId . Value )
2016-09-08 02:06:24 -04:00
s . tasksRunning --
if s . tasksRunning == 0 {
select {
case <- s . shutdown :
close ( s . done )
default :
}
}
}
2016-09-19 20:25:10 -04:00
log . Printf ( "DONE: Task status [%s] for task [%s]" , NameFor ( status . State ) , * status . TaskId . Value )
2016-09-08 02:06:24 -04:00
}
func ( s * electronScheduler ) FrameworkMessage (
driver sched . SchedulerDriver ,
executorID * mesos . ExecutorID ,
slaveID * mesos . SlaveID ,
message string ) {
log . Println ( "Getting a framework message: " , message )
2016-09-22 20:20:22 -04:00
log . Printf ( "Received a framework message from some unknown source: %s" , * executorID . Value )
2016-09-08 02:06:24 -04:00
}
// OfferRescinded logs the ID of an offer that has been rescinded. No
// scheduler state is changed.
func (s *electronScheduler) OfferRescinded(
	_ sched.SchedulerDriver,
	rescinded *mesos.OfferID,
) {
	log.Printf("Offer %s rescinded", rescinded)
}
// SlaveLost logs the ID of an agent that was reported lost. The entries kept
// for that agent in s.running are left untouched.
func (s *electronScheduler) SlaveLost(
	_ sched.SchedulerDriver,
	lost *mesos.SlaveID,
) {
	log.Printf("Slave %s lost", lost)
}
// ExecutorLost logs which executor was lost and on which agent. The integer
// status passed by the driver is ignored.
func (s *electronScheduler) ExecutorLost(
	_ sched.SchedulerDriver,
	executor *mesos.ExecutorID,
	agent *mesos.SlaveID,
	_ int,
) {
	log.Printf("Executor %s on slave %s was lost", executor, agent)
}
// Error logs an error message reported through the scheduler driver. It
// takes no further action.
func (s *electronScheduler) Error(_ sched.SchedulerDriver, msg string) {
	log.Printf("Receiving an error: %s", msg)
}
func main ( ) {
2016-09-15 15:53:56 -04:00
master := flag . String ( "master" , "xavier:5050" , "Location of leading Mesos master" )
2016-09-16 19:06:53 -04:00
tasksFile := flag . String ( "tasks" , "" , "JSON file containing task definitions" )
2016-09-08 02:06:24 -04:00
flag . Parse ( )
2016-09-16 19:06:53 -04:00
if * tasksFile == "" {
fmt . Println ( "No file containing tasks specifiction provided." )
os . Exit ( 1 )
}
tasks , err := TasksFromJSON ( * tasksFile )
if ( err != nil || len ( tasks ) == 0 ) {
fmt . Println ( "Invalid tasks specification file provided" )
os . Exit ( 1 )
}
2016-09-17 18:55:35 -04:00
log . Println ( "Scheduling the following tasks:" )
for _ , task := range tasks {
fmt . Println ( task )
}
2016-09-16 19:06:53 -04:00
scheduler := newElectronScheduler ( tasks )
2016-09-08 02:06:24 -04:00
driver , err := sched . NewMesosSchedulerDriver ( sched . DriverConfig {
Master : * master ,
Framework : & mesos . FrameworkInfo {
2016-09-15 15:53:56 -04:00
Name : proto . String ( "Electron" ) ,
2016-09-08 02:06:24 -04:00
User : proto . String ( "" ) ,
} ,
Scheduler : scheduler ,
} )
if err != nil {
log . Printf ( "Unable to create scheduler driver: %s" , err )
return
}
// Catch interrupt
go func ( ) {
2016-09-19 20:25:10 -04:00
select {
case <- scheduler . shutdown :
// case <-time.After(shutdownTimeout):
2016-09-17 18:55:35 -04:00
}
2016-09-08 02:06:24 -04:00
select {
case <- scheduler . done :
2016-09-17 18:55:35 -04:00
// case <-time.After(shutdownTimeout):
2016-09-08 02:06:24 -04:00
}
// Done shutting down
driver . Stop ( false )
2016-09-17 18:55:35 -04:00
2016-09-08 02:06:24 -04:00
} ( )
2016-09-15 15:53:56 -04:00
log . Printf ( "Starting..." )
2016-09-08 02:06:24 -04:00
if status , err := driver . Run ( ) ; err != nil {
log . Printf ( "Framework stopped with status %s and error: %s\n" , status . String ( ) , err . Error ( ) )
}
log . Println ( "Exiting..." )
}