Retrofitted schedulers to use base.go and to log the scheduling trace. Renamed the piston capper scheduler to binpackedpistoncapping and updated the variable names inside binpackedpistoncapping.go to reflect the scheduler's name.
parent 65263426b4
commit 3b52fb3619
12 changed files with 161 additions and 318 deletions
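Retrofitting onto base.go relies on Go struct embedding: each scheduler embeds a shared type and thereby inherits the common Mesos callbacks (Registered, Reregistered, FrameworkMessage, OfferRescinded, SlaveLost, ExecutorLost, Error) instead of redefining them. The following is a minimal, self-contained sketch of that pattern; the field-free base type, the string callback signature, and the fcfsScheduler name are simplified stand-ins, not the actual contents of base.go.

package main

import "log"

// base is a simplified stand-in for the shared type in base.go; in the real
// code it supplies the common Mesos scheduler callbacks in one place.
type base struct{}

// Registered stands in for one of those shared callbacks.
func (b *base) Registered(frameworkID, masterInfo string) {
    log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}

// fcfsScheduler embeds base, so the shared callbacks are promoted onto it;
// only scheduler-specific state and methods need to be declared here.
type fcfsScheduler struct {
    base
    tasksRunning int
}

func main() {
    var s fcfsScheduler
    s.Registered("framework-1", "master-1") // method inherited via embedding
}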
@@ -15,6 +15,7 @@ import (
     "strings"
     "sync"
     "time"
+    "os"
 )
 
 // Decides if to take an offer or not
@@ -29,6 +30,7 @@ func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Tas
 
 // electronScheduler implements the Scheduler interface.
 type ProactiveClusterwideCapFCFS struct {
+    base         // Type embedded to inherit common functions
     tasksCreated int
     tasksRunning int
     tasks        []def.Task
@@ -58,10 +60,18 @@ type ProactiveClusterwideCapFCFS struct {
 
     // Controls when to shutdown pcp logging.
     PCPLog chan struct{}
+
+    schedTrace *log.Logger
 }
 
 // New electron scheduler.
-func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS {
+func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *ProactiveClusterwideCapFCFS {
+
+    logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
+    if err != nil {
+        log.Fatal(err)
+    }
+
     s := &ProactiveClusterwideCapFCFS{
         tasks:       tasks,
         ignoreWatts: ignoreWatts,
@@ -78,6 +88,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
         recapTicker: time.NewTicker(20 * time.Second),
         isCapping:   false,
         isRecapping: false,
+        schedTrace:  log.New(logFile, "", log.LstdFlags),
     }
     return s
 }
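The trace logging wired in above is plain standard-library logging: os.Create opens (or truncates) a file named after the scheduler prefix, and log.New wraps it with timestamped output. A self-contained sketch of just that mechanism, with an illustrative prefix not taken from the diff:

package main

import (
    "log"
    "os"
)

func main() {
    // Mirror the constructor: create "<prefix>_schedTrace.log" in the working
    // directory and fail fast if the file cannot be created.
    logFile, err := os.Create("./example_schedTrace.log")
    if err != nil {
        log.Fatal(err)
    }
    defer logFile.Close()

    // log.LstdFlags prefixes every entry with the date and time.
    schedTrace := log.New(logFile, "", log.LstdFlags)
    schedTrace.Print("agent-host:task-id") // one entry per launched task
}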
@@ -140,17 +151,6 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
     }
 }
 
-func (s *ProactiveClusterwideCapFCFS) Registered(
-    _ sched.SchedulerDriver,
-    frameworkID *mesos.FrameworkID,
-    masterInfo *mesos.MasterInfo) {
-    log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
-}
-
-func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
-    log.Printf("Framework re-registered with master %s", masterInfo)
-}
-
 func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
     // Need to stop the capping process.
     s.ticker.Stop()
@@ -275,7 +275,8 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
         */
         taken := false
 
-        for i, task := range s.tasks {
+        for i := 0; i < len(s.tasks); i++ {
+            task := s.tasks[i]
             // Don't take offer if it doesn't match our task's host requirement.
             if !strings.HasPrefix(*offer.Hostname, task.Host) {
                 continue
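Switching from range to an index-based loop matters because the body later removes a task from s.tasks once all of its instances have been scheduled (see the comment in the next hunk). A general sketch of removing elements from a slice while iterating by index; this shows the common idiom, not the project's exact removal code:

package main

import "fmt"

type task struct {
    name      string
    instances int
}

func main() {
    tasks := []task{{"t1", 0}, {"t2", 2}, {"t3", 0}}

    for i := 0; i < len(tasks); i++ {
        if tasks[i].instances <= 0 {
            // Remove the finished task in place, then re-check the element
            // that slid into position i.
            tasks = append(tasks[:i], tasks[i+1:]...)
            i--
            continue
        }
        fmt.Println("still scheduling:", tasks[i].name)
    }

    fmt.Println("remaining:", tasks) // [{t2 2}]
}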
@@ -302,9 +303,11 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
                 log.Println(err)
             }
             log.Printf("Starting on [%s]\n", offer.GetHostname())
-            toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)}
+            taskToSchedule := s.newTask(offer, task)
+            toSchedule := []*mesos.TaskInfo{taskToSchedule}
             driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter)
             log.Printf("Inst: %d", *task.Instances)
+            s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
             *task.Instances--
             if *task.Instances <= 0 {
                 // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
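Each launch therefore appends one line of the form hostname:taskID to the scheduling-trace file, prefixed by log.LstdFlags with the date and time; roughly like the following, where the timestamp, hostname, and task ID are made-up placeholders:

2016/12/22 16:04:05 some-agent-host:some-task-id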
@@ -384,27 +387,3 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
         log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
     }
 
-func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver,
-    executorID *mesos.ExecutorID,
-    slaveID *mesos.SlaveID,
-    message string) {
-
-    log.Println("Getting a framework message: ", message)
-    log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
-}
-
-func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
-    log.Printf("Offer %s rescinded", offerID)
-}
-
-func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
-    log.Printf("Slave %s lost", slaveID)
-}
-
-func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
-    log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
-}
-
-func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) {
-    log.Printf("Receiving an error: %s", err)
-}