diff --git a/README.md b/README.md index 95edb6b..769efaa 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,14 @@ To Do: * Add ability to use constraints * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average * Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?) + * TODO: Add the type of scheduler to be used, picked from a config file, along with its configurable parameters. * Write test code for each scheduler (This should be after the design change) * Some of the constants in constants/constants.go can vary based on the environment. Possible to set up the constants at runtime based on the environment? * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no longer any tasks to schedule. * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file. + * Make ClassMapWatts a command-line argument so Electron can be run with ClassMapWatts enabled or disabled. **Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the diff --git a/constants/constants.go b/constants/constants.go index bcd051b..01cfc8a 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -6,6 +6,8 @@ Constants that are used across scripts 5. window_size = number of tasks to consider for computation of the dynamic cap. Also, exposing functions to update or initialize some of the constants. + +TODO: Clean this up and use Mesos Attributes instead. */ package constants @@ -14,6 +16,24 @@ var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton "stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu", "stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"} +// Classification of the nodes in the cluster based on their power consumption. +var PowerClasses = map[string]map[string]bool{ + "ClassA": map[string]bool{ + "stratos-005.cs.binghamton.edu": true, + "stratos-006.cs.binghamton.edu": true, + }, + "ClassB": map[string]bool{ + "stratos-007.cs.binghamton.edu": true, + "stratos-008.cs.binghamton.edu": true, + }, + "ClassC": map[string]bool{ + "stratos-001.cs.binghamton.edu": true, + "stratos-002.cs.binghamton.edu": true, + "stratos-003.cs.binghamton.edu": true, + "stratos-004.cs.binghamton.edu": true, + }, +} + // Add a new host to the slice of hosts. func AddNewHost(newHost string) bool { // Validation @@ -68,7 +88,7 @@ func UpdateCapMargin(newCapMargin float64) bool { var StarvationFactor = PowerThreshold / CapMargin // Window size for running average -var WindowSize = 20 +var ConsiderationWindowSize = 20 // Update the window size. func UpdateWindowSize(newWindowSize int) bool { @@ -76,7 +96,7 @@ func UpdateWindowSize(newWindowSize int) bool { if newWindowSize == 0 { return false } else { - WindowSize = newWindowSize + ConsiderationWindowSize = newWindowSize return true } } diff --git a/pcp/proactiveclusterwidecappers.go b/powerCapping/proactiveclusterwidecappers.go similarity index 97% rename from pcp/proactiveclusterwidecappers.go rename to powerCapping/proactiveclusterwidecappers.go index ae90b61..b3cc84a 100644 --- a/pcp/proactiveclusterwidecappers.go +++ b/powerCapping/proactiveclusterwidecappers.go @@ -1,9 +1,9 @@ /* Cluster wide dynamic capping -This is not a scheduler but a scheduling scheme that schedulers can use. +This is a capping strategy that schedulers can use to reduce power consumption.
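The To Do list above calls for replacing the running-average calculation with an exponential moving average, and this change renames the averaging window to ConsiderationWindowSize. Below is a minimal, standalone sketch of what an EMA-based running average could look like, assuming the usual smoothing factor alpha = 2/(N+1); the EMA type and its NewEMA/Add helpers are purely illustrative and are not part of Electron's runAvg package.

```go
package runavg

// EMA keeps an exponential moving average of observed values (e.g. task watts).
// Illustrative only; not the runAvg package Electron actually uses.
type EMA struct {
	alpha   float64 // smoothing factor
	value   float64 // current average
	started bool    // whether we have seen a sample yet
}

// NewEMA derives the smoothing factor from a window size, mirroring the
// ConsiderationWindowSize constant used by the simple running average.
func NewEMA(windowSize int) *EMA {
	return &EMA{alpha: 2.0 / (float64(windowSize) + 1.0)}
}

// Add folds a new sample into the average and returns the updated value.
func (e *EMA) Add(sample float64) float64 {
	if !e.started {
		e.value = sample
		e.started = true
		return e.value
	}
	e.value = e.alpha*sample + (1.0-e.alpha)*e.value
	return e.value
}
```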
*/ -package pcp +package powerCapping import ( "bitbucket.org/sunybingcloud/electron/constants" @@ -251,7 +251,7 @@ func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64, return 100, errors.New("Invalid argument: totalPower") } else { // Need to calculate the running average - runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize) + runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.ConsiderationWindowSize) // For each node, calculate the percentage of the running average to the total power. ratios := make(map[string]float64) for host, tpower := range totalPower { @@ -271,5 +271,5 @@ func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64, // Stringer for an instance of clusterwideCapper func (capper ClusterwideCapper) String() string { - return "Cluster Capper -- Proactively cap the entire cluster." + return "Cluster-wide Capper -- Proactively cap the entire cluster." } diff --git a/scheduler.go b/scheduler.go index 0041939..a6b11de 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,7 +58,7 @@ func main() { startTime := time.Now().Format("20060102150405") logPrefix := *pcplogPrefix + "_" + startTime - scheduler := schedulers.NewFirstFitSortedWattsReducedWAR(tasks, *ignoreWatts, logPrefix) + scheduler := schedulers.NewBinPackSortedWatts(tasks, *ignoreWatts, logPrefix) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ @@ -96,7 +96,7 @@ func main() { // Signals we have scheduled every task we have select { case <-scheduler.Shutdown: - // case <-time.After(shutdownTimeout): + //case <-time.After(shutdownTimeout): } // All tasks have finished @@ -104,7 +104,7 @@ func main() { case <-scheduler.Done: close(scheduler.PCPLog) time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds - // case <-time.After(shutdownTimeout): + //case <-time.After(shutdownTimeout): } // Done shutting down diff --git a/schedulers/README.md b/schedulers/README.md index 8fb8dcf..275798b 100644 --- a/schedulers/README.md +++ b/schedulers/README.md @@ -8,12 +8,20 @@ To Do: * Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler. * Make newTask(...) variadic where the newTaskClass argument can either be given or not. If not give, then pick task.Watts as the watts attribute, else pick task.ClassToWatts[newTaskClass]. * Retrofit pcp/proactiveclusterwidecappers.go to include the power capping go routines and to cap only when necessary. + * Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that. + * Retrofit schedulers to be able to run either using ClassMapWatts enabled or disabled. Scheduling Algorithms: * First Fit * First Fit with sorted watts * Bin-packing with sorted watts - * FCFS Proactive Cluster-wide Capping - * Ranked Proactive Cluster-wide Capping - * Piston Capping -- Works when scheduler is run with WAR + * ClassMapWatts -- Bin-packing and First Fit that now use Watts per power class. + * Top Heavy -- Hybrid scheduler that packs small tasks (less power intensive) using Bin-packing and spreads large tasks (power intensive) using First Fit. + * Bottom Heavy -- Hybrid scheduler that packs large tasks (power intensive) using Bin-packing and spreads small tasks (less power intensive) using First Fit. 
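The schedulers listed above differ mainly in how many tasks they try to place on a single offer: First Fit launches at most one task per offer, while Bin-packing keeps filling the same offer until its resources are exhausted, which is what raises co-scheduling on a node. The sketch below contrasts the two placement loops using simplified Offer and Task stand-ins; it is illustrative only and is not the code these schedulers use.

```go
package sketch

// Offer and Task are simplified stand-ins for the Mesos offer and Electron
// task types; the real schedulers aggregate offers via offerUtils.OfferAgg.
type Offer struct{ CPU, RAM, Watts float64 }
type Task struct{ CPU, RAM, Watts float64 }

func fits(o Offer, usedCPU, usedRAM, usedWatts float64, t Task) bool {
	return o.CPU >= usedCPU+t.CPU &&
		o.RAM >= usedRAM+t.RAM &&
		o.Watts >= usedWatts+t.Watts
}

// First Fit: launch the first task that fits this offer, then move on.
func firstFit(offer Offer, tasks []Task) []Task {
	for _, t := range tasks {
		if fits(offer, 0, 0, 0, t) {
			return []Task{t}
		}
	}
	return nil
}

// Bin-packing: keep adding tasks to the same offer until nothing else fits.
func binPack(offer Offer, tasks []Task) []Task {
	var placed []Task
	var usedCPU, usedRAM, usedWatts float64
	for _, t := range tasks {
		if fits(offer, usedCPU, usedRAM, usedWatts, t) {
			placed = append(placed, t)
			usedCPU += t.CPU
			usedRAM += t.RAM
			usedWatts += t.Watts
		}
	}
	return placed
}
```

The Top Heavy and Bottom Heavy hybrids above simply apply one loop or the other depending on a task's size and the agent's power class.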
+ + Capping Strategies + + * Extrema Dynamic Capping + * Proactive Cluster-wide Capping + * Piston Capping diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go new file mode 100644 index 0000000..7f73bb3 --- /dev/null +++ b/schedulers/binPackSortedWattsSortedOffers.go @@ -0,0 +1,232 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "os" + "sort" + "time" +) + +// Decides if to take an offer or not +func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, + totalWatts float64, task def.Task) bool { + + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + // Does the task fit + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + return true + } + return false +} + +type BinPackSortedWattsSortedOffers struct { + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. 
+ Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewBinPackSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackSortedWattsSortedOffers { + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &BinPackSortedWattsSortedOffers{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *BinPackSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // Sorting the offers + sort.Sort(offerUtils.OffersSorter(offers)) + + // Printing the sorted offers and the corresponding CPU resource availability + log.Println("Sorted Offers:") + for i := 0; i < len(offers); i++ { + offer := offers[i] + offerCPU, _, _ := offerUtils.OfferAgg(offer) + log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) + } + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + offerTaken := false + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] + + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + for *task.Instances > 0 { + // Does the task fit + if s.takeOffer(offer, totalCPU, totalRAM, 
totalWatts, task) { + + offerTaken = true + totalWatts += task.Watts + totalCPU += task.CPU + totalRAM += task.RAM + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) + + fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + } else { + break // Continue on to next offer + } + } + } + + if offerTaken { + log.Printf("Starting on [%s]\n", offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + } else { + + // If there was no match for the task + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } +} + +func (s *BinPackSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go index 2ed96f4..2fce0b9 100644 --- a/schedulers/binpackedpistoncapping.go +++ b/schedulers/binpackedpistoncapping.go @@ -4,6 +4,8 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -13,7 +15,6 @@ import ( "log" "math" "os" - "strings" "sync" "time" ) @@ -217,7 +218,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off // retrieving the total power for each host in the offers for _, offer := range offers { if _, ok := s.totalPower[*offer.Hostname]; !ok { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offer_watts } } @@ -238,7 +239,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -246,8 +247,8 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off } fitTasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - taken := false + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + offerTaken := false totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -256,13 +257,8 @@ func (s *BinPackedPistonCapper) 
ResourceOffers(driver sched.SchedulerDriver, off partialLoad := 0.0 for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doens't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) {continue} for *task.Instances > 0 { // Does the task fit @@ -274,7 +270,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off s.startCapping() } - taken = true + offerTaken = true totalWatts += task.Watts totalCPU += task.CPU totalRAM += task.RAM @@ -303,20 +299,20 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off } } - if taken { + if offerTaken { // Updating the cap value for offer.Hostname bpPistonMutex.Lock() bpPistonCapValues[*offer.Hostname] += partialLoad bpPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, mesosUtils.DefaultFilter) } else { // If there was no match for task log.Println("There is not enough resources to launch task: ") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/binpacksortedwatts.go b/schedulers/binpacksortedwatts.go index fdcc82a..87ee69b 100644 --- a/schedulers/binpacksortedwatts.go +++ b/schedulers/binpacksortedwatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -10,21 +12,19 @@ import ( "log" "os" "sort" - "strings" "time" ) // Decides if to take an offer or not -func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := OfferAgg(offer) +func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { return true } - return false } @@ -130,7 +130,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -139,30 +139,23 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers tasks := []*mesos.TaskInfo{} - offer_cpu, offer_ram, offer_watts := OfferAgg(offer) - - taken := false + offerTaken := false totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // 
Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { // Does the task fit - if (s.ignoreWatts || offer_watts >= (totalWatts+task.Watts)) && - (offer_cpu >= (totalCPU + task.CPU)) && - (offer_ram >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, task) { - taken = true + offerTaken = true totalWatts += task.Watts totalCPU += task.CPU totalRAM += task.RAM @@ -190,17 +183,17 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers } } - if taken { + if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go new file mode 100644 index 0000000..2379725 --- /dev/null +++ b/schedulers/bottomHeavy.go @@ -0,0 +1,340 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "os" + "sort" + "time" ) + +/* +Tasks are categorized into small and large tasks based on their watts requirement. +All the large tasks are packed into offers from agents belonging to power class A and power class B, using BinPacking. +All the small tasks are spread among the offers from agents belonging to power class C, using FirstFit. + +BinPacking has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence, + co-scheduling them has a great impact on the total power utilization.
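As described above and implemented in NewBottomHeavy further down, the watts-sorted task list is split around the median into a lighter half and a heavier half. A minimal sketch of that classification step is shown below, assuming the existing def.Task and def.WattsSorter types; splitByMedianWatts is a hypothetical helper and does not reproduce the constructor's exact index arithmetic.

```go
package schedulers

import (
	"sort"

	"bitbucket.org/sunybingcloud/electron/def"
)

// Illustrative sketch of the small/large classification used by BottomHeavy:
// sort by watts and cut the list at the median.
func splitByMedianWatts(tasks []def.Task) (smallTasks, largeTasks []def.Task) {
	sort.Sort(def.WattsSorter(tasks))
	mid := len(tasks) / 2
	// tasks[:mid] is the lighter half, tasks[mid:] the heavier half.
	return tasks[:mid], tasks[mid:]
}
```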
+*/ + +func (s *BottomHeavy) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, + wattsToConsider float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsToConsider))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + return true + } + return false + +} + +// electronScheduler implements the Scheduler interface +type BottomHeavy struct { + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + smallTasks, largeTasks []def.Task + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewBottomHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BottomHeavy { + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + // Separating small tasks from large tasks. + // Classification done based on MMPU watts requirements. + mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + s := &BottomHeavy{ + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *BottomHeavy) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +// Shut down 
scheduler if no more tasks to schedule +func (s *BottomHeavy) shutDownIfNecessary() { + if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } +} + +// create TaskInfo and log scheduling trace +func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, + powerClass string, task def.Task) *mesos.TaskInfo { + log.Println("Co-Located with:") + coLocated(s.running[offer.GetSlaveId().GoString()]) + taskToSchedule := s.newTask(offer, task, powerClass) + + fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + *task.Instances-- + return taskToSchedule +} + +// Using BinPacking to pack the large tasks into these offers. +func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + offerTaken := false + for i := 0; i < len(s.largeTasks); i++ { + task := s.largeTasks[i] + + for *task.Instances > 0 { + powerClass := offerUtils.PowerClass(offer) + // Does the task fit + // OR lazy evaluation. If ignore watts is set to true, second statement won't + // be evaluated. + wattsToConsider := task.Watts + if !s.ignoreWatts { + wattsToConsider = task.ClassToWatts[powerClass] + } + if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, wattsToConsider, task) { + offerTaken = true + totalWatts += wattsToConsider + totalCPU += task.CPU + totalRAM += task.RAM + tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...) + s.shutDownIfNecessary() + } + } else { + break // Continue on to next task + } + } + } + + if offerTaken { + log.Printf("Starting on [%s]\n", offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + } else { + // If there was no match for the task + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } +} + +// Using FirstFit to spread the small tasks among these offers.
+func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + taken := false + for i := 0; i < len(s.smallTasks); i++ { + task := s.smallTasks[i] + powerClass := offerUtils.PowerClass(offer) + + // Decision to take the offer or not + wattsToConsider := task.Watts + if !s.ignoreWatts { + wattsToConsider = task.ClassToWatts[powerClass] + } + if (s.ignoreWatts || (offerWatts >= wattsToConsider)) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + taken = true + tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...) + s.shutDownIfNecessary() + } + break // Offer taken, move on + } + } + + if !taken { + // If there was no match for the task + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } +} + +func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // We need to separate the offers into + // offers from ClassA and ClassB and offers from ClassC. + // Nodes in ClassA and ClassB will be packed with the large tasks. + // Small tasks will be spread out among the nodes in ClassC. 
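The routing described above uses the hostname-keyed constants.PowerClasses map, while the per-task watts lookups throughout this diff go through offerUtils.PowerClass(offer). That helper's implementation is not part of this diff; the sketch below reconstructs its presumed behavior from the inline attribute loops removed from the ClassMapWatts schedulers further down, which read a text attribute named "class" from the offer.

```go
package offerUtils

import mesos "github.com/mesos/mesos-go/mesosproto"

// Presumed shape of offerUtils.PowerClass, reconstructed from the inline
// attribute loops this diff removes: the agent advertises its power class
// as a Mesos text attribute named "class".
func PowerClass(offer *mesos.Offer) string {
	powerClass := ""
	for _, attr := range offer.GetAttributes() {
		if attr.GetName() == "class" {
			powerClass = attr.GetText().GetValue()
		}
	}
	return powerClass
}
```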
+ offersClassAB := []*mesos.Offer{} + offersClassC := []*mesos.Offer{} + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + if constants.PowerClasses["ClassA"][*offer.Hostname] || + constants.PowerClasses["ClassB"][*offer.Hostname] { + offersClassAB = append(offersClassAB, offer) + } else if constants.PowerClasses["ClassC"][*offer.Hostname] { + offersClassC = append(offersClassC, offer) + } + } + + log.Println("Packing Large tasks into ClassAB offers:") + for _, o := range offersClassAB { + log.Println(*o.Hostname) + } + // Packing tasks into offersClassAB + s.pack(offersClassAB, driver) + + log.Println("Spreading Small tasks among ClassC offers:") + for _, o := range offersClassC { + log.Println(*o.Hostname) + } + // Spreading tasks among offersClassC + s.spread(offersClassC, driver) +} + +func (s *BottomHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} diff --git a/schedulers/bpMaxMin.go b/schedulers/bpMaxMin.go index 9221476..6daa6a6 100644 --- a/schedulers/bpMaxMin.go +++ b/schedulers/bpMaxMin.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -10,21 +12,19 @@ import ( "log" "os" "sort" - "strings" "time" ) // Decides if to take an offer or not -func (*BPMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := OfferAgg(offer) +func (s *BPMaxMinWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { return true } - return false } @@ -133,12 +133,8 @@ func (s *BPMaxMinWatts) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM + task.RAM)) { + if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) { *totalWatts += task.Watts *totalCPU += task.CPU @@ -175,7 +171,7 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) 
+ driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -196,12 +192,9 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m for i := len(s.tasks) - 1; i >= 0; i-- { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -217,12 +210,9 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m // Pack the rest of the offer with the smallest tasks for i, task := range s.tasks { - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { @@ -240,15 +230,15 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpMaxMinPistonCapping.go b/schedulers/bpMaxMinPistonCapping.go index edc27d8..9562751 100644 --- a/schedulers/bpMaxMinPistonCapping.go +++ b/schedulers/bpMaxMinPistonCapping.go @@ -4,6 +4,8 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -14,22 +16,21 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) // Decides if to take an offer or not -func (s *BPMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := OfferAgg(offer) +func (s *BPMaxMinPistonCapping) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + // Does the task fit + if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && + (offerCPU >= (*totalCPU + task.CPU)) && + (offerRAM >= (*totalRAM + task.RAM)) { return true } - return false } @@ -222,12 +223,8 @@ func (s *BPMaxMinPistonCapping) CheckFit(i int, totalWatts *float64, partialLoad *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM 
+ task.RAM)) { + if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) { // Start piston capping if haven't started yet if !s.isCapping { @@ -271,7 +268,7 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -295,12 +292,9 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off for i := len(s.tasks) - 1; i >= 0; i-- { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -316,12 +310,9 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off // Pack the rest of the offer with the smallest tasks for i, task := range s.tasks { - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { @@ -343,15 +334,15 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off bpMaxMinPistonCappingCapValues[*offer.Hostname] += partialLoad bpMaxMinPistonCappingMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpMaxMinProacCC.go b/schedulers/bpMaxMinProacCC.go index 39e96fc..96c27ee 100644 --- a/schedulers/bpMaxMinProacCC.go +++ b/schedulers/bpMaxMinProacCC.go @@ -3,8 +3,10 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/pcp" + powCap "bitbucket.org/sunybingcloud/electron/powerCapping" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -14,21 +16,21 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) // Decides if to take an offer or not -func (s *BPMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) +func (s *BPMaxMinProacCC) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus 
>= task.CPU && mem >= task.RAM && watts >= task.Watts { + // Does the task fit + if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && + (offerCPU >= (*totalCPU + task.CPU)) && + (offerRAM >= (*totalRAM + task.RAM)) { return true } - return false } @@ -43,7 +45,7 @@ type BPMaxMinProacCC struct { availablePower map[string]float64 totalPower map[string]float64 ignoreWatts bool - capper *pcp.ClusterwideCapper + capper *powCap.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster-wide capping. @@ -86,7 +88,7 @@ func NewBPMaxMinProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix str availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: pcp.GetClusterwideCapperInstance(), + capper: powCap.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -246,12 +248,8 @@ func (s *BPMaxMinProacCC) CheckFit(i int, totalRAM *float64, totalWatts *float64) (bool, *mesos.TaskInfo) { - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - // Does the task fit - if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) && - (offerCPU >= (*totalCPU + task.CPU)) && - (offerRAM >= (*totalRAM + task.RAM)) { + if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) { // Capping the cluster if haven't yet started if !s.isCapping { @@ -308,7 +306,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -324,7 +322,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -345,12 +343,9 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] for i := len(s.tasks) - 1; i >= 0; i-- { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // TODO: Fix this so index doesn't need to be passed @@ -366,12 +361,9 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] // Pack the rest of the offer with the smallest tasks for i, task := range s.tasks { - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { @@ -389,15 +381,15 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers [] if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - 
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWatts.go b/schedulers/bpswClassMapWatts.go index 1196459..b6c3bc6 100644 --- a/schedulers/bpswClassMapWatts.go +++ b/schedulers/bpswClassMapWatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -10,21 +12,20 @@ import ( "log" "os" "sort" - "strings" "time" ) // Decides if to take an offer or not -func (*BPSWClassMapWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := OfferAgg(offer) +func (s *BPSWClassMapWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, + totalWatts float64, powerClass string, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { return true } - return false } @@ -76,7 +77,7 @@ func NewBPSWClassMapWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix s return s } -func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -100,7 +101,7 @@ func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskCl } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -130,7 +131,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -139,45 +140,33 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - - taken := false + offerTaken := false totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue 
} for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. - if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) { - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) - taken = true - totalWatts += task.ClassToWatts[nodeClass] + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) + offerTaken = true + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) @@ -199,17 +188,17 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers } } - if taken { + if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWattsPistonCapping.go b/schedulers/bpswClassMapWattsPistonCapping.go index cae8cc3..412ace6 100644 --- a/schedulers/bpswClassMapWattsPistonCapping.go +++ b/schedulers/bpswClassMapWattsPistonCapping.go @@ -4,6 +4,8 @@ import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "errors" "fmt" "github.com/golang/protobuf/proto" @@ -14,21 +16,21 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) -// Decides if to take offer or not -func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) +// Decides if to take an offer or not +func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, + totalWatts float64, powerClass string, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { return true } - return false } @@ -89,7 +91,7 @@ func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool, sched return s } -func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWattsPistonCapping) newTask(offer 
*mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -123,7 +125,7 @@ func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Ta } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -215,7 +217,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr // retrieving the total power for each host in the offers. for _, offer := range offers { if _, ok := s.totalPower[*offer.Hostname]; !ok { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.totalPower[*offer.Hostname] = offerWatts } } @@ -229,7 +231,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -238,9 +240,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - - taken := false + offerTaken := false totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 @@ -249,27 +249,17 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr partialLoad := 0.0 for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR lazy evaluation. 
If ignoreWatts is set to true, second statement won't // be evaluated - if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) { // Start piston capping if haven't started yet if !s.isCapping { @@ -277,14 +267,14 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) - taken = true - totalWatts += task.ClassToWatts[nodeClass] + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) + offerTaken = true + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) @@ -306,20 +296,20 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr } } - if taken { + if offerTaken { // Updating the cap value for offer.Hostname bpswClassMapWattsPistonMutex.Lock() bpswClassMapWattsPistonCapValues[*offer.Hostname] += partialLoad bpswClassMapWattsPistonMutex.Unlock() log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for task log.Println("There is not enough resources to launch task: ") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/bpswClassMapWattsProacCC.go b/schedulers/bpswClassMapWattsProacCC.go index 19eb393..3d9f14d 100644 --- a/schedulers/bpswClassMapWattsProacCC.go +++ b/schedulers/bpswClassMapWattsProacCC.go @@ -3,8 +3,10 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/pcp" + powCap "bitbucket.org/sunybingcloud/electron/powerCapping" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -14,21 +16,21 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) // Decides if to take an offer or not -func (*BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) +func (s *BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, + totalWatts float64, powerClass string, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - // TODO: Insert watts calculation here instead of taking them as parameter - - if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts { + //TODO: Insert watts calculation here instead of taking them as a parameter + if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { return true } - return false } @@ -43,7 +45,7 @@ type 
BPSWClassMapWattsProacCC struct { availablePower map[string]float64 totalPower map[string]float64 ignoreWatts bool - capper *pcp.ClusterwideCapper + capper *powCap.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster-wide capping. @@ -86,7 +88,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: pcp.GetClusterwideCapperInstance(), + capper: powCap.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -99,7 +101,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP // mutex var bpswClassMapWattsProacCCMutex sync.Mutex -func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -131,7 +133,7 @@ func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, ne } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -251,7 +253,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -267,7 +269,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -276,35 +278,23 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, tasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - - taken := false + offerTaken := false totalWatts := 0.0 totalCPU := 0.0 totalRAM := 0.0 for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer it it doesn't match our task's host requirement. - if strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } for *task.Instances > 0 { - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + powerClass := offerUtils.PowerClass(offer) // Does the task fit // OR Lazy evaluation. If ignore watts is set to true, second statement won't // be evaluated. 
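 // (Go's || short-circuits: when s.ignoreWatts is true the right-hand operand is never
 // evaluated, so the ClassToWatts[powerClass] lookup and the watts comparison are skipped
 // entirely.)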
- if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { + if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) { // Capping the cluster if haven't yet started if !s.isCapping { @@ -314,7 +304,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { bpswClassMapWattsProacCCMutex.Lock() @@ -324,13 +314,13 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, log.Println("Failed to determine new cluster-wide cap:") log.Println(err) } - taken = true - totalWatts += task.ClassToWatts[nodeClass] + offerTaken = true + totalWatts += task.ClassToWatts[powerClass] totalCPU += task.CPU totalRAM += task.RAM log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) tasks = append(tasks, taskToSchedule) fmt.Println("Inst: ", *task.Instances) @@ -355,16 +345,16 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, } } - if taken { + if offerTaken { log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) } else { // If there was no match for the task fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/firstfit.go b/schedulers/firstfit.go index 4eaecdd..3f6f4fc 100644 --- a/schedulers/firstfit.go +++ b/schedulers/firstfit.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -9,14 +11,13 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "strings" "time" ) // Decides if to take an offer or not func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -129,7 +130,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -140,16 +141,13 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. 
// First fit strategy - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not @@ -162,9 +160,9 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - taken = true + offerTaken = true fmt.Println("Inst: ", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) @@ -185,12 +183,12 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos. } // If there was no match for the task - if !taken { + if !offerTaken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go new file mode 100644 index 0000000..8db4147 --- /dev/null +++ b/schedulers/firstfitSortedOffers.go @@ -0,0 +1,226 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "os" + "sort" + "time" +) + +// Decides if to take an offer or not +func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { + + cpus, mem, watts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= task.Watts) { + return true + } + + return false +} + +// electronScheduler implements the Scheduler interface +type FirstFitSortedOffers struct { + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. 
+ Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewFirstFitSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedOffers { + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &FirstFitSortedOffers{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *FirstFitSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // Sorting the offers + sort.Sort(offerUtils.OffersSorter(offers)) + + // Printing the sorted offers and the corresponding CPU resource availability + log.Println("Sorted Offers:") + for i := 0; i < len(offers); i++ { + offer := offers[i] + offerCPU, _, _ := offerUtils.OfferAgg(offer) + log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) + } + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + // First fit strategy + + offerTaken := false + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] + + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + // Decision to take the offer or not + if s.takeOffer(offer, task) { + + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + taskToSchedule := s.newTask(offer, task) + tasks = 
append(tasks, taskToSchedule) + + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + + offerTaken = true + + fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks[i] = s.tasks[len(s.tasks)-1] + s.tasks = s.tasks[:len(s.tasks)-1] + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !offerTaken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + + } +} + +func (s *FirstFitSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} diff --git a/schedulers/firstfitSortedWattsClassMapWatts.go b/schedulers/firstfitSortedWattsClassMapWatts.go index 4a03d89..e2559ea 100644 --- a/schedulers/firstfitSortedWattsClassMapWatts.go +++ b/schedulers/firstfitSortedWattsClassMapWatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -10,10 +12,22 @@ import ( "log" "os" "sort" - "strings" "time" ) +// Decides if to take an offer or not +func (s *FirstFitSortedWattsClassMapWatts) takeOffer(offer *mesos.Offer, powerClass string, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + // Decision to take the offer or not + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + return true + } + return false +} + // electron scheduler implements the Scheduler interface type FirstFitSortedWattsClassMapWatts struct { base // Type embedded to inherit common features. @@ -63,7 +77,7 @@ func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, sch return s } -func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -87,7 +101,7 @@ func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def. 
} if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -117,47 +131,37 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue default: } - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - // First fit strategy - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doens't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } - // retrieving the node class from the offer - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + // retrieving the powerClass from the offer + powerClass := offerUtils.PowerClass(offer) // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + if s.takeOffer(offer, powerClass, task) { + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter) - taken = true + offerTaken = true fmt.Println("Inst: ", *task.Instances) *task.Instances-- if *task.Instances <= 0 { @@ -174,12 +178,12 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler } // If there was no match for the task - if !taken { + if !offerTaken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go index 3cc9fb9..35c3d3b 100644 --- a/schedulers/firstfitSortedWattsClassMapWattsProacCC.go +++ b/schedulers/firstfitSortedWattsClassMapWattsProacCC.go @@ -3,8 +3,10 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/pcp" + powCap "bitbucket.org/sunybingcloud/electron/powerCapping" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" 
"github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -14,11 +16,23 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) +// Decides if to take an offer or not +func (s *FirstFitSortedWattsClassMapWattsProacCC) takeOffer(offer *mesos.Offer, powerClass string, task def.Task) bool { + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + // Decision to take the offer or not + if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + return true + } + return false +} + // electron scheduler implements the Scheduler interface type FirstFitSortedWattsClassMapWattsProacCC struct { base // Type embedded to inherit common features. @@ -31,7 +45,7 @@ type FirstFitSortedWattsClassMapWattsProacCC struct { availablePower map[string]float64 totalPower map[string]float64 ignoreWatts bool - capper *pcp.ClusterwideCapper + capper *powCap.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster-wide capping. @@ -74,7 +88,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: pcp.GetClusterwideCapperInstance(), + capper: powCap.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -87,7 +101,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo // mutex var ffswClassMapWattsProacCCMutex sync.Mutex -func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { +func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo { taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) s.tasksCreated++ @@ -119,7 +133,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, ta } if !s.ignoreWatts { - resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass])) } return &mesos.TaskInfo{ @@ -239,7 +253,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offerWatts := OfferAgg(offer) + _, _, offerWatts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offerWatts // setting total power if the first time if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -255,38 +269,27 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue default: } - offerCPU, offerRAM, offerWatts := OfferAgg(offer) - // First fit strategy - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doens't match our task's host requirement. 
- if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } - // Retrieving the node class from the offer - var nodeClass string - for _, attr := range offer.GetAttributes() { - if attr.GetName() == "class" { - nodeClass = attr.GetText().GetValue() - } - } + // retrieving the powerClass for the offer + powerClass := offerUtils.PowerClass(offer) // Decision to take the offer or not - if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + if s.takeOffer(offer, powerClass, task) { // Capping the cluster if haven't yet started if !s.isCapping { @@ -296,7 +299,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc s.startCapping() } - fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass]) + fmt.Println("Watts being used: ", task.ClassToWatts[powerClass]) tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { ffswClassMapWattsProacCCMutex.Lock() @@ -310,12 +313,12 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc log.Println("Co-Located with: ") coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task, nodeClass) + taskToSchedule := s.newTask(offer, task, powerClass) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter) - taken = true + offerTaken = true fmt.Println("Inst: ", *task.Instances) *task.Instances-- if *task.Instances <= 0 { @@ -335,12 +338,12 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc } // If there was no match for the task - if !taken { + if !offerTaken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go new file mode 100644 index 0000000..8dd22d2 --- /dev/null +++ b/schedulers/firstfitSortedWattsSortedOffers.go @@ -0,0 +1,228 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "os" + "sort" + "time" +) + +// Decides if to take an offer or not +func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { + + cpus, mem, watts := offerUtils.OfferAgg(offer) + + //TODO: Insert watts calculation here instead of taking them as a parameter + + if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= task.Watts) { + return true + } + + return false +} + +// electronScheduler implements the Scheduler interface +type FirstFitSortedWattsSortedOffers struct { + base // Type 
embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWattsSortedOffers { + + // Sorting the tasks in increasing order of watts requirement. + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + s := &FirstFitSortedWattsSortedOffers{ + tasks: tasks, + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts)) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + // Sorting the offers + sort.Sort(offerUtils.OffersSorter(offers)) + + // Printing the sorted offers and the corresponding CPU resource availability + log.Println("Sorted Offers:") + for i := 0; i < len(offers); i++ { + offer := offers[i] + offerCPU, _, _ := offerUtils.OfferAgg(offer) + log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) + } + + log.Printf("Received %d resource offers", len(offers)) + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, 
mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + + // First fit strategy + + offerTaken := false + for i := 0; i < len(s.tasks); i++ { + task := s.tasks[i] + + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue + } + + // Decision to take the offer or not + if s.takeOffer(offer, task) { + + log.Println("Co-Located with: ") + coLocated(s.running[offer.GetSlaveId().GoString()]) + + taskToSchedule := s.newTask(offer, task) + tasks = append(tasks, taskToSchedule) + + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + + offerTaken = true + + fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + *task.Instances-- + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) + + if len(s.tasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } + } + break // Offer taken, move on + } + } + + // If there was no match for the task + if !offerTaken { + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + + } +} + +func (s *FirstFitSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go index 940ef90..4553bfc 100644 --- a/schedulers/firstfitsortedwatts.go +++ b/schedulers/firstfitsortedwatts.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -10,14 +12,13 @@ import ( "log" "os" "sort" - "strings" "time" ) // Decides if to take an offer or not func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -132,7 +133,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -143,16 +144,13 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer // First fit strategy - taken := false + offerTaken := false 
for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not @@ -165,9 +163,9 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - taken = true + offerTaken = true fmt.Println("Inst: ", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) @@ -187,12 +185,12 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer } // If there was no match for the task - if !taken { + if !offerTaken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go index c23727f..2413dcf 100644 --- a/schedulers/firstfitwattsonly.go +++ b/schedulers/firstfitwattsonly.go @@ -2,6 +2,8 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -9,14 +11,13 @@ import ( sched "github.com/mesos/mesos-go/scheduler" "log" "os" - "strings" "time" ) // Decides if to take an offer or not func (*FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { - _, _, watts := OfferAgg(offer) + _, _, watts := offerUtils.OfferAgg(offer) //TODO: Insert watts calculation here instead of taking them as a parameter @@ -123,7 +124,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -134,16 +135,13 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers // First fit strategy - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Check host if it exists - if task.Host != "" { - // Don't take offer if it doesn't match our task's host requirement - if !strings.HasPrefix(*offer.Hostname, task.Host) { - continue - } + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { + continue } // Decision to take the offer or not @@ -156,9 +154,9 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers tasks = append(tasks, taskToSchedule) log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter) + 
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - taken = true + offerTaken = true fmt.Println("Inst: ", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) @@ -179,12 +177,12 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers } // If there was no match for the task - if !taken { + if !offerTaken { fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 2c6ffd2..1891808 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -2,33 +2,12 @@ package schedulers import ( "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" "log" + "bitbucket.org/sunybingcloud/electron/def" + mesos "github.com/mesos/mesos-go/mesosproto" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" ) -var ( - defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} - longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} -) - -func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { - var cpus, mem, watts float64 - - for _, resource := range offer.Resources { - switch resource.GetName() { - case "cpus": - cpus += *resource.GetScalar().Value - case "mem": - mem += *resource.GetScalar().Value - case "watts": - watts += *resource.GetScalar().Value - } - } - - return cpus, mem, watts -} - func coLocated(tasks map[string]bool) { for task := range tasks { @@ -37,3 +16,22 @@ func coLocated(tasks map[string]bool) { fmt.Println("---------------------") } + +/* + Determine the watts value to consider for each task. + + This value could either be task.Watts or task.ClassToWatts[] + If task.ClassToWatts is not present, then return task.Watts (this would be for workloads which don't have classMapWatts) +*/ +func wattsToConsider(task def.Task, classMapWatts bool, offer *mesos.Offer) float64 { + if classMapWatts { + // checking if ClassToWatts was present in the workload. 
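+	// If the workload was submitted without a ClassToWatts map, or classMapWatts is
+	// disabled, fall back to the coarser task.Watts estimate so callers always get a
+	// usable watts value.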
+ if task.ClassToWatts != nil { + return task.ClassToWatts[offerUtils.PowerClass(offer)] + } else { + return task.Watts + } + } else { + return task.Watts + } +} diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go index d89390b..2643335 100644 --- a/schedulers/proactiveclusterwidecappingfcfs.go +++ b/schedulers/proactiveclusterwidecappingfcfs.go @@ -3,8 +3,10 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/pcp" + powCap "bitbucket.org/sunybingcloud/electron/powerCapping" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -13,16 +15,15 @@ import ( "log" "math" "os" - "strings" "sync" "time" ) // Decides if to take an offer or not -func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := OfferAgg(offer) +func (s *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool { + offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) - if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { + if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || (offer_watts >= task.Watts)) { return true } return false @@ -40,7 +41,7 @@ type ProactiveClusterwideCapFCFS struct { availablePower map[string]float64 // available power for each node in the cluster. totalPower map[string]float64 // total power for each node in the cluster. ignoreWatts bool - capper *pcp.ClusterwideCapper + capper *powCap.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster wide capping. @@ -83,7 +84,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool, schedTra availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: pcp.GetClusterwideCapperInstance(), + capper: powCap.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -240,7 +241,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -256,7 +257,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -273,12 +274,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive Cluster wide capping is currently performed at regular intervals of time. */ - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Don't take offer if it doesn't match our task's host requirement. 
- if !strings.HasPrefix(*offer.Hostname, task.Host) { + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { continue } @@ -291,7 +292,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive fcfsMutex.Unlock() s.startCapping() } - taken = true + offerTaken = true tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { @@ -305,7 +306,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive log.Printf("Starting on [%s]\n", offer.GetHostname()) taskToSchedule := s.newTask(offer, task) toSchedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter) log.Printf("Inst: %d", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- @@ -329,12 +330,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive } // If no task fit the offer, then declining the offer. - if !taken { + if !offerTaken { log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/proactiveclusterwidecappingranked.go b/schedulers/proactiveclusterwidecappingranked.go index f4c3484..786b1ff 100644 --- a/schedulers/proactiveclusterwidecappingranked.go +++ b/schedulers/proactiveclusterwidecappingranked.go @@ -13,8 +13,10 @@ package schedulers import ( "bitbucket.org/sunybingcloud/electron/constants" "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/pcp" + powCap "bitbucket.org/sunybingcloud/electron/powerCapping" "bitbucket.org/sunybingcloud/electron/rapl" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" "fmt" "github.com/golang/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" @@ -24,16 +26,15 @@ import ( "math" "os" "sort" - "strings" "sync" "time" ) // Decides if to taken an offer or not -func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := OfferAgg(offer) +func (s *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool { + offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) - if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts { + if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || (offer_watts >= task.Watts)) { return true } return false @@ -51,7 +52,7 @@ type ProactiveClusterwideCapRanked struct { availablePower map[string]float64 // available power for each node in the cluster. totalPower map[string]float64 // total power for each node in the cluster. ignoreWatts bool - capper *pcp.ClusterwideCapper + capper *powCap.ClusterwideCapper ticker *time.Ticker recapTicker *time.Ticker isCapping bool // indicate whether we are currently performing cluster wide capping. 
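Both ProacCC schedulers in this patch obtain their capper through powCap.GetClusterwideCapperInstance(), so every scheduler instance shares one cluster-wide capping state. The accessor's implementation lives in powerCapping/proactiveclusterwidecappers.go and is not part of this hunk; the snippet below is only a minimal sketch of how such a package-level singleton is commonly written in Go (the sync.Once guard and the trimmed struct are assumptions, not the project's actual code).

```go
package powerCapping

import "sync"

// ClusterwideCapper is the shared capping strategy used by the ProacCC schedulers
// (fields trimmed for this sketch).
type ClusterwideCapper struct{}

var (
	capperInstance *ClusterwideCapper
	capperOnce     sync.Once
)

// GetClusterwideCapperInstance returns the process-wide capper, creating it on first use.
// Illustrative only; the real constructor in powerCapping may be implemented differently.
func GetClusterwideCapperInstance() *ClusterwideCapper {
	capperOnce.Do(func() {
		capperInstance = &ClusterwideCapper{}
	})
	return capperInstance
}
```

Sharing a single capper is what lets schedulers that run concurrently agree on one cluster-wide cap value instead of each computing its own.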
@@ -94,7 +95,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool, schedT availablePower: make(map[string]float64), totalPower: make(map[string]float64), RecordPCP: false, - capper: pcp.GetClusterwideCapperInstance(), + capper: powCap.GetClusterwideCapperInstance(), ticker: time.NewTicker(10 * time.Second), recapTicker: time.NewTicker(20 * time.Second), isCapping: false, @@ -251,7 +252,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri // retrieving the available power for all the hosts in the offers. for _, offer := range offers { - _, _, offer_watts := OfferAgg(offer) + _, _, offer_watts := offerUtils.OfferAgg(offer) s.availablePower[*offer.Hostname] = offer_watts // setting total power if the first time. if _, ok := s.totalPower[*offer.Hostname]; !ok { @@ -277,7 +278,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri select { case <-s.Shutdown: log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, longFilter) + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) log.Println("Number of tasks still running: ", s.tasksRunning) continue @@ -297,12 +298,12 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri Cluster wide capping is currently performed at regular intervals of time. */ - taken := false + offerTaken := false for i := 0; i < len(s.tasks); i++ { task := s.tasks[i] - // Don't take offer if it doesn't match our task's host requirement. - if !strings.HasPrefix(*offer.Hostname, task.Host) { + // Don't take offer if it doesn't match our task's host requirement + if offerUtils.HostMismatch(*offer.Hostname, task.Host) { continue } @@ -315,7 +316,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri rankedMutex.Unlock() s.startCapping() } - taken = true + offerTaken = true tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) if err == nil { @@ -328,7 +329,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri log.Printf("Starting on [%s]\n", offer.GetHostname()) taskToSchedule := s.newTask(offer, task) to_schedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, mesosUtils.DefaultFilter) log.Printf("Inst: %d", *task.Instances) s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) *task.Instances-- @@ -352,12 +353,12 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri } // If no tasks fit the offer, then declining the offer. 
- if !taken { + if !offerTaken { log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := OfferAgg(offer) + cpus, mem, watts := offerUtils.OfferAgg(offer) log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, defaultFilter) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } } diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go new file mode 100644 index 0000000..ab4fdd6 --- /dev/null +++ b/schedulers/topHeavy.go @@ -0,0 +1,329 @@ +package schedulers + +import ( + "bitbucket.org/sunybingcloud/electron/constants" + "bitbucket.org/sunybingcloud/electron/def" + "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" + "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" + "fmt" + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" + "log" + "math" + "os" + "sort" + "time" +) + +/* +Tasks are categorized into small and large tasks based on the watts requirement. +All the large tasks are packed into offers from agents belonging to power class A and power class B, using BinPacking. +All the small tasks are spread among the offers from agents belonging to power class C, using FirstFit. + +This was done to give a little more room for the large tasks (power intensive) for execution and reduce the possibility of +starvation of power intensive tasks. +*/ + +// electronScheduler implements the Scheduler interface +type TopHeavy struct { + base // Type embedded to inherit common functions + tasksCreated int + tasksRunning int + tasks []def.Task + metrics map[string]def.Metric + running map[string]map[string]bool + ignoreWatts bool + smallTasks, largeTasks []def.Task + + // First set of PCP values are garbage values, signal to logger to start recording when we're + // about to schedule a new task + RecordPCP bool + + // This channel is closed when the program receives an interrupt, + // signalling that the program should shut down. + Shutdown chan struct{} + // This channel is closed after shutdown is closed, and only when all + // outstanding tasks have been cleaned up + Done chan struct{} + + // Controls when to shutdown pcp logging + PCPLog chan struct{} + + schedTrace *log.Logger +} + +// New electron scheduler +func NewPackSmallSpreadBig(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *TopHeavy { + sort.Sort(def.WattsSorter(tasks)) + + logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") + if err != nil { + log.Fatal(err) + } + + // Separating small tasks from large tasks. + // Classification done based on MMPU watts requirements. 
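+	// Note: with tasks sorted by watts, tasks[:mid] and tasks[mid:] together cover every
+	// task; slicing the large half from mid+1 (as below) leaves tasks[mid] in neither list.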
+ mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5)) + s := &TopHeavy{ + smallTasks: tasks[:mid], + largeTasks: tasks[mid+1:], + ignoreWatts: ignoreWatts, + Shutdown: make(chan struct{}), + Done: make(chan struct{}), + PCPLog: make(chan struct{}), + running: make(map[string]map[string]bool), + RecordPCP: false, + schedTrace: log.New(logFile, "", log.LstdFlags), + } + return s +} + +func (s *TopHeavy) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo { + taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) + s.tasksCreated++ + + if !s.RecordPCP { + // Turn on logging + s.RecordPCP = true + time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts + } + + // If this is our first time running into this Agent + if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { + s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) + } + + // Add task to list of tasks running on node + s.running[offer.GetSlaveId().GoString()][taskName] = true + + resources := []*mesos.Resource{ + mesosutil.NewScalarResource("cpus", task.CPU), + mesosutil.NewScalarResource("mem", task.RAM), + } + + if !s.ignoreWatts { + resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass])) + } + + return &mesos.TaskInfo{ + Name: proto.String(taskName), + TaskId: &mesos.TaskID{ + Value: proto.String("electron-" + taskName), + }, + SlaveId: offer.SlaveId, + Resources: resources, + Command: &mesos.CommandInfo{ + Value: proto.String(task.CMD), + }, + Container: &mesos.ContainerInfo{ + Type: mesos.ContainerInfo_DOCKER.Enum(), + Docker: &mesos.ContainerInfo_DockerInfo{ + Image: proto.String(task.Image), + Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated + }, + }, + } +} + +// Shut down scheduler if no more tasks to schedule +func (s *TopHeavy) shutDownIfNecessary() { + if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 { + log.Println("Done scheduling all tasks") + close(s.Shutdown) + } +} + +// create TaskInfo and log scheduling trace +func (s *TopHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, + powerClass string, task def.Task) *mesos.TaskInfo { + log.Println("Co-Located with:") + coLocated(s.running[offer.GetSlaveId().GoString()]) + taskToSchedule := s.newTask(offer, task, powerClass) + + fmt.Println("Inst: ", *task.Instances) + s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) + *task.Instances-- + return taskToSchedule +} + +// Using BinPacking to pack small tasks into this offer. +func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + totalWatts := 0.0 + totalCPU := 0.0 + totalRAM := 0.0 + taken := false + for i := 0; i < len(s.smallTasks); i++ { + task := s.smallTasks[i] + + for *task.Instances > 0 { + powerClass := offerUtils.PowerClass(offer) + // Does the task fit + // OR lazy evaluation. If ignore watts is set to true, second statement won't + // be evaluated. 
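+				// The watts figure checked below is task.ClassToWatts[powerClass] when watts
+				// are being tracked; with ignoreWatts set, the plain task.Watts value is kept
+				// and the watts comparison is short-circuited away.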
+ wattsToConsider := task.Watts + if !s.ignoreWatts { + wattsToConsider = task.ClassToWatts[powerClass] + } + if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsToConsider))) && + (offerCPU >= (totalCPU + task.CPU)) && + (offerRAM >= (totalRAM + task.RAM)) { + taken = true + totalWatts += wattsToConsider + totalCPU += task.CPU + totalRAM += task.RAM + tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...) + s.shutDownIfNecessary() + } + } else { + break // Continue on to next task + } + } + } + + if taken { + log.Printf("Starting on [%s]\n", offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + } else { + // If there was no match for the task + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } +} + +// Using first fit to spread large tasks into these offers. +func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + tasks := []*mesos.TaskInfo{} + offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) + offerTaken := false + for i := 0; i < len(s.largeTasks); i++ { + task := s.largeTasks[i] + powerClass := offerUtils.PowerClass(offer) + + // Decision to take the offer or not + wattsToConsider := task.Watts + if !s.ignoreWatts { + wattsToConsider = task.ClassToWatts[powerClass] + } + if (s.ignoreWatts || (offerWatts >= wattsToConsider)) && + (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { + offerTaken = true + tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task)) + log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) + + if *task.Instances <= 0 { + // All instances of task have been scheduled, remove it + s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...) + s.shutDownIfNecessary() + } + break // Offer taken, move on + } + } + + if !offerTaken { + // If there was no match for the task + fmt.Println("There is not enough resources to launch a task:") + cpus, mem, watts := offerUtils.OfferAgg(offer) + + log.Printf("\n", cpus, mem, watts) + driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) + } + } +} + +func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + log.Printf("Received %d resource offers", len(offers)) + + // We need to separate the offers into + // offers from ClassA and ClassB and offers from ClassC. + // Offers from ClassA and ClassB would execute the large tasks. + // Offers from ClassC would execute the small tasks. 
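The bucketing below keys on constants.PowerClasses, a static hostname map. Since offerUtils.PowerClass (added later in this patch) already reads the agent's class attribute from the offer, the same split could in principle be driven by that attribute instead. A hedged sketch follows; the helper name and the bare "A"/"B"/"C" class values are assumptions, as the real attribute values depend on how the agents are configured.

```go
package schedulers

import (
	"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
	mesos "github.com/mesos/mesos-go/mesosproto"
)

// splitOffersByPowerClass is a hypothetical helper that buckets offers by the agent's
// "class" attribute (via offerUtils.PowerClass) rather than the constants.PowerClasses
// hostname map. The "A"/"B"/"C" values are assumptions.
func splitOffersByPowerClass(offers []*mesos.Offer) (classAB, classC []*mesos.Offer) {
	for _, offer := range offers {
		switch offerUtils.PowerClass(offer) {
		case "A", "B":
			classAB = append(classAB, offer)
		case "C":
			classC = append(classC, offer)
		}
	}
	return classAB, classC
}
```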
+ offersClassAB := []*mesos.Offer{} + offersClassC := []*mesos.Offer{} + + for _, offer := range offers { + select { + case <-s.Shutdown: + log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") + driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) + + log.Println("Number of tasks still running: ", s.tasksRunning) + continue + default: + } + + if constants.PowerClasses["ClassA"][*offer.Hostname] || + constants.PowerClasses["ClassB"][*offer.Hostname] { + offersClassAB = append(offersClassAB, offer) + } else if constants.PowerClasses["ClassC"][*offer.Hostname] { + offersClassC = append(offersClassC, offer) + } + } + + log.Println("ClassAB Offers:") + for _, o := range offersClassAB { + log.Println(*o.Hostname) + } + log.Println("ClassC Offers:") + for _, o := range offersClassC { + log.Println(*o.Hostname) + } + + // Packing tasks into offersClassC + s.pack(offersClassC, driver) + // Spreading tasks among offersClassAB + s.spread(offersClassAB, driver) +} + +func (s *TopHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) + + if *status.State == mesos.TaskState_TASK_RUNNING { + s.tasksRunning++ + } else if IsTerminal(status.State) { + delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) + s.tasksRunning-- + if s.tasksRunning == 0 { + select { + case <-s.Shutdown: + close(s.Done) + default: + } + } + } + log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) +} diff --git a/utilities/mesosUtils/mesosUtils.go b/utilities/mesosUtils/mesosUtils.go new file mode 100644 index 0000000..dcc5e77 --- /dev/null +++ b/utilities/mesosUtils/mesosUtils.go @@ -0,0 +1,11 @@ +package mesosUtils + +import ( + "github.com/golang/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" +) + +var ( + DefaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)} + LongFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)} +) diff --git a/utilities/offerUtils/offerUtils.go b/utilities/offerUtils/offerUtils.go new file mode 100644 index 0000000..6f5dc81 --- /dev/null +++ b/utilities/offerUtils/offerUtils.go @@ -0,0 +1,62 @@ +package offerUtils + +import ( + mesos "github.com/mesos/mesos-go/mesosproto" + "strings" +) + +func OfferAgg(offer *mesos.Offer) (float64, float64, float64) { + var cpus, mem, watts float64 + + for _, resource := range offer.Resources { + switch resource.GetName() { + case "cpus": + cpus += *resource.GetScalar().Value + case "mem": + mem += *resource.GetScalar().Value + case "watts": + watts += *resource.GetScalar().Value + } + } + + return cpus, mem, watts +} + +// Determine the power class of the host in the offer +func PowerClass(offer *mesos.Offer) string { + var powerClass string + for _, attr := range offer.GetAttributes() { + if attr.GetName() == "class" { + powerClass = attr.GetText().GetValue() + } + } + return powerClass +} + +// Implements the sort.Sort interface to sort Offers based on CPU. 
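+// A minimal usage sketch (assuming offers is a []*mesos.Offer):
+//   sort.Sort(OffersSorter(offers))
+// leaves offers ordered by increasing available CPU.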
+// TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts) +type OffersSorter []*mesos.Offer + +func (offersSorter OffersSorter) Len() int { + return len(offersSorter) +} + +func (offersSorter OffersSorter) Swap(i, j int) { + offersSorter[i], offersSorter[j] = offersSorter[j], offersSorter[i] +} + +func (offersSorter OffersSorter) Less(i, j int) bool { + // getting CPU resource availability of offersSorter[i] + cpu1, _, _ := OfferAgg(offersSorter[i]) + // getting CPU resource availability of offersSorter[j] + cpu2, _, _ := OfferAgg(offersSorter[j]) + return cpu1 <= cpu2 +} + +// Is there a mismatch between the task's host requirement and the host corresponding to the offer. +func HostMismatch(offerHost string, taskHost string) bool { + if taskHost != "" && !strings.HasPrefix(offerHost, taskHost) { + return true + } + return false +} diff --git a/utilities/runAvg/runAvg.go b/utilities/runAvg/runAvg.go index 592929f..297dea4 100644 --- a/utilities/runAvg/runAvg.go +++ b/utilities/runAvg/runAvg.go @@ -1,7 +1,7 @@ /* A utility to calculate the running average. -One should implement Val() to be able to use this utility. +One should implement Val() and ID() to use this utility. */ package runAvg @@ -19,9 +19,9 @@ type Interface interface { } type runningAverageCalculator struct { - window list.List - windowSize int - currentSum float64 + considerationWindow list.List + considerationWindowSize int + currentSum float64 } // singleton instance @@ -31,14 +31,14 @@ var racSingleton *runningAverageCalculator func getInstance(curSum float64, wSize int) *runningAverageCalculator { if racSingleton == nil { racSingleton = &runningAverageCalculator{ - windowSize: wSize, - currentSum: curSum, + considerationWindowSize: wSize, + currentSum: curSum, } return racSingleton } else { // Updating window size if a new window size is given. - if wSize != racSingleton.windowSize { - racSingleton.windowSize = wSize + if wSize != racSingleton.considerationWindowSize { + racSingleton.considerationWindowSize = wSize } return racSingleton } @@ -47,20 +47,20 @@ func getInstance(curSum float64, wSize int) *runningAverageCalculator { // Compute the running average by adding 'data' to the window. // Updating currentSum to get constant time complexity for every running average computation. func (rac *runningAverageCalculator) calculate(data Interface) float64 { - if rac.window.Len() < rac.windowSize { - rac.window.PushBack(data) + if rac.considerationWindow.Len() < rac.considerationWindowSize { + rac.considerationWindow.PushBack(data) rac.currentSum += data.Val() } else { // removing the element at the front of the window. 
- elementToRemove := rac.window.Front() + elementToRemove := rac.considerationWindow.Front() rac.currentSum -= elementToRemove.Value.(Interface).Val() - rac.window.Remove(elementToRemove) + rac.considerationWindow.Remove(elementToRemove) // adding new element to the window - rac.window.PushBack(data) + rac.considerationWindow.PushBack(data) rac.currentSum += data.Val() } - return rac.currentSum / float64(rac.window.Len()) + return rac.currentSum / float64(rac.considerationWindow.Len()) } /* @@ -68,9 +68,9 @@ If element with given ID present in the window, then remove it and return (remov Else, return (nil, error) */ func (rac *runningAverageCalculator) removeFromWindow(id string) (interface{}, error) { - for element := rac.window.Front(); element != nil; element = element.Next() { + for element := rac.considerationWindow.Front(); element != nil; element = element.Next() { if elementToRemove := element.Value.(Interface); elementToRemove.ID() == id { - rac.window.Remove(element) + rac.considerationWindow.Remove(element) rac.currentSum -= elementToRemove.Val() return elementToRemove, nil } @@ -102,7 +102,7 @@ func Init() { } // Setting parameters to default values. Could also set racSingleton to nil but this leads to unnecessary overhead of creating // another instance when Calc is called. - racSingleton.window.Init() - racSingleton.windowSize = 0 + racSingleton.considerationWindow.Init() + racSingleton.considerationWindowSize = 0 racSingleton.currentSum = 0.0 }
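
For reference, below is a minimal sketch of how a caller is expected to satisfy the runAvg Interface (Val() and ID()) and drive the running-average computation. Only Calc and Init are taken from the package as changed above; the sample type, the window size of 3, and the readings are illustrative assumptions, and the import path simply follows the repository layout shown in this diff.

package main

import (
	"fmt"

	"bitbucket.org/sunybingcloud/electron/utilities/runAvg"
)

// sample satisfies runAvg.Interface by exposing a value and a stable identifier.
type sample struct {
	id    string
	watts float64
}

func (s sample) Val() float64 { return s.watts }
func (s sample) ID() string   { return s.id }

func main() {
	// Push a few readings through a consideration window of size 3.
	readings := []sample{{"t1", 40}, {"t2", 60}, {"t3", 50}, {"t4", 70}}
	for _, r := range readings {
		avg := runAvg.Calc(r, 3)
		fmt.Printf("after %s: running average = %.2f\n", r.id, avg)
	}

	// Reset the calculator once the window is no longer needed.
	runAvg.Init()
}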