From 3b52fb36199ebbf5e8912de0c3e0c8b61c11122e Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 19:44:50 -0500 Subject: [PATCH] retrofitted schedulers to use base.go and to log the scheduling trace. Changed the name of piston capper to binpackedpistoncapping and also changed the variable names inside binpackedpistoncapping.go to indicate the name of the scheduler. --- ...toncapper.go => binpackedpistoncapping.go} | 122 ++++++++---------- schedulers/binpacksortedwatts.go | 58 +++------ schedulers/bpswClassMapWatts.go | 3 +- schedulers/bpswClassMapWattsPistonCapping.go | 4 +- schedulers/bpswClassMapWattsProacCC.go | 3 +- schedulers/firstfit.go | 58 +++------ .../firstfitSortedWattsClassMapWatts.go | 3 +- ...firstfitSortedWattsClassMapWattsProacCC.go | 3 +- schedulers/firstfitsortedwatts.go | 57 +++----- schedulers/firstfitwattsonly.go | 58 +++------ schedulers/proactiveclusterwidecappingfcfs.go | 55 +++----- .../proactiveclusterwidecappingranked.go | 55 +++----- 12 files changed, 161 insertions(+), 318 deletions(-) rename schedulers/{pistoncapper.go => binpackedpistoncapping.go} (77%) diff --git a/schedulers/pistoncapper.go b/schedulers/binpackedpistoncapping.go similarity index 77% rename from schedulers/pistoncapper.go rename to schedulers/binpackedpistoncapping.go index d5a22d1..2ed96f4 100644 --- a/schedulers/pistoncapper.go +++ b/schedulers/binpackedpistoncapping.go @@ -12,6 +12,7 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" "strings" "sync" "time" @@ -23,7 +24,8 @@ import ( This is basically extending the BinPacking algorithm to also cap each node at a different values, corresponding to the load on that node. */ -type PistonCapper struct { +type BinPackedPistonCapper struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -49,11 +51,19 @@ type PistonCapper struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. -func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { - s := &PistonCapper{ +func NewBinPackedPistonCapper(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackedPistonCapper { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &BinPackedPistonCapper{ tasks: tasks, ignoreWatts: ignoreWatts, Shutdown: make(chan struct{}), @@ -65,12 +75,13 @@ func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { RecordPCP: false, ticker: time.NewTicker(5 * time.Second), isCapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } // check whether task fits the offer or not. 
-func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64, +func (s *BinPackedPistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64, totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool { if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && (offerCPU >= (totalCPU + task.CPU)) && @@ -82,9 +93,9 @@ func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM } // mutex -var mutex sync.Mutex +var bpPistonMutex sync.Mutex -func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { +func (s *BinPackedPistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -140,45 +151,39 @@ func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInf } } -func (s *PistonCapper) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *PistonCapper) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *PistonCapper) Disconnected(sched.SchedulerDriver) { +func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process + s.ticker.Stop() + bpPistonMutex.Lock() + s.isCapping = false + bpPistonMutex.Unlock() log.Println("Framework disconnected with master") } // go routine to cap the each node in the cluster at regular intervals of time. -var capValues = make(map[string]float64) +var bpPistonCapValues = make(map[string]float64) // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. 
(reduces overhead) -var previousRoundedCapValues = make(map[string]int) +var bpPistonPreviousRoundedCapValues = make(map[string]int) -func (s *PistonCapper) startCapping() { +func (s *BinPackedPistonCapper) startCapping() { go func() { for { select { case <-s.ticker.C: // Need to cap each node - mutex.Lock() - for host, capValue := range capValues { + bpPistonMutex.Lock() + for host, capValue := range bpPistonCapValues { roundedCapValue := int(math.Floor(capValue + 0.5)) // has the cap value changed - if prevRoundedCap, ok := previousRoundedCapValues[host]; ok { + if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok { if prevRoundedCap != roundedCapValue { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } - previousRoundedCapValues[host] = roundedCapValue + bpPistonPreviousRoundedCapValues[host] = roundedCapValue } } else { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { @@ -186,27 +191,27 @@ func (s *PistonCapper) startCapping() { } else { log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } - previousRoundedCapValues[host] = roundedCapValue + bpPistonPreviousRoundedCapValues[host] = roundedCapValue } } - mutex.Unlock() + bpPistonMutex.Unlock() } } }() } // Stop the capping -func (s *PistonCapper) stopCapping() { +func (s *BinPackedPistonCapper) stopCapping() { if s.isCapping { log.Println("Stopping the capping.") s.ticker.Stop() - mutex.Lock() + bpPistonMutex.Lock() s.isCapping = false - mutex.Unlock() + bpPistonMutex.Unlock() } } -func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { +func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) // retrieving the total power for each host in the offers @@ -249,7 +254,8 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me // Store the partialLoad for host corresponding to this offer. // Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks. partialLoad := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. 
@@ -274,9 +280,11 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - fitTasks = append(fitTasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + fitTasks = append(fitTasks, taskToSchedule) log.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- // updating the cap value for offer.Hostname partialLoad += ((task.Watts * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 @@ -297,9 +305,9 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me if taken { // Updating the cap value for offer.Hostname - mutex.Lock() - capValues[*offer.Hostname] += partialLoad - mutex.Unlock() + bpPistonMutex.Lock() + bpPistonCapValues[*offer.Hostname] += partialLoad + bpPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter) } else { @@ -314,7 +322,7 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me } // Remove finished task from the taskMonitor -func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { +func (s *BinPackedPistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { hostOfFinishedTask := "" indexOfFinishedTask := -1 found := false @@ -345,13 +353,13 @@ func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, s return finishedTask, hostOfFinishedTask, nil } -func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { +func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) if *status.State == mesos.TaskState_TASK_RUNNING { - mutex.Lock() + bpPistonMutex.Lock() s.tasksRunning++ - mutex.Unlock() + bpPistonMutex.Unlock() } else if IsTerminal(status.State) { delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) // Deleting the task from the taskMonitor @@ -361,14 +369,14 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos. } // Need to update the cap values for host of the finishedTask - mutex.Lock() - capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 + bpPistonMutex.Lock() + bpPistonCapValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 // Checking to see if the cap value has become 0, in which case we uncap the host. - if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 { - capValues[hostOfFinishedTask] = 100 + if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 { + bpPistonCapValues[hostOfFinishedTask] = 100 } s.tasksRunning-- - mutex.Unlock() + bpPistonMutex.Unlock() if s.tasksRunning == 0 { select { @@ -381,27 +389,3 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos. 
} log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *PistonCapper) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *PistonCapper) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *PistonCapper) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *PistonCapper) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *PistonCapper) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index b05a3e3..f8c43ef 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -28,6 +29,7 @@ func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BinPackSortedWatts struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -48,12 +50,19 @@ type BinPackSortedWatts struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool) *BinPackSortedWatts { +func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackSortedWatts { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &BinPackSortedWatts{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -62,6 +71,7 @@ func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool) *BinPackSortedWat PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -113,21 +123,6 @@ func (s *BinPackSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.T } } -func (s *BinPackSortedWatts) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *BinPackSortedWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *BinPackSortedWatts) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -150,7 +145,8 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -172,9 +168,11 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalRAM += task.RAM 
log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -225,27 +223,3 @@ func (s *BinPackSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status * } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *BinPackSortedWatts) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *BinPackSortedWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *BinPackSortedWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *BinPackSortedWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *BinPackSortedWatts) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/bpswClassMapWatts.go b/schedulers/bpswClassMapWatts.go index f3167ac..1196459 100644 --- a/schedulers/bpswClassMapWatts.go +++ b/schedulers/bpswClassMapWatts.go @@ -145,7 +145,8 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index 5eeb413..cae8cc3 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -247,8 +247,8 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr // Store the partialLoad for host corresponding to this offer // Once we can't fit any more tasks, we update the capValue for this host with partialLoad and then launch the fitted tasks. partialLoad := 0.0 - for i, task := range s.tasks { - + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doesn't match our task's host requirement diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 4c4506a..ddcadc1 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -282,7 +282,8 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer it it doesn't match our task's host requirement. 
diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index e426ab1..d7819de 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -10,6 +10,7 @@ import ( "log" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -28,6 +29,7 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface type FirstFit struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -48,10 +50,17 @@ type FirstFit struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit { +func NewFirstFit(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFit { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } s := &FirstFit{ tasks: tasks, @@ -61,6 +70,7 @@ func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit { PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -112,21 +122,6 @@ func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { } } -func (s *FirstFit) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFit) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFit) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -146,7 +141,8 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -162,7 +158,8 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -170,6 +167,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. 
taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -216,27 +214,3 @@ func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.Task } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *FirstFit) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFit) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFit) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFit) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFit) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index c4b891f..4a03d89 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -128,7 +128,8 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index 14aba6e..3cc9fb9 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -266,7 +266,8 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. 
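Every scheduler touched by this patch gains the embedded field `base // Type embedded to inherit common functions` and, in exchange, drops its own copies of Registered, Reregistered, Disconnected, FrameworkMessage, OfferRescinded, SlaveLost, ExecutorLost and Error (except where a scheduler keeps an override, as the cappers do for Disconnected). schedulers/base.go itself is not part of this diff; judging from the callbacks deleted above, it presumably looks roughly like the sketch below, where the mesosproto import alias and the value receivers are assumptions.

package schedulers

import (
	"log"

	mesos "github.com/mesos/mesos-go/mesosproto"
	sched "github.com/mesos/mesos-go/scheduler"
)

// base carries default implementations of the mesos-go Scheduler callbacks
// that used to be duplicated in every electron scheduler. Schedulers embed it
// and override only the callbacks they care about.
type base struct{}

func (b base) Registered(_ sched.SchedulerDriver, frameworkID *mesos.FrameworkID, masterInfo *mesos.MasterInfo) {
	log.Printf("Framework %s registered with master %s", frameworkID, masterInfo)
}

func (b base) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
	log.Printf("Framework re-registered with master %s", masterInfo)
}

func (b base) Disconnected(sched.SchedulerDriver) {
	log.Println("Framework disconnected with master")
}

func (b base) FrameworkMessage(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, _ *mesos.SlaveID, message string) {
	log.Println("Getting a framework message: ", message)
	log.Printf("Received a framework message from some unknown source: %s", *executorID.Value)
}

func (b base) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) {
	log.Printf("Offer %s rescinded", offerID)
}

func (b base) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) {
	log.Printf("Slave %s lost", slaveID)
}

func (b base) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) {
	log.Printf("Executor %s on slave %s was lost", executorID, slaveID)
}

func (b base) Error(_ sched.SchedulerDriver, err string) {
	log.Printf("Receiving an error: %s", err)
}

Because base is embedded by value, its methods are promoted to both the scheduler type and its pointer, so the *FirstFit, *BinPackSortedWatts, etc. values handed to the driver still satisfy the full sched.Scheduler interface.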
diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 9067e1c..fbd740c 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -29,6 +30,7 @@ func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedWatts struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -49,13 +51,20 @@ type FirstFitSortedWatts struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool) *FirstFitSortedWatts { +func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWatts { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &FirstFitSortedWatts{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -64,6 +73,7 @@ func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool) *FirstFitSortedW PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -115,21 +125,6 @@ func (s *FirstFitSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos. } } -func (s *FirstFitSortedWatts) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFitSortedWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFitSortedWatts) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -149,7 +144,8 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -165,7 +161,8 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -173,6 +170,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -219,26 +217,3 @@ func (s *FirstFitSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *FirstFitSortedWatts) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID 
*mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFitSortedWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFitSortedWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFitSortedWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFitSortedWatts) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index e5962a7..798a5f7 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -10,6 +10,7 @@ import ( "log" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -27,6 +28,7 @@ func (*FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { } type FirstFitWattsOnly struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -47,10 +49,17 @@ type FirstFitWattsOnly struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool) *FirstFitWattsOnly { +func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitWattsOnly { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } s := &FirstFitWattsOnly{ tasks: tasks, @@ -60,6 +69,7 @@ func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool) *FirstFitWattsOnly PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -106,21 +116,6 @@ func (s *FirstFitWattsOnly) newTask(offer *mesos.Offer, task def.Task) *mesos.Ta } } -func (s *FirstFitWattsOnly) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFitWattsOnly) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFitWattsOnly) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -140,7 +135,8 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -156,7 +152,8 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) 
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -164,6 +161,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -210,27 +208,3 @@ func (s *FirstFitWattsOnly) StatusUpdate(driver sched.SchedulerDriver, status *m } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *FirstFitWattsOnly) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFitWattsOnly) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFitWattsOnly) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFitWattsOnly) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFitWattsOnly) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index a35c5dc..8456c7c 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "time" + "os" ) // Decides if to take an offer or not @@ -29,6 +30,7 @@ func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Tas // electronScheduler implements the Scheduler interface. type ProactiveClusterwideCapFCFS struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -58,10 +60,18 @@ type ProactiveClusterwideCapFCFS struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. 
-func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS { +func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *ProactiveClusterwideCapFCFS { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &ProactiveClusterwideCapFCFS{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -78,6 +88,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti recapTicker: time.NewTicker(20 * time.Second), isCapping: false, isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -140,17 +151,6 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) } } -func (s *ProactiveClusterwideCapFCFS) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process. s.ticker.Stop() @@ -275,7 +275,8 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive */ taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Don't take offer if it doesn't match our task's host requirement. if !strings.HasPrefix(*offer.Hostname, task.Host) { continue @@ -302,9 +303,11 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive log.Println(err) } log.Printf("Starting on [%s]\n", offer.GetHostname()) - toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)} + taskToSchedule := s.newTask(offer, task) + toSchedule := []*mesos.TaskInfo{taskToSchedule} driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) log.Printf("Inst: %d", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. 
@@ -384,27 +387,3 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} - -func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} - -func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index 42f5c78..4e9aa82 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -26,6 +26,7 @@ import ( "strings" "sync" "time" + "os" ) // Decides if to taken an offer or not @@ -40,6 +41,7 @@ func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.T // electronScheduler implements the Scheduler interface type ProactiveClusterwideCapRanked struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -69,10 +71,18 @@ type ProactiveClusterwideCapRanked struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. -func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapRanked { +func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *ProactiveClusterwideCapRanked { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &ProactiveClusterwideCapRanked{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -89,6 +99,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *Proac recapTicker: time.NewTicker(20 * time.Second), isCapping: false, isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -151,17 +162,6 @@ func (s *ProactiveClusterwideCapRanked) newTask(offer *mesos.Offer, task def.Tas } } -func (s *ProactiveClusterwideCapRanked) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *ProactiveClusterwideCapRanked) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *ProactiveClusterwideCapRanked) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process. 
s.ticker.Stop() @@ -299,7 +299,8 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri */ taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Don't take offer if it doesn't match our task's host requirement. if !strings.HasPrefix(*offer.Hostname, task.Host) { continue @@ -325,9 +326,11 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri log.Println("Failed to determine the new cluster wide cap: ", err) } log.Printf("Starting on [%s]\n", offer.GetHostname()) - to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} + taskToSchedule := s.newTask(offer, task) + to_schedule := []*mesos.TaskInfo{taskToSchedule} driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) log.Printf("Inst: %d", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. @@ -408,27 +411,3 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *ProactiveClusterwideCapRanked) FrameworkMessage(driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *ProactiveClusterwideCapRanked) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} - -func (s *ProactiveClusterwideCapRanked) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} - -func (s *ProactiveClusterwideCapRanked) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *ProactiveClusterwideCapRanked) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -}
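The scheduling-trace retrofit repeats one pattern in every constructor above: take a schedTracePrefix argument, create ./<prefix>_schedTrace.log, wrap it in a *log.Logger with log.LstdFlags, and have ResourceOffers print "<hostname>:<taskID>" for each task it launches. A sketch of that pattern pulled out into helpers is shown below; the helper names are illustrative only, since the patch inlines this logic in each scheduler.

package schedulers

import (
	"log"
	"os"
)

// newSchedTraceLogger mirrors the setup each retrofitted constructor performs:
// a per-scheduler trace file, named after the supplied prefix, with standard
// date/time prefixes on every entry.
func newSchedTraceLogger(schedTracePrefix string) *log.Logger {
	logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
	if err != nil {
		log.Fatal(err)
	}
	return log.New(logFile, "", log.LstdFlags)
}

// recordPlacement writes one trace entry in the "<hostname>:<taskID>" format
// that every ResourceOffers implementation in this patch emits right before
// decrementing the task's instance count.
func recordPlacement(schedTrace *log.Logger, hostname, taskID string) {
	schedTrace.Print(hostname + ":" + taskID)
}

Call sites change shape as well: the piston capper is now built with NewBinPackedPistonCapper(tasks, ignoreWatts, schedTracePrefix) instead of NewPistonCapper(tasks, ignoreWatts), and the other constructors gain the same third argument, so whatever driver code instantiates a scheduler must pass a trace prefix (typically derived from the workload or scheduler name; that call site is not part of this patch).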