diff --git a/README.md b/README.md
index 2e0ee06..56893c2 100644
--- a/README.md
+++ b/README.md
@@ -1,75 +1,10 @@
-Electron: A power budget manager
-======================================
+Elektron: A pluggable Mesos framework with power-aware capabilities
+===================================================================
-To Do:
+Elektron is a Mesos framework that serves as a playground for developers to experiment with different scheduling policies for launching ad-hoc jobs.
+It is designed as a lightweight, configurable framework that can be used in conjunction with built-in power-capping policies to reduce the peak power and/or energy usage of co-scheduled tasks.
- * Create metrics for each task launched [Time to schedule, run time, power used]
- * Have calibration phase?
- * Add ability to use constraints
- * Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- * Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
- * Adding type of scheduler to be used, to be picked from a config file, along with it's configurable parameters.
- * Write test code for each scheduler (This should be after the design change)
-   Possible to setup the constants at runtime based on the environment?
- * Log fix for declining offer -- different reason when insufficient resources as compared to when there are no
-   longer any tasks to schedule.
- * Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file.
- * Make def.Task an interface for further modularization and flexibility.
- * Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
- * **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version...)
- * **Critical** -- Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference.
- * Handle powerclass not configured on a node condition. As of now, an assumption is made that the powerclass is configured
- * Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves the reduction in time complexity of the same.
- * Use the generic task sorter in def/taskUtils.go to sort the tasks based on CPU or RAM etc. Remove the existing sorters present in def/task.go.
-   for all the nodes.
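One of the removed To-Do items points at exponential moving averages for running-average calculations. A minimal sketch of that update rule in Go is shown below; the function name and smoothing factor are illustrative and not part of the Elektron codebase.

```go
// ewma folds a new sample into an exponential moving average.
// alpha in (0, 1] controls how quickly older samples are forgotten.
// Illustrative helper only; not part of the Elektron codebase.
func ewma(previous, sample, alpha float64) float64 {
	return alpha*sample + (1-alpha)*previous
}
```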
- -**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the -machine on which electron is launched for logging to work and PCP collector agents installed -on the Mesos Agents** - - -How to run (Use the --help option to get information about other command-line options): - -`./electron -workload ` - -To run electron with Watts as Resource, run the following command, - -`./electron -workload -wattsAsAResource` - - -Workload schema: - -``` -[ - { - "name": "minife", - "cpu": 3.0, - "ram": 4096, - "watts": 63.141, - "class_to_watts": { - "A": 93.062, - "B": 65.552, - "C": 57.897, - "D": 60.729 - }, - "image": "rdelvalle/minife:electron1", - "cmd": "cd src && mpirun -np 3 miniFE.x -nx 100 -ny 100 -nz 100", - "inst": 10 - }, - { - "name": "dgemm", - "cpu": 3.0, - "ram": 32, - "watts": 85.903, - "class_to_watts": { - "A": 114.789, - "B": 89.133, - "C": 82.672, - "D": 81.944 - }, - "image": "rdelvalle/dgemm:electron1", - "cmd": "/./mt-dgemm 1024", - "inst": 10 - } -] -``` +#Features +* Pluggable Scheduling policies +* Pluggable Power-Capping strategies +* Cluster resource monitoring diff --git a/schedulers/binpacksortedwatts.go b/schedulers/bin-packing.go similarity index 100% rename from schedulers/binpacksortedwatts.go rename to schedulers/bin-packing.go diff --git a/schedulers/binPackSortedWattsSortedOffers.go b/schedulers/binPackSortedWattsSortedOffers.go deleted file mode 100644 index 8d757fb..0000000 --- a/schedulers/binPackSortedWattsSortedOffers.go +++ /dev/null @@ -1,229 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -// Decides if to take an offer or not -func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task, - totalCPU, totalRAM, totalWatts float64) bool { - - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) { - return true - } - return false -} - -type BinPackSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions -} - -// New electron scheduler -func NewBinPackSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *BinPackSortedWattsSortedOffers { - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &BinPackSortedWattsSortedOffers{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s -} - -func (s *BinPackSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo 
{ - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // Sorting the offers - sort.Sort(offerUtils.OffersSorter(offers)) - - // Printing the sorted offers and the corresponding CPU resource availability - log.Println("Sorted Offers:") - for i := 0; i < len(offers); i++ { - offer := offers[i] - offerUtils.UpdateEnvironment(offer) - offerCPU, _, _ := offerUtils.OfferAgg(offer) - log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) - } - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - for *task.Instances > 0 { - // Does the task fit - if s.takeOffer(offer, task, totalCPU, totalRAM, totalWatts) { - - offerTaken = true - totalWatts += wattsConsideration - totalCPU += task.CPU - totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task) - tasks = append(tasks, taskToSchedule) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have 
been scheduled, remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - } else { - break // Continue on to next offer - } - } - } - - if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *BinPackSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/binpackedpistoncapping.go b/schedulers/binpackedpistoncapping.go deleted file mode 100644 index a7ead66..0000000 --- a/schedulers/binpackedpistoncapping.go +++ /dev/null @@ -1,397 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/rapl" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "errors" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "os" - "sync" - "time" -) - -/* - Piston Capper implements the Scheduler interface - - This is basically extending the BinPacking algorithm to also cap each node at a different values, - corresponding to the load on that node. -*/ -type BinPackedPistonCapper struct { - base // Type embedded to inherit common functions - taskMonitor map[string][]def.Task - totalPower map[string]float64 - ticker *time.Ticker - isCapping bool -} - -// New electron scheduler. -func NewBinPackedPistonCapper(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *BinPackedPistonCapper { - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &BinPackedPistonCapper{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - } - return s -} - -// check whether task fits the offer or not. 
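The piston-capping approach that BinPackedPistonCapper implements boils down to two pieces: every task placed on a host adds a share of its watts requirement to that host's cap value, and a background ticker pushes the rounded cap to RAPL only when it has changed since the previous tick. The condensed sketch below reuses rapl.Cap from this codebase; the helper names, signatures, and packaging are illustrative and not a drop-in replacement for the removed code.

```go
package schedulers

import (
	"log"
	"math"
	"sync"
	"time"

	"bitbucket.org/sunybingcloud/electron/rapl"
)

// capShare is the percentage of a host's total power that one task's watts
// requirement represents, scaled by a tolerance factor
// (constants.Tolerance in this codebase).
func capShare(taskWatts, tolerance, hostTotalPower float64) float64 {
	return ((taskWatts * tolerance) / hostTotalPower) * 100
}

// applyPistonCaps periodically pushes per-host cap values to RAPL, skipping
// hosts whose rounded cap has not changed since the last round.
func applyPistonCaps(ticker *time.Ticker, mu *sync.Mutex, capValues, previous map[string]float64) {
	for range ticker.C {
		mu.Lock()
		for host, capValue := range capValues {
			rounded := math.Floor(capValue + 0.5)
			if prev, ok := previous[host]; !ok || prev != rounded {
				if err := rapl.Cap(host, "rapl", rounded); err != nil {
					log.Println(err)
				} else {
					log.Printf("Capped [%s] at %d", host, int(rounded))
				}
				previous[host] = rounded
			}
		}
		mu.Unlock()
	}
}
```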
-func (s *BinPackedPistonCapper) takeOffer(offer *mesos.Offer, offerWatts float64, offerCPU float64, offerRAM float64, - totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool { - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsToConsider - log.Fatal(err) - } - if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { - return true - } else { - return false - } -} - -// mutex -var bpPistonMutex sync.Mutex - -func (s *BinPackedPistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Setting the task ID to the task. This is done so that we can consider each task to be different, - // even though they have the same parameters. - task.SetTaskID(*proto.String("electron-" + taskName)) - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - // Adding the task to the taskMonitor - if len(s.taskMonitor[*offer.Hostname]) == 0 { - s.taskMonitor[*offer.Hostname] = []def.Task{task} - } else { - s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) - } - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) { - // Need to stop the capping process - s.ticker.Stop() - bpPistonMutex.Lock() - s.isCapping = false - bpPistonMutex.Unlock() - log.Println("Framework disconnected with master") -} - -// go routine to cap the each node in the cluster at regular intervals of time. -var bpPistonCapValues = make(map[string]float64) - -// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. 
(reduces overhead) -var bpPistonPreviousRoundedCapValues = make(map[string]float64) - -func (s *BinPackedPistonCapper) startCapping() { - go func() { - for { - select { - case <-s.ticker.C: - // Need to cap each node - bpPistonMutex.Lock() - for host, capValue := range bpPistonCapValues { - roundedCapValue := float64(int(math.Floor(capValue + 0.5))) - // has the cap value changed - if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok { - if prevRoundedCap != roundedCapValue { - if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { - log.Println(err) - } else { - log.Printf("Capped [%s] at %d", host, - int(math.Floor(capValue+0.5))) - } - bpPistonPreviousRoundedCapValues[host] = roundedCapValue - } - } else { - if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { - log.Println(err) - } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) - } - bpPistonPreviousRoundedCapValues[host] = roundedCapValue - } - } - bpPistonMutex.Unlock() - } - } - }() -} - -// Stop the capping -func (s *BinPackedPistonCapper) stopCapping() { - if s.isCapping { - log.Println("Stopping the capping.") - s.ticker.Stop() - bpPistonMutex.Lock() - s.isCapping = false - bpPistonMutex.Unlock() - } -} - -func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // retrieving the total power for each host in the offers - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - if _, ok := s.totalPower[*offer.Hostname]; !ok { - _, _, offerWatts := offerUtils.OfferAgg(offer) - s.totalPower[*offer.Hostname] = offerWatts - } - } - - // Displaying the totalPower - for host, tpower := range s.totalPower { - log.Printf("TotalPower[%s] = %f", host, tpower) - } - - /* - Piston capping strategy - - Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested. - For each set of tasks that are scheduled, compute the new cap values for each host in the cluster. - At regular intervals of time, cap each node in the cluster. - */ - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - fitTasks := []*mesos.TaskInfo{} - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - // Store the partialLoad for host corresponding to this offer. - // Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks. 
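To make this bookkeeping concrete: with the formula used below, a task whose watts consideration is 90 W placed on a host whose total power is 300 W adds (90 × Tolerance / 300) × 100 percentage points to that host's cap value, i.e. 27 points if constants.Tolerance were 0.9 (an assumed value used only for illustration; the real tolerance is defined elsewhere in the codebase). StatusUpdate subtracts the same share again when the task finishes.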
- partialLoad := 0.0 - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - for *task.Instances > 0 { - // Does the task fit - if s.takeOffer(offer, offerWatts, offerCPU, offerRAM, - totalWatts, totalCPU, totalRAM, task) { - - // Start piston capping if haven't started yet - if !s.isCapping { - s.isCapping = true - s.startCapping() - } - - offerTaken = true - totalWatts += wattsConsideration - totalCPU += task.CPU - totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task) - fitTasks = append(fitTasks, taskToSchedule) - - log.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - // updating the cap value for offer.Hostname - partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100 - - if *task.Instances <= 0 { - // All instances of task have been scheduled. Remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - } else { - break // Continue on to next task - } - } - } - - if offerTaken { - // Updating the cap value for offer.Hostname - bpPistonMutex.Lock() - bpPistonCapValues[*offer.Hostname] += partialLoad - bpPistonMutex.Unlock() - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, mesosUtils.DefaultFilter) - } else { - // If there was no match for task - log.Println("There is not enough resources to launch task: ") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -// Remove finished task from the taskMonitor -func (s *BinPackedPistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { - hostOfFinishedTask := "" - indexOfFinishedTask := -1 - found := false - var finishedTask def.Task - - for host, tasks := range s.taskMonitor { - for i, task := range tasks { - if task.TaskID == finishedTaskID { - hostOfFinishedTask = host - indexOfFinishedTask = i - found = true - } - } - if found { - break - } - } - - if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { - finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask] - log.Printf("Removing task with TaskID [%s] from the list of running tasks\n", - s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) - s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], - s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) 
- } else { - return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor") - } - return finishedTask, hostOfFinishedTask, nil -} - -func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - bpPistonMutex.Lock() - s.tasksRunning++ - bpPistonMutex.Unlock() - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - // Deleting the task from the taskMonitor - finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value) - if err != nil { - log.Println(err) - } - - // Need to determine the watts consideration for the finishedTask - var wattsConsideration float64 - if s.classMapWatts { - wattsConsideration = finishedTask.ClassToWatts[hostToPowerClass(hostOfFinishedTask)] - } else { - wattsConsideration = finishedTask.Watts - } - // Need to update the cap values for host of the finishedTask - bpPistonMutex.Lock() - bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100 - // Checking to see if the cap value has become 0, in which case we uncap the host. - if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 { - bpPistonCapValues[hostOfFinishedTask] = 100 - } - s.tasksRunning-- - bpPistonMutex.Unlock() - - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - s.stopCapping() - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/bottomHeavy.go b/schedulers/bottomHeavy.go deleted file mode 100644 index d677f00..0000000 --- a/schedulers/bottomHeavy.go +++ /dev/null @@ -1,343 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -/* -Tasks are categorized into small and large tasks based on watts requirements. -All the large tasks are packed into offers from agents belonging to power classes A and B, using Bin-Packing. -All the small tasks are spread among offers from agents belonging to power class C and D, using First-Fit. - -Bin-Packing has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence, -co-scheduling them has a great impact on the total power utilization. 
-*/ - -func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, - wattsToConsider float64, task def.Task) bool { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { - return true - } - return false - -} - -func (s *BottomHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { - return true - } - return false -} - -// electronScheduler implements the Scheduler interface -type BottomHeavy struct { - base // Type embedded to inherit common functions - smallTasks, largeTasks []def.Task -} - -// New electron scheduler -func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BottomHeavy { - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - // Classification done based on MMPU watts requirements, into 2 clusters. - classifiedTasks := def.ClassifyTasks(tasks, 2) - - s := &BottomHeavy{ - base: base{ - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - // Separating small tasks from large tasks. 
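The classification step in the constructor below (def.ClassifyTasks(tasks, 2)) groups tasks by their watts requirement into the two slices that follow. A much simpler threshold split conveys the same idea; the sketch below is only an illustration under that assumption and is not the clustering the scheduler actually uses.

```go
package schedulers

import "bitbucket.org/sunybingcloud/electron/def"

// splitByWatts partitions tasks into small and large groups around a fixed
// watts threshold. Illustrative only; BottomHeavy derives its two groups
// with def.ClassifyTasks, which clusters on the watts requirement instead.
func splitByWatts(tasks []def.Task, threshold float64) (small, large []def.Task) {
	for _, t := range tasks {
		if t.Watts <= threshold {
			small = append(small, t)
		} else {
			large = append(large, t)
		}
	}
	return small, large
}
```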
- smallTasks: classifiedTasks[0].Tasks, - largeTasks: classifiedTasks[1].Tasks, - } - return s -} - -func (s *BottomHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -// Shut down scheduler if no more tasks to schedule -func (s *BottomHeavy) shutDownIfNecessary() { - if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } -} - -// create TaskInfo and log scheduling trace -func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - log.Println("Co-Located with:") - coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - return taskToSchedule -} - -// Using BinPacking to pack large tasks into the given offers. -func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - offerTaken := false - for i := 0; i < len(s.largeTasks); i++ { - task := s.largeTasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - for *task.Instances > 0 { - // Does the task fit - // OR lazy evaluation. If ignore watts is set to true, second statement won't - // be evaluated. 
- if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) { - offerTaken = true - totalWatts += wattsConsideration - totalCPU += task.CPU - totalRAM += task.RAM - tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...) - s.shutDownIfNecessary() - } - } else { - break // Continue on to next task - } - } - } - - if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -// Using First-Fit to spread small tasks among the given offers. -func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - taken := false - for i := 0; i < len(s.smallTasks); i++ { - task := s.smallTasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Decision to take the offer or not - if s.takeOfferFirstFit(offer, wattsConsideration, task) { - taken = true - tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...) - s.shutDownIfNecessary() - } - break // Offer taken, move on - } - } - - if !taken { - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // We need to separate the offers into - // offers from ClassA and ClassB and offers from ClassC and ClassD. - // Nodes in ClassA and ClassB will be packed with the large tasks. - // Small tasks will be spread out among the nodes in ClassC and ClassD. 
- offersHeavyPowerClasses := []*mesos.Offer{} - offersLightPowerClasses := []*mesos.Offer{} - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok { - offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } - if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok { - offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } - if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok { - offersLightPowerClasses = append(offersLightPowerClasses, offer) - } - if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok { - offersLightPowerClasses = append(offersLightPowerClasses, offer) - } - - } - - log.Println("Packing Large tasks into ClassAB offers:") - for _, o := range offersHeavyPowerClasses { - log.Println(*o.Hostname) - } - // Packing tasks into offersHeavyPowerClasses - s.pack(offersHeavyPowerClasses, driver) - - log.Println("Spreading Small tasks among ClassCD offers:") - for _, o := range offersLightPowerClasses { - log.Println(*o.Hostname) - } - // Spreading tasks among offersLightPowerClasses - s.spread(offersLightPowerClasses, driver) -} - -func (s *BottomHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/bpswMaxMinPistonCapping.go b/schedulers/bpswMaxMinPistonCapping.go deleted file mode 100644 index 53a7200..0000000 --- a/schedulers/bpswMaxMinPistonCapping.go +++ /dev/null @@ -1,436 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/rapl" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "errors" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "os" - "sort" - "sync" - "time" -) - -// Decides if to take an offer or not -func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task, - totalCPU, totalRAM, totalWatts float64) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { - return true - } - return false -} - -type BPSWMaxMinPistonCapping struct { - base //Type embedding to inherit common functions - 
taskMonitor map[string][]def.Task - totalPower map[string]float64 - ticker *time.Ticker - isCapping bool -} - -// New electron scheduler -func NewBPSWMaxMinPistonCapping(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *BPSWMaxMinPistonCapping { - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &BPSWMaxMinPistonCapping{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - taskMonitor: make(map[string][]def.Task), - totalPower: make(map[string]float64), - ticker: time.NewTicker(5 * time.Second), - isCapping: false, - } - return s - -} - -func (s *BPSWMaxMinPistonCapping) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - // Start recording only when we're creating the first task - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - // Setting the task ID to the task. This is done so that we can consider each task to be different - // even though they have the same parameters. 
- task.SetTaskID(*proto.String("electron-" + taskName)) - // Add task to list of tasks running on node - if len(s.taskMonitor[*offer.Hostname]) == 0 { - s.taskMonitor[*offer.Hostname] = []def.Task{task} - } else { - s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) - } - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *BPSWMaxMinPistonCapping) Disconnected(sched.SchedulerDriver) { - // Need to stop the capping process - s.ticker.Stop() - bpMaxMinPistonCappingMutex.Lock() - s.isCapping = false - bpMaxMinPistonCappingMutex.Unlock() - log.Println("Framework disconnected with master") -} - -// mutex -var bpMaxMinPistonCappingMutex sync.Mutex - -// go routine to cap each node in the cluster at regular intervals of time -var bpMaxMinPistonCappingCapValues = make(map[string]float64) - -// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead) -var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64) - -func (s *BPSWMaxMinPistonCapping) startCapping() { - go func() { - for { - select { - case <-s.ticker.C: - // Need to cap each node - bpMaxMinPistonCappingMutex.Lock() - for host, capValue := range bpMaxMinPistonCappingCapValues { - roundedCapValue := float64(int(math.Floor(capValue + 0.5))) - // has the cap value changed - if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok { - if previousRoundedCap != roundedCapValue { - if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { - log.Println(err) - } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue))) - } - bpMaxMinPistonCappingPreviousRoundedCapValues[host] = roundedCapValue - } - } else { - if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil { - log.Println(err) - } else { - log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5))) - } - bpMaxMinPistonCappingPreviousRoundedCapValues[host] = roundedCapValue - } - } - bpMaxMinPistonCappingMutex.Unlock() - } - } - }() - -} - -// Stop the capping -func (s *BPSWMaxMinPistonCapping) stopCapping() { - if s.isCapping { - log.Println("Stopping the capping.") - s.ticker.Stop() - bpMaxMinPistonCappingMutex.Lock() - s.isCapping = false - bpMaxMinPistonCappingMutex.Unlock() - } -} - -// Determine if the remaining sapce inside of the offer is enough for -// the task we need to create. If it is, create a TaskInfo and return it. 
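BPSWMaxMinPistonCapping examines each offer in a max-min order: it first tries to place a single instance of the heaviest remaining task (scanning the watts-sorted task list from the back), and then fills whatever room is left starting from the lightest tasks. The helper below only illustrates that examination order over n watts-sorted tasks; it is a hypothetical sketch, not part of the scheduler.

```go
// maxMinExamineOrder returns the order in which indices of a task list,
// sorted by watts in ascending order, are considered for one offer:
// heaviest-first for a single placement, then lightest-first to fill the
// remaining room. Hypothetical helper for illustration only.
func maxMinExamineOrder(n int) []int {
	var order []int
	// Phase 1: try the heaviest tasks first, looking for one placement.
	for i := n - 1; i >= 0; i-- {
		order = append(order, i)
	}
	// Phase 2: pack the rest of the offer with the smallest tasks.
	for i := 0; i < n; i++ {
		order = append(order, i)
	}
	return order
}
```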
-func (s *BPSWMaxMinPistonCapping) CheckFit( - i int, - task def.Task, - wattsConsideration float64, - offer *mesos.Offer, - totalCPU *float64, - totalRAM *float64, - totalWatts *float64, - partialLoad *float64) (bool, *mesos.TaskInfo) { - - // Does the task fit - if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) { - - // Start piston capping if haven't started yet - if !s.isCapping { - s.isCapping = true - s.startCapping() - } - - *totalWatts += wattsConsideration - *totalCPU += task.CPU - *totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - *partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100 - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - - return true, taskToSchedule - } - - return false, nil -} - -func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - // Store the partialLoad for host corresponding to this offer - // Once we can't fit any more tasks, we update the capValue for this host using partialLoad and then launch the fit tasks. 
- partialLoad := 0.0 - - // Assumes s.tasks is ordered in non-decreasing median max peak order - - // Attempt to schedule a single instance of the heaviest workload available first - // Start from the back until one fits - for i := len(s.tasks) - 1; i >= 0; i-- { - - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts, &partialLoad) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - break - } - } - - // Pack the rest of the offer with the smallest tasks - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - for *task.Instances > 0 { - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts, &partialLoad) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - } else { - break // Continue on to next task - } - } - } - - if offerTaken { - // Updating the cap value for offer.Hostname - bpMaxMinPistonCappingMutex.Lock() - bpMaxMinPistonCappingCapValues[*offer.Hostname] += partialLoad - bpMaxMinPistonCappingMutex.Unlock() - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -// Remove finished task from the taskMonitor -func (s *BPSWMaxMinPistonCapping) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) { - hostOfFinishedTask := "" - indexOfFinishedTask := -1 - found := false - var finishedTask def.Task - - for host, tasks := range s.taskMonitor { - for i, task := range tasks { - if task.TaskID == finishedTaskID { - hostOfFinishedTask = host - indexOfFinishedTask = i - found = true - } - } - if found { - break - } - } - - if hostOfFinishedTask != "" && indexOfFinishedTask != -1 { - finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask] - log.Printf("Removing task with TaskID [%s] from the list of running tasks\n", - s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID) - s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask], - s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...) 
- } else { - return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor") - } - return finishedTask, hostOfFinishedTask, nil -} - -func (s *BPSWMaxMinPistonCapping) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - bpMaxMinPistonCappingMutex.Lock() - s.tasksRunning++ - bpMaxMinPistonCappingMutex.Unlock() - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - // Deleting the task from the taskMonitor - finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value) - if err != nil { - log.Println(err) - } - - // Need to determine the watts consideration for the finishedTask - var wattsConsideration float64 - if s.classMapWatts { - wattsConsideration = finishedTask.ClassToWatts[hostToPowerClass(hostOfFinishedTask)] - } else { - wattsConsideration = finishedTask.Watts - } - // Need to update the cap values for host of the finishedTask - bpMaxMinPistonCappingMutex.Lock() - bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100 - // Checking to see if the cap value has become 0, in which case we uncap the host. - if int(math.Floor(bpMaxMinPistonCappingCapValues[hostOfFinishedTask]+0.5)) == 0 { - bpMaxMinPistonCappingCapValues[hostOfFinishedTask] = 100 - } - s.tasksRunning-- - bpMaxMinPistonCappingMutex.Unlock() - - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - s.stopCapping() - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - -} diff --git a/schedulers/bpswMaxMinProacCC.go b/schedulers/bpswMaxMinProacCC.go deleted file mode 100644 index a0ac947..0000000 --- a/schedulers/bpswMaxMinProacCC.go +++ /dev/null @@ -1,447 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - powCap "bitbucket.org/sunybingcloud/electron/powerCapping" - "bitbucket.org/sunybingcloud/electron/rapl" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "os" - "sort" - "sync" - "time" -) - -// Decides if to take an offer or not -func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task, - totalCPU, totalRAM, totalWatts float64) bool { - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) && - (!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) { - return true - } - return false -} - -type BPSWMaxMinProacCC struct { - base // Type embedding to inherit common functions - taskMonitor map[string][]def.Task - availablePower map[string]float64 - totalPower map[string]float64 - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // 
indicate whether we are currently performing cluster-wide capping. - isRecapping bool // indicate whether we are currently performing cluster-wide recapping. -} - -// New electron scheduler -func NewBPSWMaxMinProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinProacCC { - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &BPSWMaxMinProacCC{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - } - return s -} - -// mutex -var bpMaxMinProacCCMutex sync.Mutex - -func (s *BPSWMaxMinProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging. - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Setting the task ID to the task. This is done so that we can consider each task to be different, - // even though they have the same parameters. - task.SetTaskID(*proto.String("electron-" + taskName)) - // Add task to the list of tasks running on the node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - if len(s.taskMonitor[*offer.Hostname]) == 0 { - s.taskMonitor[*offer.Hostname] = []def.Task{task} - } else { - s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) - } - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -// go routine to cap the entire cluster in regular intervals of time. -var bpMaxMinProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. 
-var bpMaxMinProacCCNewCapValue = 0.0 // newly computed cap value -func (s *BPSWMaxMinProacCC) startCapping() { - go func() { - for { - select { - case <-s.ticker.C: - // Need to cap the cluster only if new cap value different from old cap value. - // This way we don't unnecessarily cap the cluster. - bpMaxMinProacCCMutex.Lock() - if s.isCapping { - if int(math.Floor(bpMaxMinProacCCNewCapValue+0.5)) != int(math.Floor(bpMaxMinProacCCCapValue+0.5)) { - // updating cap value - bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue - if bpMaxMinProacCCCapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding cap value to nearest int - if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCCapValue+0.5))) - } - } - } - bpMaxMinProacCCMutex.Unlock() - } - } - }() -} - -// go routine to recap the entire cluster in regular intervals of time. -var bpMaxMinProacCCRecapValue = 0.0 // The cluster-wide cap value when recapping. -func (s *BPSWMaxMinProacCC) startRecapping() { - go func() { - for { - select { - case <-s.recapTicker.C: - bpMaxMinProacCCMutex.Lock() - // If stopped performing cluster-wide capping, then we need to recap. - if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding the recap value to the nearest int - if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCRecapValue+0.5))) - } - // Setting the recapping to false - s.isRecapping = false - bpMaxMinProacCCMutex.Unlock() - } - } - }() - -} - -// Stop cluster-wide capping -func (s *BPSWMaxMinProacCC) stopCapping() { - if s.isCapping { - log.Println("Stopping the cluster-wide capping.") - s.ticker.Stop() - bpMaxMinProacCCMutex.Lock() - s.isCapping = false - s.isRecapping = true - bpMaxMinProacCCMutex.Unlock() - } -} - -// Stop the cluster-wide recapping -func (s *BPSWMaxMinProacCC) stopRecapping() { - // If not capping, then definitely recapping. - if !s.isCapping && s.isRecapping { - log.Println("Stopping the cluster-wide re-capping.") - s.recapTicker.Stop() - bpMaxMinProacCCMutex.Lock() - s.isRecapping = false - bpMaxMinProacCCMutex.Unlock() - } -} - -// Determine if the remaining space inside of the offer is enough for -// the task we need to create. If it is, create TaskInfo and return it. 
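In contrast to the per-host piston cappers, the ProacCC schedulers maintain a single cluster-wide cap and only touch RAPL when the rounded value actually changes. That change detection can be distilled into a small helper; rapl.Cap comes from this codebase, while the helper itself and its parameters are hypothetical.

```go
package schedulers

import (
	"log"
	"math"

	"bitbucket.org/sunybingcloud/electron/rapl"
)

// capClusterIfChanged rounds the requested cluster-wide cap and applies it to
// every host only when it differs from the last value that was applied.
// Hypothetical sketch mirroring the change detection in the ProacCC schedulers.
func capClusterIfChanged(hosts map[string]struct{}, newCap float64, lastApplied *float64) {
	rounded := math.Floor(newCap + 0.5)
	if rounded <= 0 || rounded == math.Floor(*lastApplied+0.5) {
		return // nothing new to apply
	}
	for host := range hosts {
		if err := rapl.Cap(host, "rapl", rounded); err != nil {
			log.Println(err)
		}
	}
	log.Printf("Capped the cluster to %d", int(rounded))
	*lastApplied = newCap
}
```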
-func (s *BPSWMaxMinProacCC) CheckFit( - i int, - task def.Task, - wattsConsideration float64, - offer *mesos.Offer, - totalCPU *float64, - totalRAM *float64, - totalWatts *float64) (bool, *mesos.TaskInfo) { - - // Does the task fit - if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) { - - // Capping the cluster if haven't yet started - if !s.isCapping { - bpMaxMinProacCCMutex.Lock() - s.isCapping = true - bpMaxMinProacCCMutex.Unlock() - s.startCapping() - } - - tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) - if err == nil { - bpMaxMinProacCCMutex.Lock() - bpMaxMinProacCCNewCapValue = tempCap - bpMaxMinProacCCMutex.Unlock() - } else { - log.Println("Failed to determine new cluster-wide cap:") - log.Println(err) - } - - *totalWatts += wattsConsideration - *totalCPU += task.CPU - *totalRAM += task.RAM - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - // Need to stop the cluster wide capping - s.stopCapping() - s.startRecapping() // Load changes after every task finishes and hence, we need to change the capping of the cluster. - close(s.Shutdown) - } - } - - return true, taskToSchedule - } - - return false, nil - -} - -func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // retrieving the available power for all the hosts in the offers. 
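The cpus/mem/watts triples used throughout these schedulers come from offerUtils.OfferAgg, which aggregates the scalar resources advertised in a Mesos offer. The repository's actual implementation is not shown in this diff; the sketch below is an assumption of what such an aggregation looks like, using the mesos-go resource accessors.

```go
package offerUtils

import mesos "github.com/mesos/mesos-go/mesosproto"

// OfferAgg sums the scalar cpus, mem and watts resources in an offer.
// Hypothetical sketch; the repository's own offerUtils.OfferAgg may differ.
func OfferAgg(offer *mesos.Offer) (cpus, mem, watts float64) {
	for _, resource := range offer.GetResources() {
		switch resource.GetName() {
		case "cpus":
			cpus += resource.GetScalar().GetValue()
		case "mem":
			mem += resource.GetScalar().GetValue()
		case "watts":
			watts += resource.GetScalar().GetValue()
		}
	}
	return cpus, mem, watts
}
```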
- for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - _, _, offerWatts := offerUtils.OfferAgg(offer) - s.availablePower[*offer.Hostname] = offerWatts - // setting total power if the first time - if _, ok := s.totalPower[*offer.Hostname]; !ok { - s.totalPower[*offer.Hostname] = offerWatts - } - } - - for host, tpower := range s.totalPower { - log.Printf("TotalPower[%s] = %f", host, tpower) - } - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - offerTaken := false - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - - // Assumes s.tasks is ordered in non-decreasing median max peak order - - // Attempt to schedule a single instance of the heaviest workload available first - // Start from the back until one fits - for i := len(s.tasks) - 1; i >= 0; i-- { - - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - break - } - } - - // Pack the rest of the offer with the smallest tasks - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - for *task.Instances > 0 { - // TODO: Fix this so index doesn't need to be passed - taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer, - &totalCPU, &totalRAM, &totalWatts) - - if taken { - offerTaken = true - tasks = append(tasks, taskToSchedule) - } else { - break // Continue on to next task - } - } - } - - if offerTaken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *BPSWMaxMinProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - // Need to remove the task from the window - s.capper.TaskFinished(*status.TaskId.Value) - // Determining the new cluster wide recap value - tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - //tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, 
*status.TaskId.Value) - if err == nil { - // If new determined recap value is different from the current recap value, then we need to recap. - if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpMaxMinProacCCRecapValue+0.5)) { - bpMaxMinProacCCRecapValue = tempCap - bpMaxMinProacCCMutex.Lock() - s.isRecapping = true - bpMaxMinProacCCMutex.Unlock() - log.Printf("Determined re-cap value: %f\n", bpMaxMinProacCCRecapValue) - } else { - bpMaxMinProacCCMutex.Lock() - s.isRecapping = false - bpMaxMinProacCCMutex.Unlock() - } - } else { - log.Println(err) - } - - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - // Need to stop the cluster-wide recapping - s.stopRecapping() - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - -} diff --git a/schedulers/firstfit.go b/schedulers/first-fit.go similarity index 100% rename from schedulers/firstfit.go rename to schedulers/first-fit.go diff --git a/schedulers/firstfitProacCC.go b/schedulers/firstfitProacCC.go deleted file mode 100644 index 51a466e..0000000 --- a/schedulers/firstfitProacCC.go +++ /dev/null @@ -1,382 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - powCap "bitbucket.org/sunybingcloud/electron/powerCapping" - "bitbucket.org/sunybingcloud/electron/rapl" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "os" - "sync" - "time" -) - -// Decides if to take an offer or not -func (s *FirstFitProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if offer_cpu >= task.CPU && offer_mem >= task.RAM && (!s.wattsAsAResource || (offer_watts >= wattsConsideration)) { - return true - } - return false -} - -// electronScheduler implements the Scheduler interface. -type FirstFitProacCC struct { - base // Type embedded to inherit common functions - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. -} - -// New electron scheduler. 
-func NewFirstFitProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *FirstFitProacCC { - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitProacCC{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - } - return s -} - -// mutex -var fcfsMutex sync.Mutex - -func (s *FirstFitProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging. - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Setting the task ID to the task. This is done so that we can consider each task to be different, - // even though they have the same parameters. - task.SetTaskID(*proto.String("electron-" + taskName)) - // Add task to the list of tasks running on the node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - if len(s.taskMonitor[*offer.Hostname]) == 0 { - s.taskMonitor[*offer.Hostname] = []def.Task{task} - } else { - s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) - } - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitProacCC) Disconnected(sched.SchedulerDriver) { - // Need to stop the capping process. - s.ticker.Stop() - s.recapTicker.Stop() - fcfsMutex.Lock() - s.isCapping = false - fcfsMutex.Unlock() - log.Println("Framework disconnected with master") -} - -// go routine to cap the entire cluster in regular intervals of time. -var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. 
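The `newTask` builders in these schedulers all assemble the same Mesos resource list: `cpus` and `mem` always, plus a `watts` scalar only when watts is treated as a schedulable resource. A minimal sketch of that piece is below; it reuses the `def.WattsToConsider` helper seen above, while `buildResources` itself is an illustrative name and not part of the original file.

```
package schedulers

import (
	"log"

	"bitbucket.org/sunybingcloud/electron/def"
	mesos "github.com/mesos/mesos-go/mesosproto"
	"github.com/mesos/mesos-go/mesosutil"
)

// buildResources translates a task's demands into Mesos scalar resources.
// The watts scalar is attached only when wattsAsAResource is enabled, in
// which case the per-offer value comes from def.WattsToConsider (which may
// map the task's watts through the agent's power class).
func buildResources(task def.Task, offer *mesos.Offer,
	wattsAsAResource, classMapWatts bool) []*mesos.Resource {

	resources := []*mesos.Resource{
		mesosutil.NewScalarResource("cpus", task.CPU),
		mesosutil.NewScalarResource("mem", task.RAM),
	}
	if wattsAsAResource {
		if watts, err := def.WattsToConsider(task, classMapWatts, offer); err == nil {
			resources = append(resources, mesosutil.NewScalarResource("watts", watts))
		} else {
			// Same failure mode as the schedulers in this file.
			log.Fatal(err)
		}
	}
	return resources
}
```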
-func (s *FirstFitProacCC) startCapping() { - go func() { - for { - select { - case <-s.ticker.C: - // Need to cap the cluster to the fcfsCurrentCapValue. - fcfsMutex.Lock() - if fcfsCurrentCapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5))) - } - fcfsMutex.Unlock() - } - } - }() -} - -// go routine to cap the entire cluster in regular intervals of time. -var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping. -func (s *FirstFitProacCC) startRecapping() { - go func() { - for { - select { - case <-s.recapTicker.C: - fcfsMutex.Lock() - // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. - if s.isRecapping && fcfsRecapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding curreCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5))) - } - // setting recapping to false - s.isRecapping = false - fcfsMutex.Unlock() - } - } - }() -} - -// Stop cluster wide capping -func (s *FirstFitProacCC) stopCapping() { - if s.isCapping { - log.Println("Stopping the cluster wide capping.") - s.ticker.Stop() - fcfsMutex.Lock() - s.isCapping = false - s.isRecapping = true - fcfsMutex.Unlock() - } -} - -// Stop cluster wide Recapping -func (s *FirstFitProacCC) stopRecapping() { - // If not capping, then definitely recapping. - if !s.isCapping && s.isRecapping { - log.Println("Stopping the cluster wide re-capping.") - s.recapTicker.Stop() - fcfsMutex.Lock() - s.isRecapping = false - fcfsMutex.Unlock() - } -} - -func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // retrieving the available power for all the hosts in the offers. - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - _, _, offer_watts := offerUtils.OfferAgg(offer) - s.availablePower[*offer.Hostname] = offer_watts - // setting total power if the first time. - if _, ok := s.totalPower[*offer.Hostname]; !ok { - s.totalPower[*offer.Hostname] = offer_watts - } - } - - for host, tpower := range s.totalPower { - log.Printf("TotalPower[%s] = %f", host, tpower) - } - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - /* - Clusterwide Capping strategy - - For each task in s.tasks, - 1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements). - 2. If the tasks fits the offer, then I need to detemrine the cluster wide cap. - 3. fcfsCurrentCapValue is updated with the determined cluster wide cap. - - Cluster wide capping is currently performed at regular intervals of time. - */ - offerTaken := false - - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Does the task fit. 
- if s.takeOffer(offer, task) { - // Capping the cluster if haven't yet started, - if !s.isCapping { - fcfsMutex.Lock() - s.isCapping = true - fcfsMutex.Unlock() - s.startCapping() - } - offerTaken = true - tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) - - if err == nil { - fcfsMutex.Lock() - fcfsCurrentCapValue = tempCap - fcfsMutex.Unlock() - } else { - log.Println("Failed to determine new cluster wide cap: ") - log.Println(err) - } - log.Printf("Starting on [%s]\n", offer.GetHostname()) - taskToSchedule := s.newTask(offer, task) - toSchedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter) - log.Printf("Inst: %d", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - if *task.Instances <= 0 { - // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - // Need to stop the cluster wide capping as there aren't any more tasks to schedule. - s.stopCapping() - s.startRecapping() // Load changes after every task finishes and hence we need to change the capping of the cluster. - close(s.Shutdown) - } - } - break // Offer taken, move on. - } else { - // Task doesn't fit the offer. Move onto the next offer. - } - } - - // If no task fit the offer, then declining the offer. - if !offerTaken { - log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *FirstFitProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - fcfsMutex.Lock() - s.tasksRunning++ - fcfsMutex.Unlock() - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - // Need to remove the task from the window of tasks. - s.capper.TaskFinished(*status.TaskId.Value) - // Determining the new cluster wide cap. - //tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - if err == nil { - // if new determined cap value is different from the current recap value then we need to recap. - if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) { - fcfsRecapValue = tempCap - fcfsMutex.Lock() - s.isRecapping = true - fcfsMutex.Unlock() - log.Printf("Determined re-cap value: %f\n", fcfsRecapValue) - } else { - fcfsMutex.Lock() - s.isRecapping = false - fcfsMutex.Unlock() - } - } else { - // Not updating fcfsCurrentCapValue - log.Println(err) - } - - fcfsMutex.Lock() - s.tasksRunning-- - fcfsMutex.Unlock() - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - // Need to stop the recapping process. 
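Re-capping is only flagged when the freshly computed cluster-wide cap actually differs from the one currently in force after rounding to the nearest watt, which avoids redundant RAPL calls. That comparison as a standalone sketch (the helper name is illustrative):

```
package schedulers

import "math"

// needsRecap reports whether a freshly computed cluster-wide cap differs from
// the cap currently in force once both are rounded to the nearest watt. This
// is the test used before flagging isRecapping in the status-update handlers.
func needsRecap(newCap, currentCap float64) bool {
	return int(math.Floor(newCap+0.5)) != int(math.Floor(currentCap+0.5))
}
```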
- s.stopRecapping() - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/firstfitSortedOffers.go b/schedulers/firstfitSortedOffers.go deleted file mode 100644 index d3fdb5f..0000000 --- a/schedulers/firstfitSortedOffers.go +++ /dev/null @@ -1,219 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -// Decides if to take an offer or not -func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { - return true - } - - return false -} - -// electronScheduler implements the Scheduler interface -type FirstFitSortedOffers struct { - base // Type embedded to inherit common functions -} - -// New electron scheduler -func NewFirstFitSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedOffers { - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitSortedOffers{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s -} - -func (s *FirstFitSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: 
proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // Sorting the offers - sort.Sort(offerUtils.OffersSorter(offers)) - - // Printing the sorted offers and the corresponding CPU resource availability - log.Println("Sorted Offers:") - for i := 0; i < len(offers); i++ { - offer := offers[i] - offerUtils.UpdateEnvironment(offer) - offerCPU, _, _ := offerUtils.OfferAgg(offer) - log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) - } - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - // First fit strategy - - offerTaken := false - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Decision to take the offer or not - if s.takeOffer(offer, task) { - - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - tasks = append(tasks, taskToSchedule) - - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - offerTaken = true - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - break // Offer taken, move on - } - } - - // If there was no match for the task - if !offerTaken { - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - - } -} - -func (s *FirstFitSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/firstfitSortedWattsProacCC.go b/schedulers/firstfitSortedWattsProacCC.go deleted file mode 100644 index 2610084..0000000 --- a/schedulers/firstfitSortedWattsProacCC.go +++ /dev/null @@ -1,398 +0,0 @@ -/* -Ranked based cluster wide capping. - -Note: Sorting the tasks right in the beginning, in ascending order of watts. 
- You are hence certain that the tasks that didn't fit are the ones that require more resources, - and hence, you can find a way to address that issue. - On the other hand, if you use first fit to fit the tasks and then sort them to determine the cap, - you are never certain as which tasks are the ones that don't fit and hence, it becomes much harder - to address this issue. -*/ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - powCap "bitbucket.org/sunybingcloud/electron/powerCapping" - "bitbucket.org/sunybingcloud/electron/rapl" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "math" - "os" - "sort" - "sync" - "time" -) - -// Decides if to taken an offer or not -func (s *FirstFitSortedWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool { - offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer) - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsToConsider - log.Fatal(err) - } - if offer_cpu >= task.CPU && offer_mem >= task.RAM && (!s.wattsAsAResource || offer_watts >= wattsConsideration) { - return true - } - return false -} - -// electronScheduler implements the Scheduler interface -type FirstFitSortedWattsProacCC struct { - base // Type embedded to inherit common functions - taskMonitor map[string][]def.Task // store tasks that are currently running. - availablePower map[string]float64 // available power for each node in the cluster. - totalPower map[string]float64 // total power for each node in the cluster. - capper *powCap.ClusterwideCapper - ticker *time.Ticker - recapTicker *time.Ticker - isCapping bool // indicate whether we are currently performing cluster wide capping. - isRecapping bool // indicate whether we are currently performing cluster wide re-capping. -} - -// New electron scheduler. -func NewFirstFitSortedWattsProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *FirstFitSortedWattsProacCC { - - // Sorting tasks in ascending order of watts - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitSortedWattsProacCC{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - taskMonitor: make(map[string][]def.Task), - availablePower: make(map[string]float64), - totalPower: make(map[string]float64), - capper: powCap.GetClusterwideCapperInstance(), - ticker: time.NewTicker(10 * time.Second), - recapTicker: time.NewTicker(20 * time.Second), - isCapping: false, - isRecapping: false, - } - return s -} - -// mutex -var rankedMutex sync.Mutex - -func (s *FirstFitSortedWattsProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging. 
- s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Setting the task ID to the task. This is done so that we can consider each task to be different, - // even though they have the same parameters. - task.SetTaskID(*proto.String("electron-" + taskName)) - // Add task to the list of tasks running on the node. - s.running[offer.GetSlaveId().GoString()][taskName] = true - if len(s.taskMonitor[*offer.Hostname]) == 0 { - s.taskMonitor[*offer.Hostname] = []def.Task{task} - } else { - s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task) - } - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsToConsider - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitSortedWattsProacCC) Disconnected(sched.SchedulerDriver) { - // Need to stop the capping process. - s.ticker.Stop() - s.recapTicker.Stop() - rankedMutex.Lock() - s.isCapping = false - rankedMutex.Unlock() - log.Println("Framework disconnected with master") -} - -// go routine to cap the entire cluster in regular intervals of time. -var rankedCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet. -func (s *FirstFitSortedWattsProacCC) startCapping() { - go func() { - for { - select { - case <-s.ticker.C: - // Need to cap the cluster to the rankedCurrentCapValue. - rankedMutex.Lock() - if rankedCurrentCapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding currentCapValue to the nearest int. - if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue+0.5))) - } - rankedMutex.Unlock() - } - } - }() -} - -// go routine to cap the entire cluster in regular intervals of time. -var rankedRecapValue = 0.0 // The cluster wide cap value when recapping. -func (s *FirstFitSortedWattsProacCC) startRecapping() { - go func() { - for { - select { - case <-s.recapTicker.C: - rankedMutex.Lock() - // If stopped performing cluster wide capping then we need to explicitly cap the entire cluster. - if s.isRecapping && rankedRecapValue > 0.0 { - for host, _ := range constants.Hosts { - // Rounding currentCapValue to the nearest int. 
- if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil { - log.Println(err) - } - } - log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue+0.5))) - } - // setting recapping to false - s.isRecapping = false - rankedMutex.Unlock() - } - } - }() -} - -// Stop cluster wide capping -func (s *FirstFitSortedWattsProacCC) stopCapping() { - if s.isCapping { - log.Println("Stopping the cluster wide capping.") - s.ticker.Stop() - fcfsMutex.Lock() - s.isCapping = false - s.isRecapping = true - fcfsMutex.Unlock() - } -} - -// Stop cluster wide Recapping -func (s *FirstFitSortedWattsProacCC) stopRecapping() { - // If not capping, then definitely recapping. - if !s.isCapping && s.isRecapping { - log.Println("Stopping the cluster wide re-capping.") - s.recapTicker.Stop() - fcfsMutex.Lock() - s.isRecapping = false - fcfsMutex.Unlock() - } -} - -func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // retrieving the available power for all the hosts in the offers. - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - _, _, offer_watts := offerUtils.OfferAgg(offer) - s.availablePower[*offer.Hostname] = offer_watts - // setting total power if the first time. - if _, ok := s.totalPower[*offer.Hostname]; !ok { - s.totalPower[*offer.Hostname] = offer_watts - } - } - - for host, tpower := range s.totalPower { - log.Printf("TotalPower[%s] = %f", host, tpower) - } - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - /* - Ranked cluster wide capping strategy - - For each task in the sorted tasks, - 1. Need to check whether the offer can be taken or not (based on CPU, RAM and WATTS requirements). - 2. If the task fits the offer, then need to determine the cluster wide cap.' - 3. rankedCurrentCapValue is updated with the determined cluster wide cap. - - Once we are done scheduling all the tasks, - we start recalculating the cluster wide cap each time a task finishes. - - Cluster wide capping is currently performed at regular intervals of time. - */ - offerTaken := false - - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Does the task fit. 
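The fit check that follows is the first-fit predicate shared by these schedulers: the offer must cover the task's CPU and memory, and its watts only when watts is treated as a schedulable resource. A self-contained sketch, with `fitsOffer` as an illustrative name:

```
package schedulers

import (
	"log"

	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
	mesos "github.com/mesos/mesos-go/mesosproto"
)

// fitsOffer is the first-fit admission test: the offer's aggregate CPU and
// memory must cover the task, and its watts must cover the task's watts
// estimate only when watts is a schedulable resource. (Illustrative helper.)
func fitsOffer(offer *mesos.Offer, task def.Task,
	wattsAsAResource, classMapWatts bool) bool {

	cpus, mem, watts := offerUtils.OfferAgg(offer)
	wattsConsideration, err := def.WattsToConsider(task, classMapWatts, offer)
	if err != nil {
		log.Fatal(err) // same failure mode as the schedulers in this file
	}
	return cpus >= task.CPU && mem >= task.RAM &&
		(!wattsAsAResource || watts >= wattsConsideration)
}
```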
- if s.takeOffer(offer, task) { - // Capping the cluster if haven't yet started - if !s.isCapping { - rankedMutex.Lock() - s.isCapping = true - rankedMutex.Unlock() - s.startCapping() - } - offerTaken = true - tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task) - - if err == nil { - rankedMutex.Lock() - rankedCurrentCapValue = tempCap - rankedMutex.Unlock() - } else { - log.Println("Failed to determine the new cluster wide cap: ", err) - } - log.Printf("Starting on [%s]\n", offer.GetHostname()) - taskToSchedule := s.newTask(offer, task) - to_schedule := []*mesos.TaskInfo{taskToSchedule} - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, mesosUtils.DefaultFilter) - log.Printf("Inst: %d", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - if *task.Instances <= 0 { - // All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule. - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - // Need to stop the cluster wide capping as there aren't any more tasks to schedule. - s.stopCapping() - s.startRecapping() - close(s.Shutdown) - } - } - break // Offer taken, move on. - } else { - // Task doesn't fit the offer. Move onto the next offer. - } - } - - // If no tasks fit the offer, then declining the offer. - if !offerTaken { - log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname()) - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *FirstFitSortedWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - rankedMutex.Lock() - s.tasksRunning++ - rankedMutex.Unlock() - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - rankedMutex.Lock() - s.tasksRunning-- - rankedMutex.Unlock() - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - // Need to stop the recapping process. - s.stopRecapping() - close(s.Done) - default: - } - } else { - // Need to remove the task from the window - s.capper.TaskFinished(*status.TaskId.Value) - // Determining the new cluster wide cap. - //tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value) - - if err == nil { - // If new determined cap value is different from the current recap value then we need to recap. 
- if int(math.Floor(tempCap+0.5)) != int(math.Floor(rankedRecapValue+0.5)) { - rankedRecapValue = tempCap - rankedMutex.Lock() - s.isRecapping = true - rankedMutex.Unlock() - log.Printf("Determined re-cap value: %f\n", rankedRecapValue) - } else { - rankedMutex.Lock() - s.isRecapping = false - rankedMutex.Unlock() - } - } else { - // Not updating rankedCurrentCapValue - log.Println(err) - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/firstfitSortedWattsSortedOffers.go b/schedulers/firstfitSortedWattsSortedOffers.go deleted file mode 100644 index b8b1eef..0000000 --- a/schedulers/firstfitSortedWattsSortedOffers.go +++ /dev/null @@ -1,222 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -// Decides if to take an offer or not -func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { - return true - } - - return false -} - -// electronScheduler implements the Scheduler interface -type FirstFitSortedWattsSortedOffers struct { - base // Type embedded to inherit common functions -} - -// New electron scheduler -func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, - classMapWatts bool) *FirstFitSortedWattsSortedOffers { - - // Sorting the tasks in increasing order of watts requirement. 
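This scheduler relies on two orderings: tasks sorted by their watts estimate and offers sorted by available CPU. The sketch below shows the same orderings with `sort.Slice` as an illustrative alternative to the `def.WattsSorter` and `offerUtils.OffersSorter` types; the `Watts` field name on `def.Task` and the ascending direction for offers are assumptions here.

```
package schedulers

import (
	"sort"

	"bitbucket.org/sunybingcloud/electron/def"
	"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
	mesos "github.com/mesos/mesos-go/mesosproto"
)

// sortTasksAndOffers orders tasks by increasing watts estimate and offers by
// increasing aggregate CPU, using sort.Slice instead of the dedicated sorter
// types. (Field name Watts and the offer ordering direction are assumed.)
func sortTasksAndOffers(tasks []def.Task, offers []*mesos.Offer) {
	sort.Slice(tasks, func(i, j int) bool {
		return tasks[i].Watts < tasks[j].Watts
	})
	sort.Slice(offers, func(i, j int) bool {
		cpuI, _, _ := offerUtils.OfferAgg(offers[i])
		cpuJ, _, _ := offerUtils.OfferAgg(offers[j])
		return cpuI < cpuJ
	})
}
```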
- sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitSortedWattsSortedOffers{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s -} - -func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - // Sorting the offers - sort.Sort(offerUtils.OffersSorter(offers)) - - // Printing the sorted offers and the corresponding CPU resource availability - log.Println("Sorted Offers:") - for i := 0; i < len(offers); i++ { - offer := offers[i] - offerUtils.UpdateEnvironment(offer) - offerCPU, _, _ := offerUtils.OfferAgg(offer) - log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU) - } - - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - // First fit strategy - - offerTaken := false - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Decision to take the offer or not - if s.takeOffer(offer, task) { - - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - 
taskToSchedule := s.newTask(offer, task) - tasks = append(tasks, taskToSchedule) - - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - offerTaken = true - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - break // Offer taken, move on - } - } - - // If there was no match for the task - if !offerTaken { - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - - } -} - -func (s *FirstFitSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/firstfitsortedwatts.go b/schedulers/firstfitsortedwatts.go deleted file mode 100644 index 1f71411..0000000 --- a/schedulers/firstfitsortedwatts.go +++ /dev/null @@ -1,209 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -// Decides if to take an offer or not -func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool { - - cpus, mem, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) { - return true - } - - return false -} - -// electronScheduler implements the Scheduler interface -type FirstFitSortedWatts struct { - base // Type embedded to inherit common functions -} - -// New electron scheduler -func NewFirstFitSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWatts { - - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitSortedWatts{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, 
"", log.LstdFlags), - }, - } - return s -} - -func (s *FirstFitSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - // First fit strategy - - offerTaken := false - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Decision to take the offer or not - if s.takeOffer(offer, task) { - - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - tasks = append(tasks, taskToSchedule) - - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - offerTaken = true - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) 
- - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - break // Offer taken, move on - } - } - - // If there was no match for the task - if !offerTaken { - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - - } -} - -func (s *FirstFitSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/firstfitwattsonly.go b/schedulers/firstfitwattsonly.go deleted file mode 100644 index d2b13b6..0000000 --- a/schedulers/firstfitwattsonly.go +++ /dev/null @@ -1,200 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "time" -) - -// Decides if to take an offer or not -func (s *FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool { - - _, _, watts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - if watts >= wattsConsideration { - return true - } - - return false -} - -type FirstFitWattsOnly struct { - base // Type embedded to inherit common functions -} - -// New electron scheduler -func NewFirstFitWattsOnly(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitWattsOnly { - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - s := &FirstFitWattsOnly{ - base: base{ - tasks: tasks, - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - } - return s -} - -func (s *FirstFitWattsOnly) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - wattsConsideration, err := def.WattsToConsider(task, 
s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("watts", wattsConsideration), - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - - // First fit strategy - - offerTaken := false - for i := 0; i < len(s.tasks); i++ { - task := s.tasks[i] - - // Don't take offer if it doesn't match our task's host requirement - if offerUtils.HostMismatch(*offer.Hostname, task.Host) { - continue - } - - // Decision to take the offer or not - if s.takeOffer(offer, task) { - - log.Println("Co-Located with: ") - coLocated(s.running[offer.GetSlaveId().GoString()]) - - taskToSchedule := s.newTask(offer, task) - tasks = append(tasks, taskToSchedule) - - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - offerTaken = true - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.tasks[i] = s.tasks[len(s.tasks)-1] - s.tasks = s.tasks[:len(s.tasks)-1] - - if len(s.tasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } - } - break // Offer taken, move on - } - } - - // If there was no match for the task - if !offerTaken { - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - - } -} - -func (s *FirstFitWattsOnly) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -} diff --git a/schedulers/bpswMaxMin.go b/schedulers/max-greedymins.go similarity index 100% rename from schedulers/bpswMaxMin.go rename to schedulers/max-greedymins.go diff --git a/schedulers/topHeavy.go b/schedulers/topHeavy.go deleted file mode 100644 index 0f7f405..0000000 
--- a/schedulers/topHeavy.go +++ /dev/null @@ -1,341 +0,0 @@ -package schedulers - -import ( - "bitbucket.org/sunybingcloud/electron/constants" - "bitbucket.org/sunybingcloud/electron/def" - "bitbucket.org/sunybingcloud/electron/utilities/mesosUtils" - "bitbucket.org/sunybingcloud/electron/utilities/offerUtils" - "fmt" - "github.com/golang/protobuf/proto" - mesos "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - sched "github.com/mesos/mesos-go/scheduler" - "log" - "os" - "sort" - "time" -) - -/* -Tasks are categorized into small and large tasks based on the watts requirement. -All the small tasks are packed into offers from agents belonging to power class C and power class D, using BinPacking. -All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit. - -This was done to give a little more room for the large tasks (power intensive) for execution and reduce the possibility of -starvation of power intensive tasks. -*/ - -func (s *TopHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts, - wattsToConsider float64, task def.Task) bool { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) && - (offerCPU >= (totalCPU + task.CPU)) && - (offerRAM >= (totalRAM + task.RAM)) { - return true - } - return false -} - -func (s *TopHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool { - offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer) - - //TODO: Insert watts calculation here instead of taking them as a parameter - if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) && - (offerCPU >= task.CPU) && (offerRAM >= task.RAM) { - return true - } - return false -} - -// electronScheduler implements the Scheduler interface -type TopHeavy struct { - base // Type embedded to inherit common functions - smallTasks, largeTasks []def.Task -} - -// New electron scheduler -func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *TopHeavy { - sort.Sort(def.WattsSorter(tasks)) - - logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log") - if err != nil { - log.Fatal(err) - } - - // Classification done based on MMPU watts requirements, into 2 clusters. - classifiedTasks := def.ClassifyTasks(tasks, 2) - - s := &TopHeavy{ - base: base{ - wattsAsAResource: wattsAsAResource, - classMapWatts: classMapWatts, - Shutdown: make(chan struct{}), - Done: make(chan struct{}), - PCPLog: make(chan struct{}), - running: make(map[string]map[string]bool), - RecordPCP: false, - schedTrace: log.New(logFile, "", log.LstdFlags), - }, - // Separating small tasks from large tasks. 
- smallTasks: classifiedTasks[0].Tasks, - largeTasks: classifiedTasks[1].Tasks, - } - return s -} - -func (s *TopHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances) - s.tasksCreated++ - - if !s.RecordPCP { - // Turn on logging - s.RecordPCP = true - time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts - } - - // If this is our first time running into this Agent - if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok { - s.running[offer.GetSlaveId().GoString()] = make(map[string]bool) - } - - // Add task to list of tasks running on node - s.running[offer.GetSlaveId().GoString()][taskName] = true - - resources := []*mesos.Resource{ - mesosutil.NewScalarResource("cpus", task.CPU), - mesosutil.NewScalarResource("mem", task.RAM), - } - - if s.wattsAsAResource { - if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil { - log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider) - resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider)) - } else { - // Error in determining wattsConsideration - log.Fatal(err) - } - } - - return &mesos.TaskInfo{ - Name: proto.String(taskName), - TaskId: &mesos.TaskID{ - Value: proto.String("electron-" + taskName), - }, - SlaveId: offer.SlaveId, - Resources: resources, - Command: &mesos.CommandInfo{ - Value: proto.String(task.CMD), - }, - Container: &mesos.ContainerInfo{ - Type: mesos.ContainerInfo_DOCKER.Enum(), - Docker: &mesos.ContainerInfo_DockerInfo{ - Image: proto.String(task.Image), - Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated - }, - }, - } -} - -// Shut down scheduler if no more tasks to schedule -func (s *TopHeavy) shutDownIfNecessary() { - if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 { - log.Println("Done scheduling all tasks") - close(s.Shutdown) - } -} - -// create TaskInfo and log scheduling trace -func (s *TopHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task def.Task) *mesos.TaskInfo { - log.Println("Co-Located with:") - coLocated(s.running[offer.GetSlaveId().GoString()]) - taskToSchedule := s.newTask(offer, task) - - fmt.Println("Inst: ", *task.Instances) - s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue()) - *task.Instances-- - return taskToSchedule -} - -// Using BinPacking to pack small tasks into this offer. -func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) { - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - totalWatts := 0.0 - totalCPU := 0.0 - totalRAM := 0.0 - taken := false - for i := 0; i < len(s.smallTasks); i++ { - task := s.smallTasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - for *task.Instances > 0 { - // Does the task fit - // OR lazy evaluation. If ignore watts is set to true, second statement won't - // be evaluated. 
- if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) { - taken = true - totalWatts += wattsConsideration - totalCPU += task.CPU - totalRAM += task.RAM - tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...) - s.shutDownIfNecessary() - } - } else { - break // Continue on to next task - } - } - } - - if taken { - log.Printf("Starting on [%s]\n", offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - } else { - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -// Using first fit to spread large tasks into these offers. -func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) { - for _, offer := range offers { - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - tasks := []*mesos.TaskInfo{} - offerTaken := false - for i := 0; i < len(s.largeTasks); i++ { - task := s.largeTasks[i] - wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer) - if err != nil { - // Error in determining wattsConsideration - log.Fatal(err) - } - - // Decision to take the offer or not - if s.takeOfferFirstFit(offer, wattsConsideration, task) { - offerTaken = true - tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task)) - log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname()) - driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter) - - if *task.Instances <= 0 { - // All instances of task have been scheduled, remove it - s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...) - s.shutDownIfNecessary() - } - break // Offer taken, move on - } - } - - if !offerTaken { - // If there was no match for the task - fmt.Println("There is not enough resources to launch a task:") - cpus, mem, watts := offerUtils.OfferAgg(offer) - - log.Printf("\n", cpus, mem, watts) - driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) - } - } -} - -func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Printf("Received %d resource offers", len(offers)) - - // We need to separate the offers into - // offers from ClassA and ClassB and offers from ClassC and ClassD. - // Offers from ClassA and ClassB would execute the large tasks. - // Offers from ClassC and ClassD would execute the small tasks. 
- offersHeavyPowerClasses := []*mesos.Offer{} - offersLightPowerClasses := []*mesos.Offer{} - - for _, offer := range offers { - offerUtils.UpdateEnvironment(offer) - select { - case <-s.Shutdown: - log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]") - driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) - - log.Println("Number of tasks still running: ", s.tasksRunning) - continue - default: - } - - if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok { - offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } - if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok { - offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer) - } - if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok { - offersLightPowerClasses = append(offersLightPowerClasses, offer) - } - if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok { - offersLightPowerClasses = append(offersLightPowerClasses, offer) - } - } - - log.Println("Spreading Large tasks into ClassAB Offers:") - for _, o := range offersHeavyPowerClasses { - log.Println(*o.Hostname) - } - log.Println("Packing Small tasks into ClassCD Offers:") - for _, o := range offersLightPowerClasses { - log.Println(*o.Hostname) - } - - // Packing tasks into offersLightPowerClasses - s.pack(offersLightPowerClasses, driver) - // Spreading tasks among offersHeavyPowerClasses - s.spread(offersHeavyPowerClasses, driver) -} - -func (s *TopHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - s.tasksRunning++ - } else if IsTerminal(status.State) { - delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value) - s.tasksRunning-- - if s.tasksRunning == 0 { - select { - case <-s.Shutdown: - close(s.Done) - default: - } - } - } - log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value) -}
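The deleted topHeavy.go above describes its strategy in its header comment: tasks are classified by watts requirement, small tasks are bin-packed onto offers from the lighter power classes (C and D), and large, power-intensive tasks are spread first-fit across offers from the heavier power classes (A and B) to reduce their risk of starvation. The following is a minimal, self-contained sketch of that routing idea only; the `Task` and `Offer` types, the median split, and the class names hard-coded below are illustrative assumptions and not Elektron's actual API (the removed scheduler used `def.ClassifyTasks` with k-means and Mesos offer types instead).

```go
// Sketch of the TopHeavy routing idea: split tasks by watts, send small tasks
// to light power-class offers and large tasks to heavy power-class offers.
// All types and thresholds here are hypothetical stand-ins for illustration.
package main

import (
	"fmt"
	"sort"
)

type Task struct {
	Name  string
	Watts float64
}

type Offer struct {
	Hostname   string
	PowerClass string // assumed to be one of "A", "B", "C", "D"
}

// splitByWatts partitions tasks into small and large halves after sorting by
// watts; the deleted scheduler classified tasks with k-means rather than a
// simple median split.
func splitByWatts(tasks []Task) (small, large []Task) {
	sort.Slice(tasks, func(i, j int) bool { return tasks[i].Watts < tasks[j].Watts })
	mid := len(tasks) / 2
	return tasks[:mid], tasks[mid:]
}

// routeOffers separates offers into heavy (class A/B) and light (class C/D)
// groups, mirroring how ResourceOffers built offersHeavyPowerClasses and
// offersLightPowerClasses.
func routeOffers(offers []Offer) (heavy, light []Offer) {
	for _, o := range offers {
		switch o.PowerClass {
		case "A", "B":
			heavy = append(heavy, o)
		case "C", "D":
			light = append(light, o)
		}
	}
	return heavy, light
}

func main() {
	tasks := []Task{{"minife", 63.1}, {"dgemm", 85.9}, {"stream", 40.2}, {"lulesh", 95.4}}
	offers := []Offer{{"node1", "A"}, {"node2", "C"}, {"node3", "D"}}

	small, large := splitByWatts(tasks)
	heavy, light := routeOffers(offers)

	// Small tasks would then be bin-packed onto the light offers, and large
	// tasks spread first-fit across the heavy offers.
	fmt.Println("small tasks", small, "-> packed onto", light)
	fmt.Println("large tasks", large, "-> spread across", heavy)
}
```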