refined README and removed unwanted scheduling policies.

This commit is contained in:
Pradyumna Kaushik 2017-09-26 00:05:19 -04:00
parent dc279801b7
commit 1c4b6f0f01
16 changed files with 8 additions and 3896 deletions

View file

@@ -1,75 +1,10 @@
Electron: A power budget manager
======================================
Elektron: A Pluggable Mesos framework with power-aware capabilities
===================================================================
Elektron is a Mesos framework that serves as a playground for developers to experiment with different scheduling policies for launching ad-hoc jobs.
Elektron is designed as a lightweight, configurable framework that can be used in conjunction with built-in power-capping policies to reduce the peak power and/or energy usage of co-scheduled tasks.
To Do:
* Create metrics for each task launched [Time to schedule, run time, power used]
* Have a calibration phase?
* Add the ability to use constraints
* Running average calculations (see the exponential moving average sketch after this list): https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* Make the parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
* Add the type of scheduler to be used, to be picked from a config file, along with its configurable parameters.
  Possible to set up the constants at runtime based on the environment?
* Write test code for each scheduler (this should be done after the design change)
* Fix the log message for declining an offer -- give a different reason when there are insufficient resources than when there are no
  longer any tasks to schedule.
* Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file.
* Make def.Task an interface for further modularization and flexibility.
* Convert def#WattsToConsider(...) to be a receiver of def.Task and rename it to Watts(...).
* **Critical** -- Add software requirements to the README.md (Mesos version, RAPL version, PCP version, Go version, ...)
* **Critical** -- Retrofit to use Go 1.8 sorting techniques. Use def/taskUtils.go for reference.
* Handle the case where the powerclass is not configured on a node. As of now, the assumption is that the powerclass is configured
  for all the nodes.
* Refine the sorting algorithm that sorts the clusters of tasks retrieved using the kmeans algorithm. This also involves reducing its time complexity.
* Use the generic task sorter in def/taskUtils.go to sort tasks based on CPU, RAM, etc. Remove the existing sorters in def/task.go.
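An exponential moving average keeps a running estimate without storing the full history: S_t = alpha * x_t + (1 - alpha) * S_(t-1). A minimal sketch in Go of what such a tracker could look like (the type and its fields are hypothetical, not part of Electron):

```
// EMA tracks an exponential moving average of a stream of samples.
type EMA struct {
	alpha  float64 // smoothing factor in (0, 1]; higher weighs recent samples more
	value  float64 // current running average
	seeded bool    // whether the first sample has been folded in yet
}

// Add folds a new sample into the running average and returns the updated value.
func (e *EMA) Add(sample float64) float64 {
	if !e.seeded {
		e.value = sample // seed with the first observation
		e.seeded = true
	} else {
		e.value = e.alpha*sample + (1-e.alpha)*e.value
	}
	return e.value
}
```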
**Logging requires the [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the
machine on which Electron is launched, and PCP collector agents to be installed
on the Mesos Agents.**
How to run (use the --help option for information about other command-line options):
`./electron -workload <workload json>`
To run Electron with watts as a resource, run the following command:
`./electron -workload <workload json> -wattsAsAResource`
Workload schema:
```
[
    {
        "name": "minife",
        "cpu": 3.0,
        "ram": 4096,
        "watts": 63.141,
        "class_to_watts": {
            "A": 93.062,
            "B": 65.552,
            "C": 57.897,
            "D": 60.729
        },
        "image": "rdelvalle/minife:electron1",
        "cmd": "cd src && mpirun -np 3 miniFE.x -nx 100 -ny 100 -nz 100",
        "inst": 10
    },
    {
        "name": "dgemm",
        "cpu": 3.0,
        "ram": 32,
        "watts": 85.903,
        "class_to_watts": {
            "A": 114.789,
            "B": 89.133,
            "C": 82.672,
            "D": 81.944
        },
        "image": "rdelvalle/dgemm:electron1",
        "cmd": "/./mt-dgemm 1024",
        "inst": 10
    }
]
```
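Each entry above corresponds to one task definition: `inst` instances of a Docker image run with the given command, with `watts` as the power estimate and `class_to_watts` overriding it per power class when classMapWatts is enabled. A minimal sketch of decoding this schema in Go (the struct below is illustrative and only mirrors the fields shown above, not Electron's own def.Task):

```
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// workloadTask mirrors the workload schema fields shown above (illustrative only).
type workloadTask struct {
	Name         string             `json:"name"`
	CPU          float64            `json:"cpu"`
	RAM          float64            `json:"ram"`
	Watts        float64            `json:"watts"`
	ClassToWatts map[string]float64 `json:"class_to_watts"`
	Image        string             `json:"image"`
	CMD          string             `json:"cmd"`
	Instances    int                `json:"inst"`
}

func main() {
	data, err := os.ReadFile("workload.json")
	if err != nil {
		panic(err)
	}
	var tasks []workloadTask
	if err := json.Unmarshal(data, &tasks); err != nil {
		panic(err)
	}
	for _, t := range tasks {
		fmt.Printf("%s: %d instance(s), %.1f CPU, %.0f MB RAM, %.3f W\n",
			t.Name, t.Instances, t.CPU, t.RAM, t.Watts)
	}
}
```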
# Features
* Pluggable Scheduling policies
* Pluggable Power-Capping strategies
* Cluster resource monitoring

View file

@@ -1,229 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether or not to take an offer
func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if (offerCPU >= (totalCPU + task.CPU)) && (offerRAM >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
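// Worked example for takeOffer above (hypothetical numbers, not from the source): for an
// offer with <8 CPU, 16384 MB, 150 W>, running totals of <4 CPU, 8192 MB, 100 W>, and a
// task needing <3 CPU, 4096 MB, 60 W>, the CPU and RAM checks pass (7 <= 8, 12288 <= 16384)
// but the watts check fails (160 > 150), so the offer is not taken when wattsAsAResource is set.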
type BinPackSortedWattsSortedOffers struct {
base // Type embedded to inherit common functions
}
// New electron scheduler
func NewBinPackSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *BinPackSortedWattsSortedOffers {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &BinPackSortedWattsSortedOffers{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
}
func (s *BinPackSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// Does the task fit
if s.takeOffer(offer, task, totalCPU, totalRAM, totalWatts) {
offerTaken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
} else {
break // Continue on to next task
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *BinPackSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@@ -1,397 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sync"
"time"
)
/*
Piston Capper implements the Scheduler interface.
This is basically an extension of the BinPacking algorithm that also caps each node at a different value,
corresponding to the load on that node.
*/
type BinPackedPistonCapper struct {
base // Type embedded to inherit common functions
taskMonitor map[string][]def.Task
totalPower map[string]float64
ticker *time.Ticker
isCapping bool
}
// New electron scheduler.
func NewBinPackedPistonCapper(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *BinPackedPistonCapper {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &BinPackedPistonCapper{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
taskMonitor: make(map[string][]def.Task),
totalPower: make(map[string]float64),
ticker: time.NewTicker(5 * time.Second),
isCapping: false,
}
return s
}
// check whether task fits the offer or not.
func (s *BinPackedPistonCapper) takeOffer(offer *mesos.Offer, offerWatts float64, offerCPU float64, offerRAM float64,
totalWatts float64, totalCPU float64, totalRAM float64, task def.Task) bool {
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsToConsider
log.Fatal(err)
}
if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsConsideration))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
} else {
return false
}
}
// mutex
var bpPistonMutex sync.Mutex
func (s *BinPackedPistonCapper) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
// Adding the task to the taskMonitor
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *BinPackedPistonCapper) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process
s.ticker.Stop()
bpPistonMutex.Lock()
s.isCapping = false
bpPistonMutex.Unlock()
log.Println("Framework disconnected with master")
}
// go routine to cap each node in the cluster at regular intervals of time.
var bpPistonCapValues = make(map[string]float64)
// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
var bpPistonPreviousRoundedCapValues = make(map[string]float64)
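// Illustrative example (hypothetical values): successive computed cap values of 49.6 and
// 50.4 for a host both round to 50, so the second tick issues no rapl.Cap call; a later
// value of 50.6 rounds to 51 and triggers a fresh capping call.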
func (s *BinPackedPistonCapper) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap each node
bpPistonMutex.Lock()
for host, capValue := range bpPistonCapValues {
roundedCapValue := float64(int(math.Floor(capValue + 0.5)))
// has the cap value changed
if prevRoundedCap, ok := bpPistonPreviousRoundedCapValues[host]; ok {
if prevRoundedCap != roundedCapValue {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host,
int(math.Floor(capValue+0.5)))
}
bpPistonPreviousRoundedCapValues[host] = roundedCapValue
}
} else {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
}
bpPistonPreviousRoundedCapValues[host] = roundedCapValue
}
}
bpPistonMutex.Unlock()
}
}
}()
}
// Stop the capping
func (s *BinPackedPistonCapper) stopCapping() {
if s.isCapping {
log.Println("Stopping the capping.")
s.ticker.Stop()
bpPistonMutex.Lock()
s.isCapping = false
bpPistonMutex.Unlock()
}
}
func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the total power for each host in the offers
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
if _, ok := s.totalPower[*offer.Hostname]; !ok {
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.totalPower[*offer.Hostname] = offerWatts
}
}
// Displaying the totalPower
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
/*
Piston capping strategy
Perform bin-packing of tasks on nodes in the cluster, making sure that no task is given less hard-limit resources than requested.
For each set of tasks that are scheduled, compute the new cap values for each host in the cluster.
At regular intervals of time, cap each node in the cluster.
*/
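// Worked example of the cap-value update (hypothetical numbers): if a scheduled task's
// watts consideration is 50 W, constants.Tolerance is 0.9, and the host's total power is
// 500 W, partialLoad grows by (50 * 0.9 / 500) * 100 = 9, so the host's cap value is
// raised by 9 percentage points; the same amount is subtracted when the task finishes.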
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
fitTasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Store the partialLoad for host corresponding to this offer.
// Once we can't fit any more tasks, we update capValue for this host with partialLoad and then launch the fit tasks.
partialLoad := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// Does the task fit
if s.takeOffer(offer, offerWatts, offerCPU, offerRAM,
totalWatts, totalCPU, totalRAM, task) {
// Start piston capping if haven't started yet
if !s.isCapping {
s.isCapping = true
s.startCapping()
}
offerTaken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fitTasks = append(fitTasks, taskToSchedule)
log.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
// updating the cap value for offer.Hostname
partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100
if *task.Instances <= 0 {
// All instances of task have been scheduled. Remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
} else {
break // Continue on to next task
}
}
}
if offerTaken {
// Updating the cap value for offer.Hostname
bpPistonMutex.Lock()
bpPistonCapValues[*offer.Hostname] += partialLoad
bpPistonMutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for task
log.Println("There is not enough resources to launch task: ")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Remove finished task from the taskMonitor
func (s *BinPackedPistonCapper) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) {
hostOfFinishedTask := ""
indexOfFinishedTask := -1
found := false
var finishedTask def.Task
for host, tasks := range s.taskMonitor {
for i, task := range tasks {
if task.TaskID == finishedTaskID {
hostOfFinishedTask = host
indexOfFinishedTask = i
found = true
}
}
if found {
break
}
}
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask]
log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
} else {
return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor")
}
return finishedTask, hostOfFinishedTask, nil
}
func (s *BinPackedPistonCapper) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
bpPistonMutex.Lock()
s.tasksRunning++
bpPistonMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Deleting the task from the taskMonitor
finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value)
if err != nil {
log.Println(err)
}
// Need to determine the watts consideration for the finishedTask
var wattsConsideration float64
if s.classMapWatts {
wattsConsideration = finishedTask.ClassToWatts[hostToPowerClass(hostOfFinishedTask)]
} else {
wattsConsideration = finishedTask.Watts
}
// Need to update the cap values for host of the finishedTask
bpPistonMutex.Lock()
bpPistonCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(bpPistonCapValues[hostOfFinishedTask]+0.5)) == 0 {
bpPistonCapValues[hostOfFinishedTask] = 100
}
s.tasksRunning--
bpPistonMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
s.stopCapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@@ -1,343 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
/*
Tasks are categorized into small and large tasks based on their watts requirements.
All the large tasks are packed into offers from agents belonging to power classes A and B, using Bin-Packing.
All the small tasks are spread among offers from agents belonging to power classes C and D, using First-Fit.
Bin-Packing is most effective when the co-scheduling of tasks is increased. Large tasks typically utilize more resources and, hence,
co-scheduling them has a great impact on the total power utilization.
*/
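// Illustrative example (hypothetical workload): if the watts requirements of the tasks
// cluster into {85.9, 63.1} (large) and {20.5, 12.3} (small), the large cluster is
// bin-packed onto offers from class A and B agents, while the small cluster is placed
// first-fit onto offers from class C and D agents.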
func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
wattsToConsider float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
func (s *BottomHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type BottomHeavy struct {
base // Type embedded to inherit common functions
smallTasks, largeTasks []def.Task
}
// New electron scheduler
func NewBottomHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BottomHeavy {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
// Classification done based on MMPU watts requirements, into 2 clusters.
classifiedTasks := def.ClassifyTasks(tasks, 2)
s := &BottomHeavy{
base: base{
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
// Separating small tasks from large tasks.
smallTasks: classifiedTasks[0].Tasks,
largeTasks: classifiedTasks[1].Tasks,
}
return s
}
func (s *BottomHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
// Shut down scheduler if no more tasks to schedule
func (s *BottomHeavy) shutDownIfNecessary() {
if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
// create TaskInfo and log scheduling trace
func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
log.Println("Co-Located with:")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
return taskToSchedule
}
// Using BinPacking to pack large tasks into the given offers.
func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
for *task.Instances > 0 {
// Does the task fit
// Short-circuit evaluation: if wattsAsAResource is false, the watts
// check inside takeOfferBinPack won't be evaluated.
if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) {
offerTaken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
totalRAM += task.RAM
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...)
s.shutDownIfNecessary()
}
} else {
break // Continue on to next task
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Using First-Fit to spread small tasks among the given offers.
func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
taken := false
for i := 0; i < len(s.smallTasks); i++ {
task := s.smallTasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Decision to take the offer or not
if s.takeOfferFirstFit(offer, wattsConsideration, task) {
taken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...)
s.shutDownIfNecessary()
}
break // Offer taken, move on
}
}
if !taken {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// We need to separate the offers into
// offers from ClassA and ClassB and offers from ClassC and ClassD.
// Nodes in ClassA and ClassB will be packed with the large tasks.
// Small tasks will be spread out among the nodes in ClassC and ClassD.
offersHeavyPowerClasses := []*mesos.Offer{}
offersLightPowerClasses := []*mesos.Offer{}
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
}
log.Println("Packing Large tasks into ClassAB offers:")
for _, o := range offersHeavyPowerClasses {
log.Println(*o.Hostname)
}
// Packing tasks into offersHeavyPowerClasses
s.pack(offersHeavyPowerClasses, driver)
log.Println("Spreading Small tasks among ClassCD offers:")
for _, o := range offersLightPowerClasses {
log.Println(*o.Hostname)
}
// Spreading tasks among offersLightPowerClasses
s.spread(offersLightPowerClasses, driver)
}
func (s *BottomHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@@ -1,436 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sort"
"sync"
"time"
)
// Decides whether or not to take an offer
func (s *BPSWMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type BPSWMaxMinPistonCapping struct {
base //Type embedding to inherit common functions
taskMonitor map[string][]def.Task
totalPower map[string]float64
ticker *time.Ticker
isCapping bool
}
// New electron scheduler
func NewBPSWMaxMinPistonCapping(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *BPSWMaxMinPistonCapping {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &BPSWMaxMinPistonCapping{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
taskMonitor: make(map[string][]def.Task),
totalPower: make(map[string]float64),
ticker: time.NewTicker(5 * time.Second),
isCapping: false,
}
return s
}
func (s *BPSWMaxMinPistonCapping) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
// Start recording only when we're creating the first task
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
// Setting the task ID to the task. This is done so that we can consider each task to be different
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Adding the task to the taskMonitor
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *BPSWMaxMinPistonCapping) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process
s.ticker.Stop()
bpMaxMinPistonCappingMutex.Lock()
s.isCapping = false
bpMaxMinPistonCappingMutex.Unlock()
log.Println("Framework disconnected with master")
}
// mutex
var bpMaxMinPistonCappingMutex sync.Mutex
// go routine to cap each node in the cluster at regular intervals of time
var bpMaxMinPistonCappingCapValues = make(map[string]float64)
// Storing the previous cap value for each host so as to not repeatedly cap the nodes to the same value. (reduces overhead)
var bpMaxMinPistonCappingPreviousRoundedCapValues = make(map[string]float64)
func (s *BPSWMaxMinPistonCapping) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap each node
bpMaxMinPistonCappingMutex.Lock()
for host, capValue := range bpMaxMinPistonCappingCapValues {
roundedCapValue := float64(int(math.Floor(capValue + 0.5)))
// has the cap value changed
if previousRoundedCap, ok := bpMaxMinPistonCappingPreviousRoundedCapValues[host]; ok {
if previousRoundedCap != roundedCapValue {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue)))
}
bpMaxMinPistonCappingPreviousRoundedCapValues[host] = roundedCapValue
}
} else {
if err := rapl.Cap(host, "rapl", roundedCapValue); err != nil {
log.Println(err)
} else {
log.Printf("Capped [%s] at %d", host, int(math.Floor(capValue+0.5)))
}
bpMaxMinPistonCappingPreviousRoundedCapValues[host] = roundedCapValue
}
}
bpMaxMinPistonCappingMutex.Unlock()
}
}
}()
}
// Stop the capping
func (s *BPSWMaxMinPistonCapping) stopCapping() {
if s.isCapping {
log.Println("Stopping the capping.")
s.ticker.Stop()
bpMaxMinPistonCappingMutex.Lock()
s.isCapping = false
bpMaxMinPistonCappingMutex.Unlock()
}
}
// Determine if the remaining space inside the offer is enough for
// the task we need to create. If it is, create a TaskInfo and return it.
func (s *BPSWMaxMinPistonCapping) CheckFit(
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64,
partialLoad *float64) (bool, *mesos.TaskInfo) {
// Does the task fit
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
// Start piston capping if haven't started yet
if !s.isCapping {
s.isCapping = true
s.startCapping()
}
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
*partialLoad += ((wattsConsideration * constants.Tolerance) / s.totalPower[*offer.Hostname]) * 100
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Store the partialLoad for host corresponding to this offer
// Once we can't fit any more tasks, we update the capValue for this host using partialLoad and then launch the fit tasks.
partialLoad := 0.0
// Assumes s.tasks is ordered in non-decreasing median max peak order
// Attempt to schedule a single instance of the heaviest workload available first
// Start from the back until one fits
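// Illustrative example (hypothetical watts values): with s.tasks sorted as
// [10 W, 25 W, 40 W], one instance of the 40 W task is tried first; if it fits,
// the rest of the offer is then packed front-to-back with 10 W and 25 W
// instances until nothing more fits.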
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts, &partialLoad)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
break
}
}
// Pack the rest of the offer with the smallest tasks
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts, &partialLoad)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
} else {
break // Continue on to next task
}
}
}
if offerTaken {
// Updating the cap value for offer.Hostname
bpMaxMinPistonCappingMutex.Lock()
bpMaxMinPistonCappingCapValues[*offer.Hostname] += partialLoad
bpMaxMinPistonCappingMutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Remove finished task from the taskMonitor
func (s *BPSWMaxMinPistonCapping) deleteFromTaskMonitor(finishedTaskID string) (def.Task, string, error) {
hostOfFinishedTask := ""
indexOfFinishedTask := -1
found := false
var finishedTask def.Task
for host, tasks := range s.taskMonitor {
for i, task := range tasks {
if task.TaskID == finishedTaskID {
hostOfFinishedTask = host
indexOfFinishedTask = i
found = true
}
}
if found {
break
}
}
if hostOfFinishedTask != "" && indexOfFinishedTask != -1 {
finishedTask = s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask]
log.Printf("Removing task with TaskID [%s] from the list of running tasks\n",
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask].TaskID)
s.taskMonitor[hostOfFinishedTask] = append(s.taskMonitor[hostOfFinishedTask][:indexOfFinishedTask],
s.taskMonitor[hostOfFinishedTask][indexOfFinishedTask+1:]...)
} else {
return finishedTask, hostOfFinishedTask, errors.New("Finished Task not present in TaskMonitor")
}
return finishedTask, hostOfFinishedTask, nil
}
func (s *BPSWMaxMinPistonCapping) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
bpMaxMinPistonCappingMutex.Lock()
s.tasksRunning++
bpMaxMinPistonCappingMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Deleting the task from the taskMonitor
finishedTask, hostOfFinishedTask, err := s.deleteFromTaskMonitor(*status.TaskId.Value)
if err != nil {
log.Println(err)
}
// Need to determine the watts consideration for the finishedTask
var wattsConsideration float64
if s.classMapWatts {
wattsConsideration = finishedTask.ClassToWatts[hostToPowerClass(hostOfFinishedTask)]
} else {
wattsConsideration = finishedTask.Watts
}
// Need to update the cap values for host of the finishedTask
bpMaxMinPistonCappingMutex.Lock()
bpMaxMinPistonCappingCapValues[hostOfFinishedTask] -= ((wattsConsideration * constants.Tolerance) / s.totalPower[hostOfFinishedTask]) * 100
// Checking to see if the cap value has become 0, in which case we uncap the host.
if int(math.Floor(bpMaxMinPistonCappingCapValues[hostOfFinishedTask]+0.5)) == 0 {
bpMaxMinPistonCappingCapValues[hostOfFinishedTask] = 100
}
s.tasksRunning--
bpMaxMinPistonCappingMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
s.stopCapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@@ -1,447 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sort"
"sync"
"time"
)
// Decides whether or not to take an offer
func (s *BPSWMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task,
totalCPU, totalRAM, totalWatts float64) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if (cpus >= (totalCPU + task.CPU)) && (mem >= (totalRAM + task.RAM)) &&
(!s.wattsAsAResource || (watts >= (totalWatts + wattsConsideration))) {
return true
}
return false
}
type BPSWMaxMinProacCC struct {
base // Type embedding to inherit common functions
taskMonitor map[string][]def.Task
availablePower map[string]float64
totalPower map[string]float64
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster-wide capping.
isRecapping bool // indicate whether we are currently performing cluster-wide recapping.
}
// New electron scheduler
func NewBPSWMaxMinProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *BPSWMaxMinProacCC {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &BPSWMaxMinProacCC{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
isRecapping: false,
}
return s
}
// mutex
var bpMaxMinProacCCMutex sync.Mutex
func (s *BPSWMaxMinProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Setting the task ID to the task. This is done so that we can consider each task to be different,
// even though they have the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
// go routine to cap the entire cluster in regular intervals of time.
var bpMaxMinProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
var bpMaxMinProacCCNewCapValue = 0.0 // newly computed cap value
func (s *BPSWMaxMinProacCC) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster only if new cap value different from old cap value.
// This way we don't unnecessarily cap the cluster.
bpMaxMinProacCCMutex.Lock()
if s.isCapping {
if int(math.Floor(bpMaxMinProacCCNewCapValue+0.5)) != int(math.Floor(bpMaxMinProacCCCapValue+0.5)) {
// updating cap value
bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue
if bpMaxMinProacCCCapValue > 0.0 {
for host := range constants.Hosts {
// Rounding cap value to nearest int
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCCapValue+0.5)))
}
}
}
bpMaxMinProacCCMutex.Unlock()
}
}
}()
}
// go routine to recap the entire cluster in regular intervals of time.
var bpMaxMinProacCCRecapValue = 0.0 // The cluster-wide cap value when recapping.
func (s *BPSWMaxMinProacCC) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
bpMaxMinProacCCMutex.Lock()
// If stopped performing cluster-wide capping, then we need to recap.
if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 {
for host := range constants.Hosts {
// Rounding the recap value to the nearest int
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))
}
// Setting the recapping to false
s.isRecapping = false
bpMaxMinProacCCMutex.Unlock()
}
}
}()
}
// Stop cluster-wide capping
func (s *BPSWMaxMinProacCC) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster-wide capping.")
s.ticker.Stop()
bpMaxMinProacCCMutex.Lock()
s.isCapping = false
s.isRecapping = true
bpMaxMinProacCCMutex.Unlock()
}
}
// Stop the cluster-wide recapping
func (s *BPSWMaxMinProacCC) stopRecapping() {
// If not capping, then definitely recapping.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster-wide re-capping.")
s.recapTicker.Stop()
bpMaxMinProacCCMutex.Lock()
s.isRecapping = false
bpMaxMinProacCCMutex.Unlock()
}
}
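// Illustrative lifecycle (as implemented above): while tasks are being scheduled, the
// capping ticker applies FCFS-determined cap values; once all tasks are scheduled,
// stopCapping switches the scheduler to recapping mode, the recap ticker re-caps the
// cluster as tasks finish, and stopRecapping halts it when the last task completes.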
// Determine if the remaining space inside of the offer is enough for
// the task we need to create. If it is, create TaskInfo and return it.
func (s *BPSWMaxMinProacCC) CheckFit(
i int,
task def.Task,
wattsConsideration float64,
offer *mesos.Offer,
totalCPU *float64,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
// Does the task fit
if s.takeOffer(offer, task, *totalCPU, *totalRAM, *totalWatts) {
// Capping the cluster if haven't yet started
if !s.isCapping {
bpMaxMinProacCCMutex.Lock()
s.isCapping = true
bpMaxMinProacCCMutex.Unlock()
s.startCapping()
}
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
bpMaxMinProacCCMutex.Lock()
bpMaxMinProacCCNewCapValue = tempCap
bpMaxMinProacCCMutex.Unlock()
} else {
log.Println("Failed to determine new cluster-wide cap:")
log.Println(err)
}
*totalWatts += wattsConsideration
*totalCPU += task.CPU
*totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping
s.stopCapping()
s.startRecapping() // Load changes after every task finishes and hence, we need to change the capping of the cluster.
close(s.Shutdown)
}
}
return true, taskToSchedule
}
return false, nil
}
func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if the first time
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offerWatts
}
}
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
// Assumes s.tasks is ordered in non-decreasing median max peak order
// Attempt to schedule a single instance of the heaviest workload available first
// Start from the back until one fits
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
break
}
}
// Pack the rest of the offer with the smallest tasks
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// TODO: Fix this so index doesn't need to be passed
taken, taskToSchedule := s.CheckFit(i, task, wattsConsideration, offer,
&totalCPU, &totalRAM, &totalWatts)
if taken {
offerTaken = true
tasks = append(tasks, taskToSchedule)
} else {
break // Continue on to next task
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
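// A minimal sketch (assumption, not part of the original code) of the max-min
// selection order implemented above: one instance of the heaviest remaining
// task is tried first, then the leftover capacity is packed with the lightest
// tasks. The input is assumed sorted in non-decreasing order of watts.
func maxMinOrder(sortedWatts []float64) []float64 {
if n := len(sortedWatts); n > 0 {
order := []float64{sortedWatts[n-1]} // Heaviest task first.
return append(order, sortedWatts[:n-1]...) // Then pack the smallest.
}
return nil
}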
func (s *BPSWMaxMinProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window
s.capper.TaskFinished(*status.TaskId.Value)
// Determining the new cluster wide recap value
tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
//tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// If the newly determined recap value differs from the current recap value, we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpMaxMinProacCCRecapValue+0.5)) {
bpMaxMinProacCCRecapValue = tempCap
bpMaxMinProacCCMutex.Lock()
s.isRecapping = true
bpMaxMinProacCCMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", bpMaxMinProacCCRecapValue)
} else {
bpMaxMinProacCCMutex.Lock()
s.isRecapping = false
bpMaxMinProacCCMutex.Unlock()
}
} else {
log.Println(err)
}
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the cluster-wide recapping
s.stopRecapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}
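// Cap values are rounded to the nearest integer throughout via
// math.Floor(x + 0.5). A hypothetical helper (sketch, not part of the
// original code) naming that idiom:
func roundToNearestInt(x float64) int {
return int(math.Floor(x + 0.5))
}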


@@ -1,382 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sync"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool {
offerCPU, offerMem, offerWatts := offerUtils.OfferAgg(offer)
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if offerCPU >= task.CPU && offerMem >= task.RAM && (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface.
type FirstFitProacCC struct {
base // Type embedded to inherit common functions
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
}
// New electron scheduler.
func NewFirstFitProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *FirstFitProacCC {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitProacCC{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
isRecapping: false,
}
return s
}
// mutex
var fcfsMutex sync.Mutex
func (s *FirstFitProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Set the task ID on the task so that each instance is treated as distinct,
// even though all instances share the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitProacCC) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
fcfsMutex.Lock()
s.isCapping = false
fcfsMutex.Unlock()
log.Println("Framework disconnected with master")
}
// Goroutine to cap the entire cluster at regular intervals of time.
var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *FirstFitProacCC) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the fcfsCurrentCapValue.
fcfsMutex.Lock()
if fcfsCurrentCapValue > 0.0 {
for host := range constants.Hosts {
// Rounding fcfsCurrentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5)))
}
fcfsMutex.Unlock()
}
}
}()
}
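// Hypothetical helper (sketch, not part of the original code) extracting the
// tick body shared by startCapping and startRecapping: apply a rounded
// cluster-wide cap to every host via RAPL.
func capAllHosts(capValue float64) {
rounded := float64(int(math.Floor(capValue + 0.5)))
for host := range constants.Hosts {
if err := rapl.Cap(host, "rapl", rounded); err != nil {
log.Println(err)
}
}
}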
// Goroutine to re-cap the entire cluster at regular intervals of time.
var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping.
func (s *FirstFitProacCC) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
fcfsMutex.Lock()
// If we are recapping and have a valid recap value, explicitly cap the entire cluster.
if s.isRecapping && fcfsRecapValue > 0.0 {
for host := range constants.Hosts {
// Rounding fcfsRecapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
fcfsMutex.Unlock()
}
}
}()
}
// Stop cluster wide capping
func (s *FirstFitProacCC) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
fcfsMutex.Lock()
s.isCapping = false
s.isRecapping = true
fcfsMutex.Unlock()
}
}
// Stop cluster wide Recapping
func (s *FirstFitProacCC) stopRecapping() {
// Recapping can be stopped only after capping has already been stopped.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
fcfsMutex.Lock()
s.isRecapping = false
fcfsMutex.Unlock()
}
}
func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// Set the total power the first time this host is seen.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offerWatts
}
}
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
/*
Cluster-wide capping strategy
For each task in s.tasks,
1. Check whether the offer can be taken or not (based on CPU and RAM requirements).
2. If the task fits the offer, determine the cluster-wide cap.
3. fcfsCurrentCapValue is updated with the determined cluster-wide cap.
Cluster-wide capping is currently performed at regular intervals of time.
(See the updateClusterCap sketch after this function.)
*/
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Does the task fit?
if s.takeOffer(offer, task) {
// Start capping the cluster if we haven't already.
if !s.isCapping {
fcfsMutex.Lock()
s.isCapping = true
fcfsMutex.Unlock()
s.startCapping()
}
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
fcfsMutex.Lock()
fcfsCurrentCapValue = tempCap
fcfsMutex.Unlock()
} else {
log.Println("Failed to determine new cluster wide cap: ")
log.Println(err)
}
log.Printf("Starting on [%s]\n", offer.GetHostname())
taskToSchedule := s.newTask(offer, task)
toSchedule := []*mesos.TaskInfo{taskToSchedule}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter)
log.Printf("Inst: %d", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
s.startRecapping() // Load changes after every task finishes, so the cluster-wide cap needs to be recomputed.
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move on to the next task.
}
}
// If no task fit the offer, decline it.
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
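// Sketch (assumption, not part of the original code) of the cap-update flow
// described in the strategy comment above: on every successful placement the
// cluster-wide cap is recomputed and published for the capping goroutine to
// apply on its next tick.
func (s *FirstFitProacCC) updateClusterCap(task def.Task) {
if tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task); err == nil {
fcfsMutex.Lock()
fcfsCurrentCapValue = tempCap
fcfsMutex.Unlock()
} else {
log.Println("Failed to determine new cluster-wide cap:", err)
}
}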
func (s *FirstFitProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
fcfsMutex.Lock()
s.tasksRunning++
fcfsMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
// Need to remove the task from the window of tasks.
s.capper.TaskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// If the newly determined cap value differs from the current recap value, we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) {
fcfsRecapValue = tempCap
fcfsMutex.Lock()
s.isRecapping = true
fcfsMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", fcfsRecapValue)
} else {
fcfsMutex.Lock()
s.isRecapping = false
fcfsMutex.Unlock()
}
} else {
// Not updating fcfsCurrentCapValue
log.Println(err)
}
fcfsMutex.Lock()
s.tasksRunning--
fcfsMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the recapping process.
s.stopRecapping()
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,219 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedOffers struct {
base // Type embedded to inherit common functions
}
// New electron scheduler
func NewFirstFitSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedOffers {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedOffers{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
}
func (s *FirstFitSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,398 +0,0 @@
/*
Rank-based cluster-wide capping.
Note: The tasks are sorted right at the beginning, in ascending order of watts
(see the sortByWattsAsc sketch after the constructor).
This guarantees that the tasks that didn't fit are the ones that require the most power,
which gives us a way to address that issue.
On the other hand, if we used first fit to place the tasks and then sorted them to
determine the cap, we could never be certain which tasks are the ones that don't fit,
and hence it becomes much harder to address this issue.
*/
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sort"
"sync"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitSortedWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool {
offerCPU, offerMem, offerWatts := offerUtils.OfferAgg(offer)
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if offerCPU >= task.CPU && offerMem >= task.RAM && (!s.wattsAsAResource || offerWatts >= wattsConsideration) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedWattsProacCC struct {
base // Type embedded to inherit common functions
taskMonitor map[string][]def.Task // store tasks that are currently running.
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
isRecapping bool // indicate whether we are currently performing cluster wide re-capping.
}
// New electron scheduler.
func NewFirstFitSortedWattsProacCC(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *FirstFitSortedWattsProacCC {
// Sorting tasks in ascending order of watts
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedWattsProacCC{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
taskMonitor: make(map[string][]def.Task),
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
isRecapping: false,
}
return s
}
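// Sketch (assumption, not part of the original code): the same ascending-watts
// ordering via Go 1.8's sort.Slice, assuming def.Task exposes a Watts field
// mirroring the workload schema; the constructor above relies on def.WattsSorter.
func sortByWattsAsc(tasks []def.Task) {
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Watts < tasks[j].Watts
})
}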
// mutex
var rankedMutex sync.Mutex
func (s *FirstFitSortedWattsProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging.
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Set the task ID on the task so that each instance is treated as distinct,
// even though all instances share the same parameters.
task.SetTaskID(*proto.String("electron-" + taskName))
// Add task to the list of tasks running on the node.
s.running[offer.GetSlaveId().GoString()][taskName] = true
if len(s.taskMonitor[*offer.Hostname]) == 0 {
s.taskMonitor[*offer.Hostname] = []def.Task{task}
} else {
s.taskMonitor[*offer.Hostname] = append(s.taskMonitor[*offer.Hostname], task)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsToConsider
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedWattsProacCC) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
rankedMutex.Lock()
s.isCapping = false
rankedMutex.Unlock()
log.Println("Framework disconnected with master")
}
// Goroutine to cap the entire cluster at regular intervals of time.
var rankedCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *FirstFitSortedWattsProacCC) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the rankedCurrentCapValue.
rankedMutex.Lock()
if rankedCurrentCapValue > 0.0 {
for host := range constants.Hosts {
// Rounding rankedCurrentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(rankedCurrentCapValue+0.5)))
}
rankedMutex.Unlock()
}
}
}()
}
// Goroutine to re-cap the entire cluster at regular intervals of time.
var rankedRecapValue = 0.0 // The cluster wide cap value when recapping.
func (s *FirstFitSortedWattsProacCC) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
rankedMutex.Lock()
// If we are recapping and have a valid recap value, explicitly cap the entire cluster.
if s.isRecapping && rankedRecapValue > 0.0 {
for host := range constants.Hosts {
// Rounding rankedRecapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(rankedRecapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
rankedMutex.Unlock()
}
}
}()
}
// Stop cluster wide capping
func (s *FirstFitSortedWattsProacCC) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
rankedMutex.Lock()
s.isCapping = false
s.isRecapping = true
rankedMutex.Unlock()
}
}
// Stop cluster wide Recapping
func (s *FirstFitSortedWattsProacCC) stopRecapping() {
// Recapping can be stopped only after capping has already been stopped.
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
rankedMutex.Lock()
s.isRecapping = false
rankedMutex.Unlock()
}
}
func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// Set the total power the first time this host is seen.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
s.totalPower[*offer.Hostname] = offerWatts
}
}
for host, tpower := range s.totalPower {
log.Printf("TotalPower[%s] = %f", host, tpower)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
/*
Ranked cluster-wide capping strategy
For each task in the sorted tasks,
1. Check whether the offer can be taken or not (based on CPU, RAM and watts requirements).
2. If the task fits the offer, determine the cluster-wide cap.
3. rankedCurrentCapValue is updated with the determined cluster-wide cap.
Once all the tasks have been scheduled,
we recalculate the cluster-wide cap each time a task finishes.
Cluster-wide capping is currently performed at regular intervals of time.
(The cap-update flow matches the updateClusterCap sketch shown earlier.)
*/
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Does the task fit?
if s.takeOffer(offer, task) {
// Start capping the cluster if we haven't already.
if !s.isCapping {
rankedMutex.Lock()
s.isCapping = true
rankedMutex.Unlock()
s.startCapping()
}
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
rankedMutex.Lock()
rankedCurrentCapValue = tempCap
rankedMutex.Unlock()
} else {
log.Println("Failed to determine the new cluster wide cap: ", err)
}
log.Printf("Starting on [%s]\n", offer.GetHostname())
taskToSchedule := s.newTask(offer, task)
toSchedule := []*mesos.TaskInfo{taskToSchedule}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter)
log.Printf("Inst: %d", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of the task have been scheduled. Need to remove it from the list of tasks to schedule.
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
s.stopCapping()
s.startRecapping()
close(s.Shutdown)
}
}
break // Offer taken, move on.
} else {
// Task doesn't fit the offer. Move on to the next task.
}
}
// If no tasks fit the offer, decline it.
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]\n", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
rankedMutex.Lock()
s.tasksRunning++
rankedMutex.Unlock()
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
rankedMutex.Lock()
s.tasksRunning--
rankedMutex.Unlock()
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the recapping process.
s.stopRecapping()
close(s.Done)
default:
}
} else {
// Need to remove the task from the window
s.capper.TaskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
//tempCap, err := s.capper.NaiveRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// If the newly determined cap value differs from the current recap value, we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(rankedRecapValue+0.5)) {
rankedRecapValue = tempCap
rankedMutex.Lock()
s.isRecapping = true
rankedMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", rankedRecapValue)
} else {
rankedMutex.Lock()
s.isRecapping = false
rankedMutex.Unlock()
}
} else {
// Not updating rankedCurrentCapValue
log.Println(err)
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,222 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedWattsSortedOffers struct {
base // Type embedded to inherit common functions
}
// New electron scheduler
func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string,
classMapWatts bool) *FirstFitSortedWattsSortedOffers {
// Sorting the tasks in increasing order of watts requirement.
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedWattsSortedOffers{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
}
func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,209 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if cpus >= task.CPU && mem >= task.RAM && (!s.wattsAsAResource || watts >= wattsConsideration) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedWatts struct {
base // Type embedded to inherit common functions
}
// New electron scheduler
func NewFirstFitSortedWatts(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitSortedWatts {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedWatts{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
}
func (s *FirstFitSortedWatts) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedWatts) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,200 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"time"
)
// Decides whether or not to take an offer.
func (s *FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool {
_, _, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
if watts >= wattsConsideration {
return true
}
return false
}
type FirstFitWattsOnly struct {
base // Type embedded to inherit common functions
}
// New electron scheduler
func NewFirstFitWattsOnly(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *FirstFitWattsOnly {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitWattsOnly{
base: base{
tasks: tasks,
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
}
return s
}
func (s *FirstFitWattsOnly) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
resources := []*mesos.Resource{
mesosutil.NewScalarResource("watts", wattsConsideration),
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitWattsOnly) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@@ -1,341 +0,0 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
/*
Tasks are categorized into small and large tasks based on their watts requirement.
All the small tasks are packed into offers from agents belonging to power classes C and D, using bin packing.
All the large tasks are spread among the offers from agents belonging to power classes A and B, using first fit
(see the isHeavyPowerClass sketch after the two fit predicates below).
This gives the large (power-intensive) tasks a little more room to execute and reduces
the possibility of their starvation.
*/
func (s *TopHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
wattsToConsider float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if (!s.wattsAsAResource || (offerWatts >= (totalWatts + wattsToConsider))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
func (s *TopHeavy) takeOfferFirstFit(offer *mesos.Offer, wattsConsideration float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if (!s.wattsAsAResource || (offerWatts >= wattsConsideration)) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
return true
}
return false
}
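// Hypothetical helper (sketch, not part of the original code) for the
// power-class split described in the comment at the top of this file: hosts
// in classes A and B are "heavy" and receive the large tasks.
func isHeavyPowerClass(hostname string) bool {
_, inA := constants.PowerClasses["A"][hostname]
_, inB := constants.PowerClasses["B"][hostname]
return inA || inB
}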
// electronScheduler implements the Scheduler interface
type TopHeavy struct {
base // Type embedded to inherit common functions
smallTasks, largeTasks []def.Task
}
// New electron scheduler
func NewTopHeavy(tasks []def.Task, wattsAsAResource bool, schedTracePrefix string, classMapWatts bool) *TopHeavy {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
// Classify tasks into 2 clusters based on their MMPU watts requirements.
classifiedTasks := def.ClassifyTasks(tasks, 2)
s := &TopHeavy{
base: base{
wattsAsAResource: wattsAsAResource,
classMapWatts: classMapWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
},
// Separating small tasks from large tasks.
smallTasks: classifiedTasks[0].Tasks,
largeTasks: classifiedTasks[1].Tasks,
}
return s
}
func (s *TopHeavy) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if s.wattsAsAResource {
if wattsToConsider, err := def.WattsToConsider(task, s.classMapWatts, offer); err == nil {
log.Printf("Watts considered for host[%s] and task[%s] = %f", *offer.Hostname, task.Name, wattsToConsider)
resources = append(resources, mesosutil.NewScalarResource("watts", wattsToConsider))
} else {
// Error in determining wattsConsideration
log.Fatal(err)
}
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
// Shut down scheduler if no more tasks to schedule
func (s *TopHeavy) shutDownIfNecessary() {
if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
// create TaskInfo and log scheduling trace
func (s *TopHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
log.Println("Co-Located with:")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
return taskToSchedule
}
// Use bin packing to pack small tasks into these offers.
func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
taken := false
for i := 0; i < len(s.smallTasks); i++ {
task := s.smallTasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
for *task.Instances > 0 {
// Does the task fit?
// Short-circuit evaluation: if wattsAsAResource is false, the watts
// comparison is never evaluated.
if s.takeOfferBinPack(offer, totalCPU, totalRAM, totalWatts, wattsConsideration, task) {
taken = true
totalWatts += wattsConsideration
totalCPU += task.CPU
totalRAM += task.RAM
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...)
s.shutDownIfNecessary()
}
} else {
break // Continue on to next task
}
}
}
if taken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Use first fit to spread large tasks across these offers.
func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
wattsConsideration, err := def.WattsToConsider(task, s.classMapWatts, offer)
if err != nil {
// Error in determining wattsConsideration
log.Fatal(err)
}
// Decision to take the offer or not
if s.takeOfferFirstFit(offer, wattsConsideration, task) {
offerTaken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...)
s.shutDownIfNecessary()
}
break // Offer taken, move on
}
}
if !offerTaken {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// We need to separate the offers into
// offers from ClassA/ClassB hosts and offers from ClassC/ClassD hosts.
// Offers from ClassA and ClassB execute the large tasks;
// offers from ClassC and ClassD execute the small tasks.
// (See the splitOffersByPowerClass sketch after this function.)
offersHeavyPowerClasses := []*mesos.Offer{}
offersLightPowerClasses := []*mesos.Offer{}
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
}
log.Println("Spreading Large tasks into ClassAB Offers:")
for _, o := range offersHeavyPowerClasses {
log.Println(*o.Hostname)
}
log.Println("Packing Small tasks into ClassCD Offers:")
for _, o := range offersLightPowerClasses {
log.Println(*o.Hostname)
}
// Packing tasks into offersLightPowerClasses
s.pack(offersLightPowerClasses, driver)
// Spreading tasks among offersHeavyPowerClasses
s.spread(offersHeavyPowerClasses, driver)
}
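// Sketch (assumption, not part of the original code), building on
// isHeavyPowerClass above: the two-way split ResourceOffers performs before
// dispatching to spread() and pack(). Unlike the original, this version treats
// every non-A/B host as light.
func splitOffersByPowerClass(offers []*mesos.Offer) (heavy, light []*mesos.Offer) {
for _, offer := range offers {
if isHeavyPowerClass(*offer.Hostname) {
heavy = append(heavy, offer)
} else {
light = append(light, offer)
}
}
return heavy, light
}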
func (s *TopHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}