package schedulers

import (
	"bitbucket.org/sunybingcloud/electron/constants"
	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/rapl"
	"fmt"
	"github.com/golang/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
	"github.com/mesos/mesos-go/mesosutil"
	sched "github.com/mesos/mesos-go/scheduler"
	"log"
	"math"
	"strings"
	"sync"
	"time"
)

// ProactiveClusterwideCapFCFS implements the Scheduler interface.
// It schedules tasks FCFS while proactively capping the power consumption of the entire cluster.
type ProactiveClusterwideCapFCFS struct {
	tasksCreated int
	tasksRunning int
	tasks        []def.Task
	metrics      map[string]def.Metric
	running      map[string]map[string]bool
	ignoreWatts  bool
	capper       *clusterwideCapper
	ticker       *time.Ticker
	isCapping    bool

	// First set of PCP values are garbage values, signal to logger to start recording when we're
	// about to schedule the new task.
	RecordPCP bool

	// This channel is closed when the program receives an interrupt,
	// signalling that the program should shut down.
	Shutdown chan struct{}

	// This channel is closed after shutdown is closed, and only when all
	// outstanding tasks have been cleaned up.
	Done chan struct{}

	// Controls when to shutdown pcp logging.
	PCPLog chan struct{}
}

// NewProactiveClusterwideCapFCFS creates a new proactive cluster-wide capping FCFS electron scheduler.
func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS {
	s := &ProactiveClusterwideCapFCFS{
		tasks:       tasks,
		ignoreWatts: ignoreWatts,
		Shutdown:    make(chan struct{}),
		Done:        make(chan struct{}),
		PCPLog:      make(chan struct{}),
		running:     make(map[string]map[string]bool),
		RecordPCP:   false,
		capper:      getClusterwideCapperInstance(),
		ticker:      time.NewTicker(10 * time.Second),
		isCapping:   false,
	}
	return s
}
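
// newTask constructs the Mesos TaskInfo for a task on the agent corresponding to the given offer,
// and records the task as running on that agent.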
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
	taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
	s.tasksCreated++

	if !s.RecordPCP {
		// Turn on logging.
		s.RecordPCP = true
		time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
	}

	// If this is our first time running into this Agent
	if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
		s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
	}

	// Setting the task ID to the task. This is done so that we can consider each task to be different,
	// even though they have the same parameters.
	task.SetTaskID(*proto.String(taskName))

	// Add task to the list of tasks running on the node.
	s.running[offer.GetSlaveId().GoString()][taskName] = true

	resources := []*mesos.Resource{
		mesosutil.NewScalarResource("cpus", task.CPU),
		mesosutil.NewScalarResource("mem", task.RAM),
	}

	if !s.ignoreWatts {
		resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
	}

	return &mesos.TaskInfo{
		Name: proto.String(taskName),
		TaskId: &mesos.TaskID{
			Value: proto.String("electron-" + taskName),
		},
		SlaveId:   offer.SlaveId,
		Resources: resources,
		Command: &mesos.CommandInfo{
			Value: proto.String(task.CMD),
		},
		Container: &mesos.ContainerInfo{
			Type: mesos.ContainerInfo_DOCKER.Enum(),
			Docker: &mesos.ContainerInfo_DockerInfo{
				Image:   proto.String(task.Image),
				Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
			},
		},
	}
}
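
// Registered is invoked when the scheduler successfully registers with the Mesos master.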
func (s *ProactiveClusterwideCapFCFS) Registered(
	_ sched.SchedulerDriver,
	frameworkID *mesos.FrameworkID,
	masterInfo *mesos.MasterInfo) {
	log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}

func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
	log.Printf("Framework re-registered with master %s", masterInfo)
}

func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
	log.Println("Framework disconnected from master")
}

// currentCapValue is the most recently determined cluster-wide cap value.
// A value of 0.0 indicates that the cluster hasn't been capped yet.
var currentCapValue = 0.0

// capMutex guards currentCapValue, which is written from ResourceOffers and read by the capping goroutine.
var capMutex sync.Mutex

// startCapping spawns a goroutine that caps the entire cluster at regular intervals of time.
func (s *ProactiveClusterwideCapFCFS) startCapping() {
	go func() {
		for {
			select {
			case <-s.ticker.C:
				// Need to cap the cluster to the currentCapValue.
				if currentCapValue > 0.0 {
					capMutex.Lock()
					for _, host := range constants.Hosts {
						// Round the cap value to the nearest integer percentage before applying it.
						if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
							fmt.Println(err)
						} else {
							fmt.Printf("Successfully capped %s to %f%%\n", host, currentCapValue)
						}
					}
					capMutex.Unlock()
				}
			}
		}
	}()
}
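
// ResourceOffers is invoked when new resource offers arrive. For every task that fits an offer, it
// determines a new cluster-wide cap, updates currentCapValue, and launches the task on the offered
// agent; offers that fit no task are declined.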
// TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?).
func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
	log.Printf("Received %d resource offers", len(offers))

	// Retrieve the available power for all the hosts in the offers.
	available_power := make(map[string]float64)
	for _, offer := range offers {
		_, _, offer_watts := OfferAgg(offer)
		available_power[*offer.Hostname] = offer_watts
	}

	for _, offer := range offers {
		select {
		case <-s.Shutdown:
			log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
			driver.DeclineOffer(offer.Id, longFilter)

			log.Println("Number of tasks still running: ", s.tasksRunning)
			continue
		default:
		}

		/*
			Cluster-wide capping strategy

			For each task in s.tasks:
			1. Check whether the Mesos offer can be taken or not (based on CPU and RAM).
			2. If the task fits the offer, determine the cluster-wide cap.
			3. Cap the cluster to the determined cap value and then launch the task on the host corresponding to the offer.

			Capping the cluster for every task would create a lot of overhead, hence cluster-wide capping is performed at regular intervals.
			TODO: We can choose to re-cap the cluster only if the newly determined cluster-wide cap differs from the current one.
			Although this sounds like a better approach, it only works well when the resource requirements of neighbouring tasks are similar.
		*/
		offer_cpu, offer_ram, _ := OfferAgg(offer)

		taken := false

		// If cluster-wide capping hasn't started yet, start it now.
		if !s.isCapping {
			s.startCapping()
			s.isCapping = true
		}

		for _, task := range s.tasks {
			// Don't take the offer if it doesn't match our task's host requirement.
			if !strings.HasPrefix(*offer.Hostname, task.Host) {
				continue
			}

			// Does the task fit the offer?
			if offer_cpu >= task.CPU && offer_ram >= task.RAM {
				taken = true

				capMutex.Lock()
				tempCap, err := s.capper.fcfsDetermineCap(available_power, &task)
				if err == nil {
					currentCapValue = tempCap
				} else {
					fmt.Printf("Failed to determine the cluster-wide cap: ")
					fmt.Println(err)
				}
				capMutex.Unlock()

				fmt.Printf("Starting on [%s]\n", offer.GetHostname())
				to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
				driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
			} else {
				// Task doesn't fit the offer. Move on to the next task.
			}
		}

		// If no task fit the offer, decline the offer.
		if !taken {
			fmt.Println("There are not enough resources to launch a task:")
			cpus, mem, watts := OfferAgg(offer)

			log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
			driver.DeclineOffer(offer.Id, defaultFilter)
		}
	}
}
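
// StatusUpdate handles task status updates: it maintains the running-task bookkeeping, notifies the
// capper when a task finishes, and closes Done once all tasks have completed after a shutdown.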
func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
	log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)

	if *status.State == mesos.TaskState_TASK_RUNNING {
		s.tasksRunning++
	} else if IsTerminal(status.State) {
		delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
		// Need to remove the task from the window of tasks.
		s.capper.taskFinished(*status.TaskId.Value)
		s.tasksRunning--
		if s.tasksRunning == 0 {
			select {
			case <-s.Shutdown:
				close(s.Done)
			default:
			}
		}
	}
	log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver,
	executorID *mesos.ExecutorID,
	slaveID *mesos.SlaveID,
	message string) {

	log.Println("Getting a framework message: ", message)
	log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}

func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
	log.Printf("Offer %s rescinded", offerID)
}

func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
	log.Printf("Slave %s lost", slaveID)
}

func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
	log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
}

func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) {
	log.Printf("Receiving an error: %s", err)
}