diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index 114b204..9f725e4 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -12,6 +12,7 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" "sort" "strings" "sync" @@ -32,6 +33,7 @@ func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, task def. } type BPSWClassMapWattsPistonCapping struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -57,12 +59,19 @@ type BPSWClassMapWattsPistonCapping struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool) *BPSWClassMapWattsPistonCapping { +func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BPSWClassMapWattsPistonCapping { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &BPSWClassMapWattsPistonCapping{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -75,6 +84,7 @@ func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool) *BPSW RecordPCP: false, ticker: time.NewTicker(5 * time.Second), isCapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -133,18 +143,12 @@ func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Ta } } -func (s *BPSWClassMapWattsPistonCapping) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *BPSWClassMapWattsPistonCapping) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *BPSWClassMapWattsPistonCapping) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process + s.ticker.Stop() + bpswClassMapWattsPistonMutex.Lock() + s.isCapping = false + bpswClassMapWattsPistonMutex.Unlock() log.Println("Framework disconnected with master") } @@ -271,9 +275,11 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task, nodeClass)) + taskToSchedule := s.newTask(offer, task, nodeClass) + tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- partialLoad += ((task.Watts * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 @@ -377,27 +383,3 @@ func (s *BPSWClassMapWattsPistonCapping) StatusUpdate(driver sched.SchedulerDriv } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *BPSWClassMapWattsPistonCapping) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *BPSWClassMapWattsPistonCapping) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *BPSWClassMapWattsPistonCapping) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *BPSWClassMapWattsPistonCapping) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *BPSWClassMapWattsPistonCapping) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 1b62a66..cd21b03 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -12,10 +12,11 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" + "sort" "strings" "sync" "time" - "sort" ) // Decides if to take an offer or not @@ -32,6 +33,7 @@ func (*BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bo } type BPSWClassMapWattsProacCC struct { + base // Type embedding to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -60,12 +62,19 @@ type BPSWClassMapWattsProacCC struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool) *BPSWClassMapWattsProacCC { +func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BPSWClassMapWattsProacCC { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &BPSWClassMapWattsProacCC{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -82,6 +91,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool) *BPSWClassM recapTicker: time.NewTicker(20 * time.Second), isCapping: false, isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -144,17 +154,6 @@ func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, ne } } -func (s *BPSWClassMapWattsProacCC) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *BPSWClassMapWattsProacCC) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *BPSWClassMapWattsProacCC) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process s.ticker.Stop() @@ -322,9 +321,11 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task, nodeClass)) + taskToSchedule := s.newTask(offer, task, nodeClass) + tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -369,7 +370,7 @@ func (s *BPSWClassMapWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, st // Need to remove the task from the window s.capper.TaskFinished(*status.TaskId.Value) // Determining the new cluster wide recap value - tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) //tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) if err == nil { // If new determined cap value is different from the current recap value, then we need to recap @@ -401,28 +402,3 @@ func (s *BPSWClassMapWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, st } log.Printf("DONE: Task status [%s] for task[%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *BPSWClassMapWattsProacCC) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - log.Println("Getting a framework message: ", message) - log.Printf("Received framework message from some unknown source: %s", *executorID.Value) -} - -func (s *BPSWClassMapWattsProacCC) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} - -func (s *BPSWClassMapWattsProacCC) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} - -func (s *BPSWClassMapWattsProacCC) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *BPSWClassMapWattsProacCC) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -}