From 3b46dbc3573ce368dcad68e2a5bb1675d9b3b8d9 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 16:20:21 -0500 Subject: [PATCH 01/11] First Fit SortedWatts retrofitted to use ClassMapWatts in the workload. --- .../firstfitSortedWattsClassMapWatts.go | 204 ++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 schedulers/firstfitSortedWattsClassMapWatts.go diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go new file mode 100644 index 0000000..1de3e12 --- /dev/null +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -0,0 +1,204 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/def" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "strings" + "time" + "sort" + "os" +) + +// electron scheduler implements the Scheduler interface +type FirstFitSortedWattsClassMapWatts struct { + base // Type embedded to inherit common features. + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electorn scheduler +func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWattsClassMapWatts { + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &FirstFitSortedWattsClassMapWatts{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: 
&mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + offerCPU, offerRAM, offerWatts := OfferAgg(offer) + + // First fit strategy + taken := false + for i, task := range s.tasks { + // Check host if it exists + if task.Host != "" { + // Don't take offer if it doens't match our task's host requirement. + if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + } + + // retrieving the node class from the offer + var nodeClass string + for _, attr := range offer.GetAttributes() { + if attr.GetName() == "class" { + nodeClass = attr.GetText().GetValue() + } + } + + // Decision to take the offer or not + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + taskToSchedule := s.newTask(offer, task, nodeClass) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + + taken = true + fmt.Println("Inst: ", *task.Instances) + *task.Instances-- + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + + if len(s.tasks) == 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !taken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + + } + } +} + +func (s *FirstFitSortedWattsClassMapWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} \ No newline at end of file From cb7e697cfa8c1e0fe94b045d4b8beffcea21020c Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 16:20:58 -0500 Subject: [PATCH 02/11] First Fit SortedWatts ClassMapWatts with Proactive Clusterwide Capping. 
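
Context for reviewers: the heart of this scheduler is a ticker-driven goroutine that re-caps every host whenever the computed cluster-wide cap changes, rounding to the nearest integer so RAPL is not called again for an unchanged value. The snippet below is a minimal, self-contained sketch of that loop only, not the scheduler itself: capHost and hosts are illustrative stand-ins for the repository's rapl.Cap and constants.Hosts, and the 1-second ticker is shortened from the scheduler's 10-second interval so the example terminates quickly.

package main

// Illustrative sketch of a proactive cluster-wide capping loop.
// capHost and hosts stand in for rapl.Cap and constants.Hosts.

import (
	"log"
	"math"
	"sync"
	"time"
)

var (
	mu          sync.Mutex
	capValue    = 0.0                             // value the cluster is currently capped at (0 => not capped yet)
	newCapValue = 0.0                             // most recently computed cluster-wide cap
	isCapping   = false                           // are we actively capping?
	hosts       = []string{"host-001", "host-002"} // hypothetical host names
)

// capHost is a stand-in for rapl.Cap(host, "rapl", value).
func capHost(host string, value int) error {
	log.Printf("capping %s to %d", host, value)
	return nil
}

// startCapping re-caps every host whenever the rounded cap value changes,
// so an unchanged cap never triggers redundant RAPL calls.
func startCapping(ticker *time.Ticker) {
	go func() {
		for range ticker.C {
			mu.Lock()
			if isCapping && int(math.Floor(newCapValue+0.5)) != int(math.Floor(capValue+0.5)) {
				capValue = newCapValue
				if capValue > 0.0 {
					rounded := int(math.Floor(capValue + 0.5))
					for _, host := range hosts {
						if err := capHost(host, rounded); err != nil {
							log.Println(err)
						}
					}
					log.Printf("Capped the cluster to %d", rounded)
				}
			}
			mu.Unlock()
		}
	}()
}

func main() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	startCapping(ticker)

	// Simulate the scheduler computing a new cap when a task is placed.
	mu.Lock()
	isCapping = true
	newCapValue = 75.0
	mu.Unlock()

	time.Sleep(3 * time.Second) // let the ticker fire a few times
}

The same rounded-comparison idea is reused in StatusUpdate when a recap value is recomputed after each task finishes, which keeps identical RAPL calls from being issued repeatedly.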
--- ...firstfitSortedWattsClassMapWattsProacCC.go | 389 ++++++++++++++++++ 1 file changed, 389 insertions(+) create mode 100644 schedulers/firstfitSortedWattsClassMapWattsProacCC.go diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go new file mode 100644 index 0000000..09a51cd --- /dev/null +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -0,0 +1,389 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/def" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "strings" + "time" + "sort" + "os" + "bitbucket.org/sunybingcloud/electron/pcp" + "sync" + "math" + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/rapl" +) + +// electron scheduler implements the Scheduler interface +type FirstFitSortedWattsClassMapWattsProacCC struct { + base // Type embedded to inherit common features. + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + taskMonitor map[string][]def.Task + availablePower map[string]float64 + totalPower map[string]float64 + ignoreWatts bool + capper *pcp.ClusterwideCapper + ticker *time.Ticker + recapTicker *time.Ticker + isCapping bool // indicate whether we are currently performing cluster-wide capping. + isRecapping bool // indicate whether we are currently performing cluster-wide recapping. + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWattsClassMapWattsProacCC { + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &FirstFitSortedWattsClassMapWattsProacCC{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + taskMonitor: make(map[string][]def.Task), + availablePower: make(map[string]float64), + totalPower: make(map[string]float64), + RecordPCP: false, + capper: pcp.GetClusterwideCapperInstance(), + ticker: time.NewTicker(10 * time.Second), + recapTicker: time.NewTicker(20 * time.Second), + isCapping: false, + isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + + } + return s +} + +// mutex +var ffswClassMapWattsProacCCMutex sync.Mutex + +func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging. 
+ s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Setting the task ID to the task. This is done so that we can consider each task to be different, + // even though they have the same parameters. + task.SetTaskID(*proto.String("electron-" + taskName)) + // Add task to the list of tasks running on the node. + s.running[offer.GetSlaveId().GoString()][taskName] = true + if len(s.taskMonitor[*offer.Hostname]) == 0 { + s.taskMonitor[*offer.Hostname] = []def.Task{task} + } else { + s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) + } + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *FirstFitSortedWattsClassMapWattsProacCC) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process + s.ticker.Stop() + s.recapTicker.Stop() + ffswClassMapWattsProacCCMutex.Lock() + s.isCapping = false + ffswClassMapWattsProacCCMutex.Unlock() + log.Println("Framework disconnected with master") +} + +// go routine to cap the entire cluster in regular intervals of time +var ffswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var ffswClassMapWattsProacCCNewCapValue = 0.0 // newly computed cap value +func (s *FirstFitSortedWattsClassMapWattsProacCC) startCapping() { + go func() { + for { + select { + case <-s.ticker.C: + // Need to cap the cluster only if new cap value different from the old cap value. + // This way we don't unnecessarily cap the cluster. + ffswClassMapWattsProacCCMutex.Lock() + if s.isCapping { + if int(math.Floor(ffswClassMapWattsProacCCNewCapValue+0.5)) != int(math.Floor(ffswClassMapWattsProacCCCapValue+0.5)) { + // updating cap value + ffswClassMapWattsProacCCCapValue = ffswClassMapWattsProacCCNewCapValue + if ffswClassMapWattsProacCCCapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding cap value to the nearest int + if err := rapl.Cap(host, "rapl", int(math.Floor(ffswClassMapWattsProacCCCapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Capped the cluster to %d", int(math.Floor(ffswClassMapWattsProacCCCapValue+0.5))) + } + } + } + ffswClassMapWattsProacCCMutex.Unlock() + } + } + }() +} + +// go routine to recap the entire cluster in regular intervals of time. +var ffswClassMapWattsProacCCRecapValue = 0.0 // The cluster-wide cap value when recapping. 
+func (s *FirstFitSortedWattsClassMapWattsProacCC) startRecapping() { + go func() { + for { + select { + case <-s.recapTicker.C: + ffswClassMapWattsProacCCMutex.Lock() + // If stopped performing cluster wide capping, then we need to recap + if s.isRecapping && ffswClassMapWattsProacCCRecapValue > 0.0 { + for _, host := range constants.Hosts { + // Rounding the cap value to the nearest int + if err := rapl.Cap(host, "rapl", int(math.Floor(ffswClassMapWattsProacCCRecapValue+0.5))); err != nil { + log.Println(err) + } + } + log.Printf("Recapping the cluster to %d", int(math.Floor(ffswClassMapWattsProacCCRecapValue+0.5))) + } + // Setting recapping to false + s.isRecapping = false + ffswClassMapWattsProacCCMutex.Unlock() + } + } + }() +} + +// Stop the cluster wide capping +func (s *FirstFitSortedWattsClassMapWattsProacCC) stopCapping() { + if s.isCapping { + log.Println("Stopping the cluster-wide capping.") + s.ticker.Stop() + ffswClassMapWattsProacCCMutex.Lock() + s.isCapping = false + s.isRecapping = true + ffswClassMapWattsProacCCMutex.Unlock() + } +} + +// Stop the cluster wide recapping +func (s *FirstFitSortedWattsClassMapWattsProacCC) stopRecapping() { + // If not capping, then definitely recapping. + if !s.isCapping && s.isRecapping { + log.Println("Stopping the cluster-wide re-capping.") + s.recapTicker.Stop() + ffswClassMapWattsProacCCMutex.Lock() + s.isRecapping = false + ffswClassMapWattsProacCCMutex.Unlock() + } +} + +func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // retrieving the available power for all the hosts in the offers. + for _, offer := range offers { + _, _, offerWatts := OfferAgg(offer) + s.availablePower[*offer.Hostname] = offerWatts + // setting total power if the first time + if _, ok := s.totalPower[*offer.Hostname]; !ok { + s.totalPower[*offer.Hostname] = offerWatts + } + } + + for host, tpower := range s.totalPower { + log.Printf("TotalPower[%s] = %f", host, tpower) + } + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, longFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + offerCPU, offerRAM, offerWatts := OfferAgg(offer) + + // First fit strategy + taken := false + for i, task := range s.tasks { + // Check host if it exists + if task.Host != "" { + // Don't take offer if it doens't match our task's host requirement. 
+ if !strings.HasPrefix(*offer.Hostname, task.Host) { + continue + } + } + + // Retrieving the node class from the offer + var nodeClass string + for _, attr := range offer.GetAttributes() { + if attr.GetName() == "class" { + nodeClass = attr.GetText().GetValue() + } + } + + // Decision to take the offer or not + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + + // Capping the cluster if haven't yet started + if !s.isCapping { + ffswClassMapWattsProacCCMutex.Lock() + s.isCapping = true + ffswClassMapWattsProacCCMutex.Unlock() + s.startCapping() + } + + fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) + if err == nil { + ffswClassMapWattsProacCCMutex.Lock() + ffswClassMapWattsProacCCNewCapValue = tempCap + ffswClassMapWattsProacCCMutex.Unlock() + } else { + log.Println("Failed to determine new cluster-wide cap: ") + log.Println(err) + } + + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + taskToSchedule := s.newTask(offer, task, nodeClass) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + + taken = true + fmt.Println("Inst: ", *task.Instances) + *task.Instances-- + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + + if len(s.tasks) == 0 { + log.Println("Done scheduling all tasks") + // Need to stop the cluster-wide capping as there aren't any more tasks to schedule + s.stopCapping() + s.startRecapping() // Load changes after every task finishes and hence, we need to change the capping of the cluster + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !taken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, defaultFilter) + } + } +} + +func (s *FirstFitSortedWattsClassMapWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + // Need to remove the task from the window + s.capper.TaskFinished(*status.TaskId.Value) + // Determining the new cluster wide recap value + //tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) + if err == nil { + // If new determined cap value is different from the current recap value, then we need to recap + if int(math.Floor(tempCap+0.5)) != int(math.Floor(ffswClassMapWattsProacCCRecapValue+0.5)) { + ffswClassMapWattsProacCCRecapValue = tempCap + ffswClassMapWattsProacCCMutex.Lock() + s.isRecapping = true + ffswClassMapWattsProacCCMutex.Unlock() + log.Printf("Determined re-cap value: %f\n", ffswClassMapWattsProacCCRecapValue) + } else { + ffswClassMapWattsProacCCMutex.Lock() + s.isRecapping = false + ffswClassMapWattsProacCCMutex.Unlock() + } + } else { + 
log.Println(err) + } + + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + // Need to stop the cluster-wide recapping + s.stopRecapping() + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task[%s]", NameFor(status.State), *status.TaskId.Value) +} From 6798807a0b204e5f1f3c6b1294a4361d40583ec0 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 17:21:46 -0500 Subject: [PATCH 03/11] Fixed bug in logging the correct cap value. --- schedulers/bpswClassMapWattsPistonCapping.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index 5079980..5eeb413 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -179,7 +179,7 @@ func (s *BPSWClassMapWattsPistonCapping) startCapping() { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue))) + log.Printf("Capped [%s] at %d", host, roundedCapValue) } bpswClassMapWattsPistonPreviousRoundedCapValues[host] = roundedCapValue } @@ -187,7 +187,7 @@ func (s *BPSWClassMapWattsPistonCapping) startCapping() { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) + log.Printf("Capped [%s] at %d", host, roundedCapValue) } bpswClassMapWattsPistonPreviousRoundedCapValues[host] = roundedCapValue } From 65263426b476ed755b53bc95c614d16e04307bf3 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 17:22:15 -0500 Subject: [PATCH 04/11] formatted code --- .../firstfitSortedWattsClassMapWatts.go | 8 ++-- ...firstfitSortedWattsClassMapWattsProacCC.go | 37 +++++++++---------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index 1de3e12..c4b891f 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -8,10 +8,10 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "os" + "sort" "strings" "time" - "sort" - "os" ) // electron scheduler implements the Scheduler interface @@ -58,7 +58,7 @@ func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, sch PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -201,4 +201,4 @@ func (s *FirstFitSortedWattsClassMapWatts) StatusUpdate(driver sched.SchedulerDr } } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} \ No newline at end of file +} diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index 09a51cd..14aba6e 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -1,36 +1,36 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/pcp" + "bitbucket.org/sunybingcloud/electron/rapl" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" 
"github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" - "strings" - "time" - "sort" - "os" - "bitbucket.org/sunybingcloud/electron/pcp" - "sync" "math" - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/rapl" + "os" + "sort" + "strings" + "sync" + "time" ) // electron scheduler implements the Scheduler interface type FirstFitSortedWattsClassMapWattsProacCC struct { - base // Type embedded to inherit common features. - tasksCreated int - tasksRunning int - tasks []def.Task - metrics map[string]def.Metric - running map[string]map[string]bool + base // Type embedded to inherit common features. + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool taskMonitor map[string][]def.Task availablePower map[string]float64 totalPower map[string]float64 - ignoreWatts bool + ignoreWatts bool capper *pcp.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker @@ -80,7 +80,6 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo isCapping: false, isRecapping: false, schedTrace: log.New(logFile, "", log.LstdFlags), - } return s } @@ -88,7 +87,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo // mutex var ffswClassMapWattsProacCCMutex sync.Mutex -func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -154,7 +153,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) Disconnected(sched.SchedulerDr } // go routine to cap the entire cluster in regular intervals of time -var ffswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var ffswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. var ffswClassMapWattsProacCCNewCapValue = 0.0 // newly computed cap value func (s *FirstFitSortedWattsClassMapWattsProacCC) startCapping() { go func() { @@ -378,7 +377,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) StatusUpdate(driver sched.Sche if s.tasksRunning == 0 { select { case <-s.Shutdown: - // Need to stop the cluster-wide recapping + // Need to stop the cluster-wide recapping s.stopRecapping() close(s.Done) default: From 3b52fb36199ebbf5e8912de0c3e0c8b61c11122e Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 19:44:50 -0500 Subject: [PATCH 05/11] retrofitted schedulers to use base.go and to log the scheduling trace. Changed the name of piston capper to binpackedpistoncapping and also changed the variable names inside binpackedpistoncapping.go to indicate the name of the scheduler. 
--- ...toncapper.go => binpackedpistoncapping.go} | 122 ++++++++---------- schedulers/binpacksortedwatts.go | 58 +++------ schedulers/bpswClassMapWatts.go | 3 +- schedulers/bpswClassMapWattsPistonCapping.go | 4 +- schedulers/bpswClassMapWattsProacCC.go | 3 +- schedulers/firstfit.go | 58 +++------ .../firstfitSortedWattsClassMapWatts.go | 3 +- ...firstfitSortedWattsClassMapWattsProacCC.go | 3 +- schedulers/firstfitsortedwatts.go | 57 +++----- schedulers/firstfitwattsonly.go | 58 +++------ schedulers/proactiveclusterwidecappingfcfs.go | 55 +++----- .../proactiveclusterwidecappingranked.go | 55 +++----- 12 files changed, 161 insertions(+), 318 deletions(-) rename schedulers/{pistoncapper.go => binpackedpistoncapping.go} (77%) diff --git a/schedulers/pistoncapper.go b/schedulers/binpackedpistoncapping.go similarity index 77% rename from schedulers/pistoncapper.go rename to schedulers/binpackedpistoncapping.go index d5a22d1..2ed96f4 100644 --- a/schedulers/pistoncapper.go +++ b/schedulers/binpackedpistoncapping.go @@ -12,6 +12,7 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" "strings" "sync" "time" @@ -23,7 +24,8 @@ import ( This is basically extending the BinPacking algorithm to also cap each node at a different values, corresponding to the load on that node. */ -type PistonCapper struct { +type BinPackedPistonCapper struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -49,11 +51,19 @@ type PistonCapper struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. -func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { - s := &PistonCapper{ +func NewBinPackedPistonCapper(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackedPistonCapper { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &BinPackedPistonCapper{ tasks: tasks, ignoreWatts: ignoreWatts, Shutdown: make(chan struct{}), @@ -65,12 +75,13 @@ func NewPistonCapper(tasks []def.Task, ignoreWatts bool) *PistonCapper { RecordPCP: false, ticker: time.NewTicker(5 * time.Second), isCapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } // check whether task fits the offer or not. 
-func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64, +func (s *BinPackedPistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM float64, totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool { if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && (offerCPU >= (totalCPU + task.CPU)) && @@ -82,9 +93,9 @@ func (s *PistonCapper) takeOffer(offerWatts float64, offerCPU float64, offerRAM } // mutex -var mutex sync.Mutex +var bpPistonMutex sync.Mutex -func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { +func (s *BinPackedPistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -140,45 +151,39 @@ func (s *PistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInf } } -func (s *PistonCapper) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *PistonCapper) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *PistonCapper) Disconnected(sched.SchedulerDriver) { +func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) { + // Need to stop the capping process + s.ticker.Stop() + bpPistonMutex.Lock() + s.isCapping = false + bpPistonMutex.Unlock() log.Println("Framework disconnected with master") } // go routine to cap the each node in the cluster at regular intervals of time. -var capValues = make(map[string]float64) +var bpPistonCapValues = make(map[string]float64) // Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. 
(reduces overhead) -var previousRoundedCapValues = make(map[string]int) +var bpPistonPreviousRoundedCapValues = make(map[string]int) -func (s *PistonCapper) startCapping() { +func (s *BinPackedPistonCapper) startCapping() { go func() { for { select { case <-s.ticker.C: // Need to cap each node - mutex.Lock() - for host, capValue := range capValues { + bpPistonMutex.Lock() + for host, capValue := range bpPistonCapValues { roundedCapValue := int(math.Floor(capValue + 0.5)) // has the cap value changed - if prevRoundedCap, ok := previousRoundedCapValues[host]; ok { + if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok { if prevRoundedCap != roundedCapValue { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { log.Println(err) } else { log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } - previousRoundedCapValues[host] = roundedCapValue + bpPistonPreviousRoundedCapValues[host] = roundedCapValue } } else { if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { @@ -186,27 +191,27 @@ func (s *PistonCapper) startCapping() { } else { log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) } - previousRoundedCapValues[host] = roundedCapValue + bpPistonPreviousRoundedCapValues[host] = roundedCapValue } } - mutex.Unlock() + bpPistonMutex.Unlock() } } }() } // Stop the capping -func (s *PistonCapper) stopCapping() { +func (s *BinPackedPistonCapper) stopCapping() { if s.isCapping { log.Println("Stopping the capping.") s.ticker.Stop() - mutex.Lock() + bpPistonMutex.Lock() s.isCapping = false - mutex.Unlock() + bpPistonMutex.Unlock() } } -func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { +func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) // retrieving the total power for each host in the offers @@ -249,7 +254,8 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me // Store the partialLoad for host corresponding to this offer. // Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks. partialLoad := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. 
@@ -274,9 +280,11 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - fitTasks = append(fitTasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + fitTasks = append(fitTasks, taskToSchedule) log.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- // updating the cap value for offer.Hostname partialLoad += ((task.Watts * constants.CapMargin) / s.totalPower[*offer.Hostname]) * 100 @@ -297,9 +305,9 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me if taken { // Updating the cap value for offer.Hostname - mutex.Lock() - capValues[*offer.Hostname] += partialLoad - mutex.Unlock() + bpPistonMutex.Lock() + bpPistonCapValues[*offer.Hostname] += partialLoad + bpPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter) } else { @@ -314,7 +322,7 @@ func (s *PistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*me } // Remove finished task from the taskMonitor -func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { +func (s *BinPackedPistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { hostOfFinishedTask := "" indexOfFinishedTask := -1 found := false @@ -345,13 +353,13 @@ func (s *PistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, s return finishedTask, hostOfFinishedTask, nil } -func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { +func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) if *status.State == mesos.TaskState_TASK_RUNNING { - mutex.Lock() + bpPistonMutex.Lock() s.tasksRunning++ - mutex.Unlock() + bpPistonMutex.Unlock() } else if IsTerminal(status.State) { delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) // Deleting the task from the taskMonitor @@ -361,14 +369,14 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos. } // Need to update the cap values for host of the finishedTask - mutex.Lock() - capValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 + bpPistonMutex.Lock() + bpPistonCapValues[hostOfFinishedTask] -= ((finishedTask.Watts * constants.CapMargin) / s.totalPower[hostOfFinishedTask]) * 100 // Checking to see if the cap value has become 0, in which case we uncap the host. - if int(math.Floor(capValues[hostOfFinishedTask]+0.5)) == 0 { - capValues[hostOfFinishedTask] = 100 + if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 { + bpPistonCapValues[hostOfFinishedTask] = 100 } s.tasksRunning-- - mutex.Unlock() + bpPistonMutex.Unlock() if s.tasksRunning == 0 { select { @@ -381,27 +389,3 @@ func (s *PistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos. 
} log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *PistonCapper) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *PistonCapper) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *PistonCapper) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *PistonCapper) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *PistonCapper) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index b05a3e3..f8c43ef 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -28,6 +29,7 @@ func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { } type BinPackSortedWatts struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -48,12 +50,19 @@ type BinPackSortedWatts struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool) *BinPackSortedWatts { +func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackSortedWatts { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &BinPackSortedWatts{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -62,6 +71,7 @@ func NewBinPackSortedWatts(tasks []def.Task, ignoreWatts bool) *BinPackSortedWat PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -113,21 +123,6 @@ func (s *BinPackSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.T } } -func (s *BinPackSortedWatts) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *BinPackSortedWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *BinPackSortedWatts) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -150,7 +145,8 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -172,9 +168,11 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalRAM += task.RAM 
log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -225,27 +223,3 @@ func (s *BinPackSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status * } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *BinPackSortedWatts) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *BinPackSortedWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *BinPackSortedWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *BinPackSortedWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *BinPackSortedWatts) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/bpswClassMapWatts.go b/schedulers/bpswClassMapWatts.go index f3167ac..1196459 100644 --- a/schedulers/bpswClassMapWatts.go +++ b/schedulers/bpswClassMapWatts.go @@ -145,7 +145,8 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index 5eeb413..cae8cc3 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -247,8 +247,8 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr // Store the partialLoad for host corresponding to this offer // Once we can't fit any more tasks, we update the capValue for this host with partialLoad and then launch the fitted tasks. partialLoad := 0.0 - for i, task := range s.tasks { - + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doesn't match our task's host requirement diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 4c4506a..ddcadc1 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -282,7 +282,8 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer it it doesn't match our task's host requirement. 
diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index e426ab1..d7819de 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -10,6 +10,7 @@ import ( "log" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -28,6 +29,7 @@ func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { // electronScheduler implements the Scheduler interface type FirstFit struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -48,10 +50,17 @@ type FirstFit struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit { +func NewFirstFit(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFit { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } s := &FirstFit{ tasks: tasks, @@ -61,6 +70,7 @@ func NewFirstFit(tasks []def.Task, ignoreWatts bool) *FirstFit { PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -112,21 +122,6 @@ func (s *FirstFit) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { } } -func (s *FirstFit) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFit) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFit) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -146,7 +141,8 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -162,7 +158,8 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -170,6 +167,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. 
taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -216,27 +214,3 @@ func (s *FirstFit) StatusUpdate(driver sched.SchedulerDriver, status *mesos.Task } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *FirstFit) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFit) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFit) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFit) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFit) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index c4b891f..4a03d89 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -128,7 +128,8 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index 14aba6e..3cc9fb9 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -266,7 +266,8 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { // Don't take offer if it doens't match our task's host requirement. 
diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 9067e1c..fbd740c 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -29,6 +30,7 @@ func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool // electronScheduler implements the Scheduler interface type FirstFitSortedWatts struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -49,13 +51,20 @@ type FirstFitSortedWatts struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool) *FirstFitSortedWatts { +func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWatts { sort.Sort(def.WattsSorter(tasks)) + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &FirstFitSortedWatts{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -64,6 +73,7 @@ func NewFirstFitSortedWatts(tasks []def.Task, ignoreWatts bool) *FirstFitSortedW PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -115,21 +125,6 @@ func (s *FirstFitSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos. } } -func (s *FirstFitSortedWatts) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFitSortedWatts) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFitSortedWatts) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -149,7 +144,8 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -165,7 +161,8 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -173,6 +170,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -219,26 +217,3 @@ func (s *FirstFitSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *FirstFitSortedWatts) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID 
*mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFitSortedWatts) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFitSortedWatts) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFitSortedWatts) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFitSortedWatts) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index e5962a7..798a5f7 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -10,6 +10,7 @@ import ( "log" "strings" "time" + "os" ) // Decides if to take an offer or not @@ -27,6 +28,7 @@ func (*FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { } type FirstFitWattsOnly struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -47,10 +49,17 @@ type FirstFitWattsOnly struct { // Controls when to shutdown pcp logging PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler -func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool) *FirstFitWattsOnly { +func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitWattsOnly { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } s := &FirstFitWattsOnly{ tasks: tasks, @@ -60,6 +69,7 @@ func NewFirstFitWattsOnly(tasks []def.Task, ignoreWatts bool) *FirstFitWattsOnly PCPLog: make(chan struct{}), running: make(map[string]map[string]bool), RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -106,21 +116,6 @@ func (s *FirstFitWattsOnly) newTask(offer *mesos.Offer, task def.Task) *mesos.Ta } } -func (s *FirstFitWattsOnly) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *FirstFitWattsOnly) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - -func (s *FirstFitWattsOnly) Disconnected(sched.SchedulerDriver) { - log.Println("Framework disconnected with master") -} - func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { log.Printf("Received %d resource offers", len(offers)) @@ -140,7 +135,8 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers // First fit strategy taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Check host if it exists if task.Host != "" { @@ -156,7 +152,8 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - tasks = append(tasks, s.newTask(offer, task)) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) 
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) @@ -164,6 +161,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers taken = true fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { @@ -210,27 +208,3 @@ func (s *FirstFitWattsOnly) StatusUpdate(driver sched.SchedulerDriver, status *m } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - -func (s *FirstFitWattsOnly) FrameworkMessage( - driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *FirstFitWattsOnly) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} -func (s *FirstFitWattsOnly) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} -func (s *FirstFitWattsOnly) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *FirstFitWattsOnly) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index a35c5dc..8456c7c 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "time" + "os" ) // Decides if to take an offer or not @@ -29,6 +30,7 @@ func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Tas // electronScheduler implements the Scheduler interface. type ProactiveClusterwideCapFCFS struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -58,10 +60,18 @@ type ProactiveClusterwideCapFCFS struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. 
-func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapFCFS { +func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *ProactiveClusterwideCapFCFS { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &ProactiveClusterwideCapFCFS{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -78,6 +88,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti recapTicker: time.NewTicker(20 * time.Second), isCapping: false, isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -140,17 +151,6 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) } } -func (s *ProactiveClusterwideCapFCFS) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process. s.ticker.Stop() @@ -275,7 +275,8 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive */ taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Don't take offer if it doesn't match our task's host requirement. if !strings.HasPrefix(*offer.Hostname, task.Host) { continue @@ -302,9 +303,11 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive log.Println(err) } log.Printf("Starting on [%s]\n", offer.GetHostname()) - toSchedule := []*mesos.TaskInfo{s.newTask(offer, task)} + taskToSchedule := s.newTask(offer, task) + toSchedule := []*mesos.TaskInfo{taskToSchedule} driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) log.Printf("Inst: %d", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. 
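The retrofit repeated across these schedulers captures the *mesos.TaskInfo returned by newTask so that, once the task is launched, a "<hostname>:<taskID>" line can be written to the per-run schedTrace logger created in the constructor. A minimal, self-contained sketch of just that logging pattern (the prefix, hostname, and task ID below are placeholders for illustration, not values taken from the patch):

    package main

    import (
        "log"
        "os"
    )

    func main() {
        // Mirrors the constructor change: create "./<prefix>_schedTrace.log" and fail fast on error.
        logFile, err := os.Create("./" + "example" + "_schedTrace.log")
        if err != nil {
            log.Fatal(err)
        }
        schedTrace := log.New(logFile, "", log.LstdFlags)

        // Mirrors the ResourceOffers change: after a task is launched, record where it landed.
        hostname := "some-agent"        // placeholder for offer.GetHostname()
        taskID := "electron-someTask-1" // placeholder for taskToSchedule.GetTaskId().GetValue()
        schedTrace.Print(hostname + ":" + taskID)
    }
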
@@ -384,27 +387,3 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *ProactiveClusterwideCapFCFS) FrameworkMessage(driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *ProactiveClusterwideCapFCFS) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} - -func (s *ProactiveClusterwideCapFCFS) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} - -func (s *ProactiveClusterwideCapFCFS) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *ProactiveClusterwideCapFCFS) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index 42f5c78..4e9aa82 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -26,6 +26,7 @@ import ( "strings" "sync" "time" + "os" ) // Decides if to taken an offer or not @@ -40,6 +41,7 @@ func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.T // electronScheduler implements the Scheduler interface type ProactiveClusterwideCapRanked struct { + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -69,10 +71,18 @@ type ProactiveClusterwideCapRanked struct { // Controls when to shutdown pcp logging. PCPLog chan struct{} + + schedTrace *log.Logger } // New electron scheduler. -func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *ProactiveClusterwideCapRanked { +func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *ProactiveClusterwideCapRanked { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + s := &ProactiveClusterwideCapRanked{ tasks: tasks, ignoreWatts: ignoreWatts, @@ -89,6 +99,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool) *Proac recapTicker: time.NewTicker(20 * time.Second), isCapping: false, isRecapping: false, + schedTrace: log.New(logFile, "", log.LstdFlags), } return s } @@ -151,17 +162,6 @@ func (s *ProactiveClusterwideCapRanked) newTask(offer *mesos.Offer, task def.Tas } } -func (s *ProactiveClusterwideCapRanked) Registered( - _ sched.SchedulerDriver, - frameworkID *mesos.FrameworkID, - masterInfo *mesos.MasterInfo) { - log.Printf("Framework %s registered with master %s", frameworkID, masterInfo) -} - -func (s *ProactiveClusterwideCapRanked) Reregistered(_ sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { - log.Printf("Framework re-registered with master %s", masterInfo) -} - func (s *ProactiveClusterwideCapRanked) Disconnected(sched.SchedulerDriver) { // Need to stop the capping process. 
s.ticker.Stop() @@ -299,7 +299,8 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri */ taken := false - for i, task := range s.tasks { + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] // Don't take offer if it doesn't match our task's host requirement. if !strings.HasPrefix(*offer.Hostname, task.Host) { continue @@ -325,9 +326,11 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri log.Println("Failed to determine the new cluster wide cap: ", err) } log.Printf("Starting on [%s]\n", offer.GetHostname()) - to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)} + taskToSchedule := s.newTask(offer, task) + to_schedule := []*mesos.TaskInfo{taskToSchedule} driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) log.Printf("Inst: %d", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- if *task.Instances <= 0 { // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. @@ -408,27 +411,3 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } -func (s *ProactiveClusterwideCapRanked) FrameworkMessage(driver sched.SchedulerDriver, - executorID *mesos.ExecutorID, - slaveID *mesos.SlaveID, - message string) { - - log.Println("Getting a framework message: ", message) - log.Printf("Received a framework message from some unknown source: %s", *executorID.Value) -} - -func (s *ProactiveClusterwideCapRanked) OfferRescinded(_ sched.SchedulerDriver, offerID *mesos.OfferID) { - log.Printf("Offer %s rescinded", offerID) -} - -func (s *ProactiveClusterwideCapRanked) SlaveLost(_ sched.SchedulerDriver, slaveID *mesos.SlaveID) { - log.Printf("Slave %s lost", slaveID) -} - -func (s *ProactiveClusterwideCapRanked) ExecutorLost(_ sched.SchedulerDriver, executorID *mesos.ExecutorID, slaveID *mesos.SlaveID, status int) { - log.Printf("Executor %s on slave %s was lost", executorID, slaveID) -} - -func (s *ProactiveClusterwideCapRanked) Error(_ sched.SchedulerDriver, err string) { - log.Printf("Receiving an error: %s", err) -} From 15c4f04d75eddc30af899e1b15aca862972d7282 Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 19:57:01 -0500 Subject: [PATCH 06/11] changed the scheduler to ffswClassMapWatts --- scheduler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scheduler.go b/scheduler.go index 4a45608..25b796f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,7 +58,7 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewBPSWClassMapWatts(tasks, *ignoreWatts, logPrefix) + scheduler := schedulers.NewFirstFitSortedWattsClassMapWatts(tasks, *ignoreWatts, logPrefix) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -72,8 +72,8 @@ func main() { return } - //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) - go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) + go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) + //go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing // 
Attempt to handle signint to not leave pmdumptext running From 62f199773a86324f7e8bf8f9d54ef509e359845c Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 20:04:37 -0500 Subject: [PATCH 07/11] formatted the files --- schedulers/binpacksortedwatts.go | 2 +- schedulers/bpMaxMin.go | 4 +--- schedulers/bpMaxMinProacCC.go | 2 +- schedulers/bpswClassMapWattsProacCC.go | 8 ++++---- schedulers/firstfit.go | 2 +- schedulers/firstfitsortedwatts.go | 3 +-- schedulers/firstfitwattsonly.go | 2 +- schedulers/proactiveclusterwidecappingfcfs.go | 5 ++--- schedulers/proactiveclusterwidecappingranked.go | 5 ++--- 9 files changed, 14 insertions(+), 19 deletions(-) diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index f8c43ef..fdcc82a 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -8,10 +8,10 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "os" "sort" "strings" "time" - "os" ) // Decides if to take an offer or not diff --git a/schedulers/bpMaxMin.go b/schedulers/bpMaxMin.go index f5f0c5c..9221476 100644 --- a/schedulers/bpMaxMin.go +++ b/schedulers/bpMaxMin.go @@ -124,7 +124,6 @@ func (s *BPMaxMinWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskIn } } - // Determine if the remaining space inside of the offer is enough for this // the task we need to create. If it is, create a TaskInfo and return it. func (s *BPMaxMinWatts) CheckFit(i int, @@ -194,7 +193,7 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m // Attempt to schedule a single instance of the heaviest workload available first // Start from the back until one fits - for i:= len(s.tasks)-1; i >= 0; i-- { + for i := len(s.tasks) - 1; i >= 0; i-- { task := s.tasks[i] // Check host if it exists @@ -239,7 +238,6 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m } } - if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) diff --git a/schedulers/bpMaxMinProacCC.go b/schedulers/bpMaxMinProacCC.go index bda367d..39e96fc 100644 --- a/schedulers/bpMaxMinProacCC.go +++ b/schedulers/bpMaxMinProacCC.go @@ -155,7 +155,7 @@ func (s *BPMaxMinProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.Task } // go routine to cap the entire cluster in regular intervals of time. -var bpMaxMinProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var bpMaxMinProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. var bpMaxMinProacCCNewCapValue = 0.0 // newly computed cap value func (s *BPMaxMinProacCC) startCapping() { go func() { diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index ddcadc1..19eb393 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -165,7 +165,7 @@ func (s *BPSWClassMapWattsProacCC) Disconnected(sched.SchedulerDriver) { } // go routine to cap the entire cluster in regular intervals of time. -var bpswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. +var bpswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. 
var bpswClassMapWattsProacCCNewCapValue = 0.0 // newly computed cap value func (s *BPSWClassMapWattsProacCC) startCapping() { go func() { @@ -208,11 +208,11 @@ func (s *BPSWClassMapWattsProacCC) startRecapping() { if s.isRecapping && bpswClassMapWattsProacCCRecapValue > 0.0 { for _, host := range constants.Hosts { // Rounding capValue to the nearest int - if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsProacCCRecapValue +0.5))); err != nil { + if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5))); err != nil { log.Println(err) } } - log.Printf("Recapping the cluster to %d", int(math.Floor(bpswClassMapWattsProacCCRecapValue +0.5))) + log.Printf("Recapping the cluster to %d", int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5))) } // Setting recapping to false s.isRecapping = false @@ -383,7 +383,7 @@ func (s *BPSWClassMapWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, st tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) if err == nil { // If new determined cap value is different from the current recap value, then we need to recap - if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpswClassMapWattsProacCCRecapValue +0.5)) { + if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5)) { bpswClassMapWattsProacCCRecapValue = tempCap bpswClassMapWattsProacCCMutex.Lock() s.isRecapping = true diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index d7819de..4eaecdd 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -8,9 +8,9 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "os" "strings" "time" - "os" ) // Decides if to take an offer or not diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index fbd740c..940ef90 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -8,10 +8,10 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "os" "sort" "strings" "time" - "os" ) // Decides if to take an offer or not @@ -216,4 +216,3 @@ func (s *FirstFitSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index 798a5f7..c23727f 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -8,9 +8,9 @@ import ( "github.com/mesos/mesos-go/mesosutil" sched "github.com/mesos/mesos-go/scheduler" "log" + "os" "strings" "time" - "os" ) // Decides if to take an offer or not diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index 8456c7c..d89390b 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -12,10 +12,10 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" "strings" "sync" "time" - "os" ) // Decides if to take an offer or not @@ -30,7 +30,7 @@ func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Tas // electronScheduler implements the Scheduler interface. 
type ProactiveClusterwideCapFCFS struct { - base // Type embedded to inherit common functions + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -386,4 +386,3 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver, } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index 4e9aa82..f4c3484 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -22,11 +22,11 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "math" + "os" "sort" "strings" "sync" "time" - "os" ) // Decides if to taken an offer or not @@ -41,7 +41,7 @@ func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.T // electronScheduler implements the Scheduler interface type ProactiveClusterwideCapRanked struct { - base // Type embedded to inherit common functions + base // Type embedded to inherit common functions tasksCreated int tasksRunning int tasks []def.Task @@ -410,4 +410,3 @@ func (s *ProactiveClusterwideCapRanked) StatusUpdate(driver sched.SchedulerDrive } log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) } - From 7d3403d250457aa6e4c73d5705a4a6f79a4af31a Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sat, 14 Jan 2017 20:09:49 -0500 Subject: [PATCH 08/11] removed TODO for retrofitting schedulers to log scheduling trace. --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 5fd1f54..f50fbc7 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,6 @@ To Do: * Write test code for each scheduler (This should be after the design change) * Some of the constants in constants/constants.go can vary based on the environment. Possible to setup the constants at runtime based on the environment? - * Retrofit schedulers for scheduling tracing **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the From 8c6ad36b5e38c0567105e4dc15f87766b8ac904c Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sun, 15 Jan 2017 15:23:57 -0500 Subject: [PATCH 09/11] changed name of StartLogAndDynamicCap(...) to StartPCPLogAndExtremaDynamicCap(...).] 
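
The new name makes it explicit that the routine both records PCP metrics and performs the extrema-based dynamic cluster cap governed by its hiThreshold and loThreshold parameters. scheduler.go is updated to call the renamed routine (the same diff also switches the scheduler in use to NewFirstFitSortedWattsReducedWAR). For reference, the call site after this patch, with names exactly as they appear in the diff below:

    go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP,
        logPrefix, *hiThreshold, *loThreshold)
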
--- pcp/loganddynamiccap.go | 2 +- scheduler.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pcp/loganddynamiccap.go b/pcp/loganddynamiccap.go index a9a3f1f..c6ae8f5 100644 --- a/pcp/loganddynamiccap.go +++ b/pcp/loganddynamiccap.go @@ -57,7 +57,7 @@ func meanCluster(history *ring.Ring) float64 { return (total / count) } -func StartLogAndDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { +func StartPCPLogAndExtremaDynamicCap(quit chan struct{}, logging *bool, prefix string, hiThreshold, loThreshold float64) { const pcpCommand string = "pmdumptext -m -l -f '' -t 1.0 -d , -c config" cmd := exec.Command("sh", "-c", pcpCommand) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} diff --git a/scheduler.go b/scheduler.go index 25b796f..0041939 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,7 +58,7 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewFirstFitSortedWattsClassMapWatts(tasks, *ignoreWatts, logPrefix) + scheduler := schedulers.NewFirstFitSortedWattsReducedWAR(tasks, *ignoreWatts, logPrefix) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -72,8 +72,8 @@ func main() { return } - go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) - //go pcp.StartLogAndDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) + //go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix) + go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold) time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing // Attempt to handle signint to not leave pmdumptext running From a28acfcf5078411249309c1a5d8343bdaeff559f Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sun, 15 Jan 2017 19:48:41 -0500 Subject: [PATCH 10/11] added TODO for to fix log for declining offer, where we need to mention the correct reason for declining the offer --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index f50fbc7..c5700bc 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ To Do: * Write test code for each scheduler (This should be after the design change) * Some of the constants in constants/constants.go can vary based on the environment. Possible to setup the constants at runtime based on the environment? + * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no + longer any tasks to schedule. **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the From fa17ae82e1cd1b5b7b50d94c8ef1bed95597fd2f Mon Sep 17 00:00:00 2001 From: Pradyumna Kaushik Date: Sun, 15 Jan 2017 21:19:01 -0500 Subject: [PATCH 11/11] Added TODO for a centralised logFile that can filtered by an identifier. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c5700bc..95edb6b 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ To Do: Possible to setup the constants at runtime based on the environment? * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no longer any tasks to schedule. + * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file. 
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the