Merged in hybridSchedulerAndSortedOffers (pull request #8)

Hybrid Scheduler -- TopHeavy, BottomHeavy, FirstFit and BinPacked schedulers with sortedOffers.
This commit is contained in:
Pradyumna Kaushik 2017-02-10 20:21:04 +00:00 committed by Renan DelValle
commit a0a3e78041
29 changed files with 1826 additions and 436 deletions


@ -8,12 +8,14 @@ To Do:
* Add ability to use constraints
* Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
* TODO: Add the type of scheduler to be used (picked from a config file) along with its configurable parameters.
* Write test code for each scheduler (This should be after the design change)
* Some of the constants in constants/constants.go can vary based on the environment.
Possible to setup the constants at runtime based on the environment?
* Fix the log message when declining an offer -- log a different reason when resources are insufficient than when there are no
longer any tasks left to schedule.
* Have a centralised logFile that can be filtered by identifier. All electron logs should go into this file.
* Make ClassMapWatts a command-line argument so Electron can be run with ClassMapWatts enabled/disabled.
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the


@ -6,6 +6,8 @@ Constants that are used across scripts
5. window_size = number of tasks to consider for computation of the dynamic cap.
Also, exposing functions to update or initialize some of the constants.
TODO: Clean this up and use Mesos Attributes instead.
*/
package constants
@ -14,6 +16,24 @@ var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton
"stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu",
"stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"}
// Classification of the nodes in the cluster based on their power consumption.
var PowerClasses = map[string]map[string]bool{
"ClassA": map[string]bool{
"stratos-005.cs.binghamton.edu": true,
"stratos-006.cs.binghamton.edu": true,
},
"ClassB": map[string]bool{
"stratos-007.cs.binghamton.edu": true,
"stratos-008.cs.binghamton.edu": true,
},
"ClassC": map[string]bool{
"stratos-001.cs.binghamton.edu": true,
"stratos-002.cs.binghamton.edu": true,
"stratos-003.cs.binghamton.edu": true,
"stratos-004.cs.binghamton.edu": true,
},
}
// Add a new host to the slice of hosts.
func AddNewHost(newHost string) bool {
// Validation
@ -68,7 +88,7 @@ func UpdateCapMargin(newCapMargin float64) bool {
var StarvationFactor = PowerThreshold / CapMargin
// Window size for running average
var WindowSize = 20
var ConsiderationWindowSize = 20
// Update the window size.
func UpdateWindowSize(newWindowSize int) bool {
@ -76,7 +96,7 @@ func UpdateWindowSize(newWindowSize int) bool {
if newWindowSize == 0 {
return false
} else {
WindowSize = newWindowSize
ConsiderationWindowSize = newWindowSize
return true
}
}
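
For illustration, a minimal sketch of how the new PowerClasses map can be queried from outside this package. powerClassOf is a hypothetical helper, not part of this commit; the schedulers further down perform the equivalent membership test directly (e.g. constants.PowerClasses["ClassA"][hostname]).

// powerClassOf returns the power class a hostname belongs to according to
// constants.PowerClasses, or "" if the host is not classified.
// Hypothetical helper shown only to illustrate the intended use of the map.
func powerClassOf(hostname string) string {
	for class, hosts := range constants.PowerClasses {
		if hosts[hostname] {
			return class
		}
	}
	return ""
}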


@ -1,9 +1,9 @@
/*
Cluster wide dynamic capping
This is not a scheduler but a scheduling scheme that schedulers can use.
This is a capping strategy that schedulers can use to reduce power consumption.
*/
package pcp
package powerCapping
import (
"bitbucket.org/sunybingcloud/electron/constants"
@ -251,7 +251,7 @@ func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64,
return 100, errors.New("Invalid argument: totalPower")
} else {
// Need to calculate the running average
runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.WindowSize)
runningAverage := runAvg.Calc(taskWrapper{task: *newTask}, constants.ConsiderationWindowSize)
// For each node, calculate the percentage of the running average to the total power.
ratios := make(map[string]float64)
for host, tpower := range totalPower {
@ -271,5 +271,5 @@ func (capper ClusterwideCapper) FCFSDeterminedCap(totalPower map[string]float64,
// Stringer for an instance of clusterwideCapper
func (capper ClusterwideCapper) String() string {
return "Cluster Capper -- Proactively cap the entire cluster."
return "Cluster-wide Capper -- Proactively cap the entire cluster."
}
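
As a concrete illustration of the hunk above: if the running average of task watts over the consideration window is 90 W and a host advertises 360 W of total power, the ratio computed for that host is (90 / 360) * 100 = 25, i.e. a proposed cluster-wide cap of roughly 25% for that host. (This assumes the ratio is simply the running average expressed as a percentage of the host's total power, which is what the surrounding code computes.)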


@ -58,7 +58,7 @@ func main() {
startTime := time.Now().Format("20060102150405")
logPrefix := *pcplogPrefix + "_" + startTime
scheduler := schedulers.NewFirstFitSortedWattsReducedWAR(tasks, *ignoreWatts, logPrefix)
scheduler := schedulers.NewBinPackSortedWatts(tasks, *ignoreWatts, logPrefix)
driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: *master,
Framework: &mesos.FrameworkInfo{
@ -96,7 +96,7 @@ func main() {
// Signals we have scheduled every task we have
select {
case <-scheduler.Shutdown:
// case <-time.After(shutdownTimeout):
//case <-time.After(shutdownTimeout):
}
// All tasks have finished
@ -104,7 +104,7 @@ func main() {
case <-scheduler.Done:
close(scheduler.PCPLog)
time.Sleep(5 * time.Second) //Wait for PCP to log a few more seconds
// case <-time.After(shutdownTimeout):
//case <-time.After(shutdownTimeout):
}
// Done shutting down
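
Selecting the scheduler is still a hard-coded, one-line change here (see the config-file TODO in the README). The schedulers added in this commit share the same constructor shape, so any of them can be swapped in the same way, for example:

	scheduler := schedulers.NewBottomHeavy(tasks, *ignoreWatts, logPrefix)
	// or: scheduler := schedulers.NewBinPackSortedWattsSortedOffers(tasks, *ignoreWatts, logPrefix)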


@ -8,12 +8,20 @@ To Do:
* Separate the capping strategies from the scheduling algorithms and make it possible to use any capping strategy with any scheduler.
* Make newTask(...) variadic where the newTaskClass argument can either be given or not. If not given, then pick task.Watts as the watts attribute, else pick task.ClassToWatts[newTaskClass].
* Retrofit pcp/proactiveclusterwidecappers.go to include the power capping go routines and to cap only when necessary.
* Create a package that would contain routines to perform various logging and move helpers.coLocated(...) into that.
* Retrofit schedulers to be able to run either using ClassMapWatts enabled or disabled.
Scheduling Algorithms:
* First Fit
* First Fit with sorted watts
* Bin-packing with sorted watts
* FCFS Proactive Cluster-wide Capping
* Ranked Proactive Cluster-wide Capping
* Piston Capping -- Works when scheduler is run with WAR
* ClassMapWatts -- Bin-packing and First Fit that now use Watts per power class.
* Top Heavy -- Hybrid scheduler that packs small tasks (less power intensive) using Bin-packing and spreads large tasks (power intensive) using First Fit.
* Bottom Heavy -- Hybrid scheduler that packs large tasks (power intensive) using Bin-packing and spreads small tasks (less power intensive) using First Fit.
Capping Strategies:
* Extrema Dynamic Capping
* Proactive Cluster-wide Capping
* Piston Capping


@ -0,0 +1,232 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether or not to take an offer
func (s *BinPackSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, totalCPU, totalRAM,
totalWatts float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
type BinPackSortedWattsSortedOffers struct {
base // Type embedded to inherit common functions
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
ignoreWatts bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
Done chan struct{}
// Controls when to shutdown pcp logging
PCPLog chan struct{}
schedTrace *log.Logger
}
// New electron scheduler
func NewBinPackSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BinPackSortedWattsSortedOffers {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &BinPackSortedWattsSortedOffers{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
}
return s
}
func (s *BinPackSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// Does the task fit
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, task) {
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
} else {
break // Continue on to the next task
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *BinPackSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@ -4,6 +4,8 @@ import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
@ -13,7 +15,6 @@ import (
"log"
"math"
"os"
"strings"
"sync"
"time"
)
@ -217,7 +218,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
// retrieving the total power for each host in the offers
for _, offer := range offers {
if _, ok := s.totalPower[*offer.Hostname]; !ok {
_, _, offer_watts := OfferAgg(offer)
_, _, offer_watts := offerUtils.OfferAgg(offer)
s.totalPower[*offer.Hostname] = offer_watts
}
}
@ -238,7 +239,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -246,8 +247,8 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
}
fitTasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
taken := false
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -256,13 +257,8 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
partialLoad := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doens't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// Does the task fit
@ -274,7 +270,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
s.startCapping()
}
taken = true
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
@ -303,20 +299,20 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
}
}
if taken {
if offerTaken {
// Updating the cap value for offer.Hostname
bpPistonMutex.Lock()
bpPistonCapValues[*offer.Hostname] += partialLoad
bpPistonMutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, fitTasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for task
log.Println("There is not enough resources to launch task: ")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
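
The hunks above repeatedly swap the inline host check for offerUtils.HostMismatch, whose definition is not part of the diff shown here. Judging from the strings.HasPrefix block it replaces, it presumably amounts to the following sketch (assuming the standard strings package):

// Presumed behaviour of offerUtils.HostMismatch, inferred from the removed
// inline check: an empty task host matches any offer; otherwise the offer's
// hostname must have the task's requested host as a prefix.
func HostMismatch(offerHost string, taskHost string) bool {
	if taskHost != "" && !strings.HasPrefix(offerHost, taskHost) {
		return true
	}
	return false
}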


@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -10,21 +12,19 @@ import (
"log"
"os"
"sort"
"strings"
"time"
)
// Decides whether or not to take an offer
func (*BinPackSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BinPackSortedWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -130,7 +130,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -139,30 +139,23 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
tasks := []*mesos.TaskInfo{}
offer_cpu, offer_ram, offer_watts := OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
// Does the task fit
if (s.ignoreWatts || offer_watts >= (totalWatts+task.Watts)) &&
(offer_cpu >= (totalCPU + task.CPU)) &&
(offer_ram >= (totalRAM + task.RAM)) {
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, task) {
taken = true
offerTaken = true
totalWatts += task.Watts
totalCPU += task.CPU
totalRAM += task.RAM
@ -190,17 +183,17 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}

schedulers/bottomHeavy.go (new file, 340 lines added)

@ -0,0 +1,340 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sort"
"time"
)
/*
Tasks are categorized into small and large tasks based on their watts requirement.
All the large tasks are packed into offers from agents belonging to power class A and power class B, using Bin-Packing.
All the small tasks are spread among the offers from agents belonging to power class C, using First Fit.
Bin-Packing has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence,
co-scheduling them has a greater impact on the total power utilization.
*/
func (s *BottomHeavy) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
wattsToConsider float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsToConsider))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type BottomHeavy struct {
base // Type embedded to inherit common functions
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
ignoreWatts bool
smallTasks, largeTasks []def.Task
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
Done chan struct{}
// Controls when to shutdown pcp logging
PCPLog chan struct{}
schedTrace *log.Logger
}
// New electron scheduler
func NewBottomHeavy(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *BottomHeavy {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
// Separating small tasks from large tasks.
// Classification done based on MMPU watts requirements.
mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
s := &BottomHeavy{
smallTasks: tasks[:mid],
largeTasks: tasks[mid:], // start at mid so the task at the median watts value is not dropped
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
}
return s
}
func (s *BottomHeavy) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
// Shut down scheduler if no more tasks to schedule
func (s *BottomHeavy) shutDownIfNecessary() {
if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
// create TaskInfo and log scheduling trace
func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer,
powerClass string, task def.Task) *mesos.TaskInfo {
log.Println("Co-Located with:")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, powerClass)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
return taskToSchedule
}
// Using Bin-Packing to pack the large tasks into these offers.
func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
for *task.Instances > 0 {
powerClass := offerUtils.PowerClass(offer)
// Determine the watts requirement for this offer's power class, then check
// whether the task fits (takeOffer short-circuits the watts check when
// ignoreWatts is set).
wattsToConsider := task.Watts
if !s.ignoreWatts {
wattsToConsider = task.ClassToWatts[powerClass]
}
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, wattsToConsider, task) {
offerTaken = true
totalWatts += wattsToConsider
totalCPU += task.CPU
totalRAM += task.RAM
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task))
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...)
s.shutDownIfNecessary()
}
} else {
break // Continue on to next task
}
}
}
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Using First Fit to spread the small tasks among these offers.
func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
taken := false
for i := 0; i < len(s.smallTasks); i++ {
task := s.smallTasks[i]
powerClass := offerUtils.PowerClass(offer)
// Decision to take the offer or not
wattsToConsider := task.Watts
if !s.ignoreWatts {
wattsToConsider = task.ClassToWatts[powerClass]
}
if (s.ignoreWatts || (offerWatts >= wattsToConsider)) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
taken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...)
s.shutDownIfNecessary()
}
break // Offer taken, move on
}
}
if !taken {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// We need to separate the offers into
// offers from ClassA and ClassB and offers from ClassC.
// Nodes in ClassA and ClassB will be packed with the large tasks.
// Small tasks will be spread out among the nodes in ClassC.
offersClassAB := []*mesos.Offer{}
offersClassC := []*mesos.Offer{}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
if constants.PowerClasses["ClassA"][*offer.Hostname] ||
constants.PowerClasses["ClassB"][*offer.Hostname] {
offersClassAB = append(offersClassAB, offer)
} else if constants.PowerClasses["ClassC"][*offer.Hostname] {
offersClassC = append(offersClassC, offer)
}
}
log.Println("Packing Large tasks into ClassAB offers:")
for _, o := range offersClassAB {
log.Println(*o.Hostname)
}
// Packing tasks into offersClassAB
s.pack(offersClassAB, driver)
log.Println("Spreading Small tasks among ClassC offers:")
for _, o := range offersClassC {
log.Println(*o.Hostname)
}
// Spreading tasks among offersClassC
s.spread(offersClassC, driver)
}
func (s *BottomHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}


@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -10,21 +12,19 @@ import (
"log"
"os"
"sort"
"strings"
"time"
)
// Decides whether or not to take an offer
func (*BPMaxMinWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BPMaxMinWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -133,12 +133,8 @@ func (s *BPMaxMinWatts) CheckFit(i int,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) &&
(offerCPU >= (*totalCPU + task.CPU)) &&
(offerRAM >= (*totalRAM + task.RAM)) {
if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) {
*totalWatts += task.Watts
*totalCPU += task.CPU
@ -175,7 +171,7 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -196,12 +192,9 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
@ -217,12 +210,9 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m
// Pack the rest of the offer with the smallest tasks
for i, task := range s.tasks {
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
@ -240,15 +230,15 @@ func (s *BPMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []*m
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}


@ -4,6 +4,8 @@ import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
@ -14,22 +16,21 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides whether or not to take an offer
func (s *BPMaxMinPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BPMaxMinPistonCapping) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -222,12 +223,8 @@ func (s *BPMaxMinPistonCapping) CheckFit(i int,
totalWatts *float64,
partialLoad *float64) (bool, *mesos.TaskInfo) {
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) &&
(offerCPU >= (*totalCPU + task.CPU)) &&
(offerRAM >= (*totalRAM + task.RAM)) {
if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) {
// Start piston capping if haven't started yet
if !s.isCapping {
@ -271,7 +268,7 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -295,12 +292,9 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
@ -316,12 +310,9 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off
// Pack the rest of the offer with the smallest tasks
for i, task := range s.tasks {
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
@ -343,15 +334,15 @@ func (s *BPMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, off
bpMaxMinPistonCappingCapValues[*offer.Hostname] += partialLoad
bpMaxMinPistonCappingMutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}


@ -3,8 +3,10 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -14,21 +16,21 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides whether or not to take an offer
func (s *BPMaxMinProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BPMaxMinProacCC) takeOffer(offer *mesos.Offer, totalCPU, totalRAM, totalWatts float64, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.Watts))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -43,7 +45,7 @@ type BPMaxMinProacCC struct {
availablePower map[string]float64
totalPower map[string]float64
ignoreWatts bool
capper *pcp.ClusterwideCapper
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster-wide capping.
@ -86,7 +88,7 @@ func NewBPMaxMinProacCC(tasks []def.Task, ignoreWatts bool, schedTracePrefix str
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: pcp.GetClusterwideCapperInstance(),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -246,12 +248,8 @@ func (s *BPMaxMinProacCC) CheckFit(i int,
totalRAM *float64,
totalWatts *float64) (bool, *mesos.TaskInfo) {
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
// Does the task fit
if (s.ignoreWatts || (offerWatts >= (*totalWatts + task.Watts))) &&
(offerCPU >= (*totalCPU + task.CPU)) &&
(offerRAM >= (*totalRAM + task.RAM)) {
if s.takeOffer(offer, *totalCPU, *totalRAM, *totalWatts, task) {
// Capping the cluster if haven't yet started
if !s.isCapping {
@ -308,7 +306,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offerWatts := OfferAgg(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if the first time
if _, ok := s.totalPower[*offer.Hostname]; !ok {
@ -324,7 +322,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -345,12 +343,9 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
for i := len(s.tasks) - 1; i >= 0; i-- {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// TODO: Fix this so index doesn't need to be passed
@ -366,12 +361,9 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
// Pack the rest of the offer with the smallest tasks
for i, task := range s.tasks {
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
@ -389,15 +381,15 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}


@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -10,21 +12,20 @@ import (
"log"
"os"
"sort"
"strings"
"time"
)
// Decides whether or not to take an offer
func (*BPSWClassMapWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BPSWClassMapWatts) takeOffer(offer *mesos.Offer, totalCPU, totalRAM,
totalWatts float64, powerClass string, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -76,7 +77,7 @@ func NewBPSWClassMapWatts(tasks []def.Task, ignoreWatts bool, schedTracePrefix s
return s
}
func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -100,7 +101,7 @@ func (s *BPSWClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskCl
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -130,7 +131,7 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -139,45 +140,33 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
powerClass := offerUtils.PowerClass(offer)
// Does the task fit
// OR lazy evaluation. If ignore watts is set to true, second statement won't
// be evaluated.
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) {
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
taken = true
totalWatts += task.ClassToWatts[nodeClass]
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
@ -199,17 +188,17 @@ func (s *BPSWClassMapWatts) ResourceOffers(driver sched.SchedulerDriver, offers
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
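
Likewise, offerUtils.PowerClass(offer) is introduced here without its definition appearing in this diff. The attribute loop it replaces above suggests it is roughly the following sketch (assuming the mesos-go mesosproto types used throughout):

// Presumed behaviour of offerUtils.PowerClass, inferred from the removed loop:
// it returns the value of the agent's "class" text attribute, or "" if the
// offer carries no such attribute.
func PowerClass(offer *mesos.Offer) string {
	powerClass := ""
	for _, attr := range offer.GetAttributes() {
		if attr.GetName() == "class" {
			powerClass = attr.GetText().GetValue()
		}
	}
	return powerClass
}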


@ -4,6 +4,8 @@ import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
@ -14,21 +16,21 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides if to take offer or not
func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
// Decides whether or not to take an offer
func (s *BPSWClassMapWattsPistonCapping) takeOffer(offer *mesos.Offer, totalCPU, totalRAM,
totalWatts float64, powerClass string, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -89,7 +91,7 @@ func NewBPSWClassMapWattsPistonCapping(tasks []def.Task, ignoreWatts bool, sched
return s
}
func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -123,7 +125,7 @@ func (s *BPSWClassMapWattsPistonCapping) newTask(offer *mesos.Offer, task def.Ta
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -215,7 +217,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
// retrieving the total power for each host in the offers.
for _, offer := range offers {
if _, ok := s.totalPower[*offer.Hostname]; !ok {
_, _, offerWatts := OfferAgg(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.totalPower[*offer.Hostname] = offerWatts
}
}
@ -229,7 +231,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -238,9 +240,7 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
@ -249,27 +249,17 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
partialLoad := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
powerClass := offerUtils.PowerClass(offer)
// Does the task fit
// OR lazy evaluation. If ignoreWatts is set to true, second statement won't
// be evaluated
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) {
// Start piston capping if haven't started yet
if !s.isCapping {
@ -277,14 +267,14 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
s.startCapping()
}
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
taken = true
totalWatts += task.ClassToWatts[nodeClass]
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
@ -306,20 +296,20 @@ func (s *BPSWClassMapWattsPistonCapping) ResourceOffers(driver sched.SchedulerDr
}
}
if taken {
if offerTaken {
// Updating the cap value for offer.Hostname
bpswClassMapWattsPistonMutex.Lock()
bpswClassMapWattsPistonCapValues[*offer.Hostname] += partialLoad
bpswClassMapWattsPistonMutex.Unlock()
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for task
log.Println("There is not enough resources to launch task: ")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}


@ -3,8 +3,10 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -14,21 +16,21 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides whether or not to take an offer
func (*BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
func (s *BPSWClassMapWattsProacCC) takeOffer(offer *mesos.Offer, totalCPU, totalRAM,
totalWatts float64, powerClass string, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
// TODO: Insert watts calculation here instead of taking them as parameter
if cpus >= task.CPU && mem >= task.RAM && watts >= task.Watts {
//TODO: Insert watts calculation here instead of taking them as a parameter
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[powerClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
return true
}
return false
}
@ -43,7 +45,7 @@ type BPSWClassMapWattsProacCC struct {
availablePower map[string]float64
totalPower map[string]float64
ignoreWatts bool
capper *pcp.ClusterwideCapper
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster-wide capping.
@ -86,7 +88,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: pcp.GetClusterwideCapperInstance(),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -99,7 +101,7 @@ func NewBPSWClassMapWattsProacCC(tasks []def.Task, ignoreWatts bool, schedTraceP
// mutex
var bpswClassMapWattsProacCCMutex sync.Mutex
func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -131,7 +133,7 @@ func (s *BPSWClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, ne
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -251,7 +253,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offerWatts := OfferAgg(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if the first time
if _, ok := s.totalPower[*offer.Hostname]; !ok {
@ -267,7 +269,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -276,35 +278,23 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
taken := false
offerTaken := false
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer it it doesn't match our task's host requirement.
if strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
for *task.Instances > 0 {
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
powerClass := offerUtils.PowerClass(offer)
// Does the task fit
// OR Lazy evaluation. If ignore watts is set to true, second statement won't
// be evaluated.
if (s.ignoreWatts || (offerWatts >= (totalWatts + task.ClassToWatts[nodeClass]))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
if s.takeOffer(offer, totalCPU, totalRAM, totalWatts, powerClass, task) {
// Capping the cluster if haven't yet started
if !s.isCapping {
@ -314,7 +304,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
s.startCapping()
}
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
bpswClassMapWattsProacCCMutex.Lock()
@ -324,13 +314,13 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
log.Println("Failed to determine new cluster-wide cap:")
log.Println(err)
}
taken = true
totalWatts += task.ClassToWatts[nodeClass]
offerTaken = true
totalWatts += task.ClassToWatts[powerClass]
totalCPU += task.CPU
totalRAM += task.RAM
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
tasks = append(tasks, taskToSchedule)
fmt.Println("Inst: ", *task.Instances)
@ -355,16 +345,16 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
}
}
if taken {
if offerTaken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}

View file

@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -9,14 +11,13 @@ import (
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"strings"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFit) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
@ -129,7 +130,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -140,16 +141,13 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
@ -162,9 +160,9 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -185,12 +183,12 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}

View file

@ -0,0 +1,226 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFitSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= task.Watts) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedOffers struct {
base // Type embedded to inherit common functions
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
ignoreWatts bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
Done chan struct{}
// Controls when to shutdown pcp logging
PCPLog chan struct{}
schedTrace *log.Logger
}
// New electron scheduler
func NewFirstFitSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedOffers {
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedOffers{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
}
return s
}
func (s *FirstFitSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks[i] = s.tasks[len(s.tasks)-1]
s.tasks = s.tasks[:len(s.tasks)-1]
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -10,10 +12,22 @@ import (
"log"
"os"
"sort"
"strings"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFitSortedWattsClassMapWatts) takeOffer(offer *mesos.Offer, powerClass string, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
// Decision to take the offer or not
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
return true
}
return false
}
// electron scheduler implements the Scheduler interface
type FirstFitSortedWattsClassMapWatts struct {
base // Type embedded to inherit common features.
@ -63,7 +77,7 @@ func NewFirstFitSortedWattsClassMapWatts(tasks []def.Task, ignoreWatts bool, sch
return s
}
func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -87,7 +101,7 @@ func (s *FirstFitSortedWattsClassMapWatts) newTask(offer *mesos.Offer, task def.
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -117,47 +131,37 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// retrieving the node class from the offer
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
// retrieving the powerClass from the offer
powerClass := offerUtils.PowerClass(offer)
// Decision to take the offer or not
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
if s.takeOffer(offer, powerClass, task) {
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
@ -174,12 +178,12 @@ func (s *FirstFitSortedWattsClassMapWatts) ResourceOffers(driver sched.Scheduler
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}

View file

@ -3,8 +3,10 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -14,11 +16,23 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFitSortedWattsClassMapWattsProacCC) takeOffer(offer *mesos.Offer, powerClass string, task def.Task) bool {
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
// Decision to take the offer or not
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[powerClass])) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
return true
}
return false
}
// electron scheduler implements the Scheduler interface
type FirstFitSortedWattsClassMapWattsProacCC struct {
base // Type embedded to inherit common features.
@ -31,7 +45,7 @@ type FirstFitSortedWattsClassMapWattsProacCC struct {
availablePower map[string]float64
totalPower map[string]float64
ignoreWatts bool
capper *pcp.ClusterwideCapper
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster-wide capping.
@ -74,7 +88,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: pcp.GetClusterwideCapperInstance(),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -87,7 +101,7 @@ func NewFirstFitSortedWattsClassMapWattsProacCC(tasks []def.Task, ignoreWatts bo
// mutex
var ffswClassMapWattsProacCCMutex sync.Mutex
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, task def.Task, powerClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
@ -119,7 +133,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) newTask(offer *mesos.Offer, ta
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[powerClass]))
}
return &mesos.TaskInfo{
@ -239,7 +253,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offerWatts := OfferAgg(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if the first time
if _, ok := s.totalPower[*offer.Hostname]; !ok {
@ -255,38 +269,27 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
offerCPU, offerRAM, offerWatts := OfferAgg(offer)
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Retrieving the node class from the offer
var nodeClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
nodeClass = attr.GetText().GetValue()
}
}
// retrieving the powerClass for the offer
powerClass := offerUtils.PowerClass(offer)
// Decision to take the offer or not
if (s.ignoreWatts || (offerWatts >= task.ClassToWatts[nodeClass])) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
if s.takeOffer(offer, powerClass, task) {
// Capping the cluster if haven't yet started
if !s.isCapping {
@ -296,7 +299,7 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
s.startCapping()
}
fmt.Println("Watts being used: ", task.ClassToWatts[nodeClass])
fmt.Println("Watts being used: ", task.ClassToWatts[powerClass])
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
ffswClassMapWattsProacCCMutex.Lock()
@ -310,12 +313,12 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, nodeClass)
taskToSchedule := s.newTask(offer, task, powerClass)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{taskToSchedule}, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
*task.Instances--
if *task.Instances <= 0 {
@ -335,12 +338,12 @@ func (s *FirstFitSortedWattsClassMapWattsProacCC) ResourceOffers(driver sched.Sc
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}

View file

@ -0,0 +1,228 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"sort"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFitSortedWattsSortedOffers) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
if cpus >= task.CPU && mem >= task.RAM && (s.ignoreWatts || watts >= task.Watts) {
return true
}
return false
}
// electronScheduler implements the Scheduler interface
type FirstFitSortedWattsSortedOffers struct {
base // Type embedded to inherit common functions
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
ignoreWatts bool
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
Done chan struct{}
// Controls when to shutdown pcp logging
PCPLog chan struct{}
schedTrace *log.Logger
}
// New electron scheduler
func NewFirstFitSortedWattsSortedOffers(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *FirstFitSortedWattsSortedOffers {
// Sorting the tasks in increasing order of watts requirement.
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
s := &FirstFitSortedWattsSortedOffers{
tasks: tasks,
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
}
return s
}
func (s *FirstFitSortedWattsSortedOffers) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.Watts))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
// Sorting the offers
sort.Sort(offerUtils.OffersSorter(offers))
// Printing the sorted offers and the corresponding CPU resource availability
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
// First fit strategy
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
if s.takeOffer(offer, task) {
log.Println("Co-Located with: ")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task)
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
if len(s.tasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
break // Offer taken, move on
}
}
// If there was no match for the task
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *FirstFitSortedWattsSortedOffers) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -10,14 +12,13 @@ import (
"log"
"os"
"sort"
"strings"
"time"
)
// Decides whether to take an offer or not
func (s *FirstFitSortedWatts) takeOffer(offer *mesos.Offer, task def.Task) bool {
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
@ -132,7 +133,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -143,16 +144,13 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
@ -165,9 +163,9 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -187,12 +185,12 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}

View file

@ -2,6 +2,8 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -9,14 +11,13 @@ import (
sched "github.com/mesos/mesos-go/scheduler"
"log"
"os"
"strings"
"time"
)
// Decides whether to take an offer or not
func (*FirstFitWattsOnly) takeOffer(offer *mesos.Offer, task def.Task) bool {
_, _, watts := OfferAgg(offer)
_, _, watts := offerUtils.OfferAgg(offer)
//TODO: Insert watts calculation here instead of taking them as a parameter
@ -123,7 +124,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -134,16 +135,13 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
// First fit strategy
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Check host if it exists
if task.Host != "" {
// Don't take offer if it doesn't match our task's host requirement
if !strings.HasPrefix(*offer.Hostname, task.Host) {
continue
}
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
// Decision to take the offer or not
@ -156,9 +154,9 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
tasks = append(tasks, taskToSchedule)
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
taken = true
offerTaken = true
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
@ -179,12 +177,12 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
}
// If there was no match for the task
if !taken {
if !offerTaken {
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}

View file

@ -2,33 +2,12 @@ package schedulers
import (
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"log"
"bitbucket.org/sunybingcloud/electron/def"
mesos "github.com/mesos/mesos-go/mesosproto"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
)
var (
defaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
longFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
)
func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
var cpus, mem, watts float64
for _, resource := range offer.Resources {
switch resource.GetName() {
case "cpus":
cpus += *resource.GetScalar().Value
case "mem":
mem += *resource.GetScalar().Value
case "watts":
watts += *resource.GetScalar().Value
}
}
return cpus, mem, watts
}
func coLocated(tasks map[string]bool) {
for task := range tasks {
@ -37,3 +16,22 @@ func coLocated(tasks map[string]bool) {
fmt.Println("---------------------")
}
/*
Determine the watts value to consider for each task.
This value could either be task.Watts or task.ClassToWatts[<power class>]
If task.ClassToWatts is not present, then return task.Watts (this would be for workloads which don't have classMapWatts)
*/
func wattsToConsider(task def.Task, classMapWatts bool, offer *mesos.Offer) float64 {
if classMapWatts {
// checking if ClassToWatts was present in the workload.
if task.ClassToWatts != nil {
return task.ClassToWatts[offerUtils.PowerClass(offer)]
} else {
return task.Watts
}
} else {
return task.Watts
}
}
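For illustration, a minimal sketch of how a scheduler's fit check could be built on this helper; the fitsOffer name, the classMapWatts flag and the ignoreWatts parameter are illustrative assumptions, not part of this commit.

// Sketch only: a generic fit check built on wattsToConsider (names are illustrative).
func fitsOffer(offer *mesos.Offer, task def.Task, ignoreWatts bool, classMapWatts bool) bool {
    cpus, mem, watts := offerUtils.OfferAgg(offer)
    // The watts requirement depends on whether ClassMapWatts is in effect for this run.
    required := wattsToConsider(task, classMapWatts, offer)
    return cpus >= task.CPU && mem >= task.RAM && (ignoreWatts || watts >= required)
}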

View file

@ -3,8 +3,10 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -13,16 +15,15 @@ import (
"log"
"math"
"os"
"strings"
"sync"
"time"
)
// Decides whether to take an offer or not
func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, offer_watts := OfferAgg(offer)
func (s *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer)
if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts {
if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || (offer_watts >= task.Watts)) {
return true
}
return false
@ -40,7 +41,7 @@ type ProactiveClusterwideCapFCFS struct {
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *pcp.ClusterwideCapper
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
@ -83,7 +84,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool, schedTra
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: pcp.GetClusterwideCapperInstance(),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -240,7 +241,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offer_watts := OfferAgg(offer)
_, _, offer_watts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
@ -256,7 +257,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -273,12 +274,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
@ -291,7 +292,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
fcfsMutex.Unlock()
s.startCapping()
}
taken = true
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
@ -305,7 +306,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
log.Printf("Starting on [%s]\n", offer.GetHostname())
taskToSchedule := s.newTask(offer, task)
toSchedule := []*mesos.TaskInfo{taskToSchedule}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, toSchedule, mesosUtils.DefaultFilter)
log.Printf("Inst: %d", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
@ -329,12 +330,12 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
}
// If no task fit the offer, then declining the offer.
if !taken {
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}

View file

@ -13,8 +13,10 @@ package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/pcp"
powCap "bitbucket.org/sunybingcloud/electron/powerCapping"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -24,16 +26,15 @@ import (
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
// Decides whether to take an offer or not
func (_ *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, offer_watts := OfferAgg(offer)
func (s *ProactiveClusterwideCapRanked) takeOffer(offer *mesos.Offer, task def.Task) bool {
offer_cpu, offer_mem, offer_watts := offerUtils.OfferAgg(offer)
if offer_cpu >= task.CPU && offer_mem >= task.RAM && offer_watts >= task.Watts {
if offer_cpu >= task.CPU && offer_mem >= task.RAM && (s.ignoreWatts || (offer_watts >= task.Watts)) {
return true
}
return false
@ -51,7 +52,7 @@ type ProactiveClusterwideCapRanked struct {
availablePower map[string]float64 // available power for each node in the cluster.
totalPower map[string]float64 // total power for each node in the cluster.
ignoreWatts bool
capper *pcp.ClusterwideCapper
capper *powCap.ClusterwideCapper
ticker *time.Ticker
recapTicker *time.Ticker
isCapping bool // indicate whether we are currently performing cluster wide capping.
@ -94,7 +95,7 @@ func NewProactiveClusterwideCapRanked(tasks []def.Task, ignoreWatts bool, schedT
availablePower: make(map[string]float64),
totalPower: make(map[string]float64),
RecordPCP: false,
capper: pcp.GetClusterwideCapperInstance(),
capper: powCap.GetClusterwideCapperInstance(),
ticker: time.NewTicker(10 * time.Second),
recapTicker: time.NewTicker(20 * time.Second),
isCapping: false,
@ -251,7 +252,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
_, _, offer_watts := OfferAgg(offer)
_, _, offer_watts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if the first time.
if _, ok := s.totalPower[*offer.Hostname]; !ok {
@ -277,7 +278,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, longFilter)
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
@ -297,12 +298,12 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
Cluster wide capping is currently performed at regular intervals of time.
*/
taken := false
offerTaken := false
for i := 0; i < len(s.tasks); i++ {
task := s.tasks[i]
// Don't take offer if it doesn't match our task's host requirement.
if !strings.HasPrefix(*offer.Hostname, task.Host) {
// Don't take offer if it doesn't match our task's host requirement
if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
continue
}
@ -315,7 +316,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
rankedMutex.Unlock()
s.startCapping()
}
taken = true
offerTaken = true
tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
if err == nil {
@ -328,7 +329,7 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
log.Printf("Starting on [%s]\n", offer.GetHostname())
taskToSchedule := s.newTask(offer, task)
to_schedule := []*mesos.TaskInfo{taskToSchedule}
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, mesosUtils.DefaultFilter)
log.Printf("Inst: %d", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
@ -352,12 +353,12 @@ func (s *ProactiveClusterwideCapRanked) ResourceOffers(driver sched.SchedulerDri
}
// If no tasks fit the offer, then declining the offer.
if !taken {
if !offerTaken {
log.Printf("There is not enough resources to launch a task on Host: %s\n", offer.GetHostname())
cpus, mem, watts := OfferAgg(offer)
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, defaultFilter)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}

schedulers/topHeavy.go (new file, 329 lines)
View file

@ -0,0 +1,329 @@
package schedulers
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/def"
"bitbucket.org/sunybingcloud/electron/utilities/mesosUtils"
"bitbucket.org/sunybingcloud/electron/utilities/offerUtils"
"fmt"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
"log"
"math"
"os"
"sort"
"time"
)
/*
Tasks are categorized into small and large tasks based on their watts requirement.
All the small tasks are packed into offers from agents belonging to power class C, using BinPacking.
All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit.
This is done to give the large (power-intensive) tasks a little more room for execution and to reduce the possibility of
starvation of power-intensive tasks.
*/
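As a condensed illustration of the flow described above (a sketch only; the complete logic, including declining offers and logging, is in NewPackSmallSpreadBig, pack, spread and ResourceOffers below):

// Sketch: partition offers by the power class of their host and dispatch accordingly.
func topHeavyDispatchSketch(s *TopHeavy, driver sched.SchedulerDriver, offers []*mesos.Offer) {
    offersClassAB, offersClassC := []*mesos.Offer{}, []*mesos.Offer{}
    for _, offer := range offers {
        if constants.PowerClasses["ClassA"][*offer.Hostname] ||
            constants.PowerClasses["ClassB"][*offer.Hostname] {
            offersClassAB = append(offersClassAB, offer)
        } else if constants.PowerClasses["ClassC"][*offer.Hostname] {
            offersClassC = append(offersClassC, offer)
        }
    }
    s.pack(offersClassC, driver)    // bin-pack the small (low-watts) tasks
    s.spread(offersClassAB, driver) // first-fit the large (high-watts) tasks
}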
// electronScheduler implements the Scheduler interface
type TopHeavy struct {
base // Type embedded to inherit common functions
tasksCreated int
tasksRunning int
tasks []def.Task
metrics map[string]def.Metric
running map[string]map[string]bool
ignoreWatts bool
smallTasks, largeTasks []def.Task
// First set of PCP values are garbage values, signal to logger to start recording when we're
// about to schedule a new task
RecordPCP bool
// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Shutdown chan struct{}
// This channel is closed after shutdown is closed, and only when all
// outstanding tasks have been cleaned up
Done chan struct{}
// Controls when to shutdown pcp logging
PCPLog chan struct{}
schedTrace *log.Logger
}
// New electron scheduler
func NewPackSmallSpreadBig(tasks []def.Task, ignoreWatts bool, schedTracePrefix string) *TopHeavy {
sort.Sort(def.WattsSorter(tasks))
logFile, err := os.Create("./" + schedTracePrefix + "_schedTrace.log")
if err != nil {
log.Fatal(err)
}
// Separating small tasks from large tasks.
// Classification done based on MMPU watts requirements.
mid := int(math.Floor((float64(len(tasks)) / 2.0) + 0.5))
s := &TopHeavy{
smallTasks: tasks[:mid],
largeTasks: tasks[mid:], // inclusive lower bound so the median task is not dropped
ignoreWatts: ignoreWatts,
Shutdown: make(chan struct{}),
Done: make(chan struct{}),
PCPLog: make(chan struct{}),
running: make(map[string]map[string]bool),
RecordPCP: false,
schedTrace: log.New(logFile, "", log.LstdFlags),
}
return s
}
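For example, with 8 tasks sorted in increasing order of watts, mid = int(math.Floor(8/2.0+0.5)) = 4, so the four lowest-watts tasks form smallTasks and the remaining higher-watts tasks form largeTasks.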
func (s *TopHeavy) newTask(offer *mesos.Offer, task def.Task, newTaskClass string) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
s.tasksCreated++
if !s.RecordPCP {
// Turn on logging
s.RecordPCP = true
time.Sleep(1 * time.Second) // Make sure we're recording by the time the first task starts
}
// If this is our first time running into this Agent
if _, ok := s.running[offer.GetSlaveId().GoString()]; !ok {
s.running[offer.GetSlaveId().GoString()] = make(map[string]bool)
}
// Add task to list of tasks running on node
s.running[offer.GetSlaveId().GoString()][taskName] = true
resources := []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.CPU),
mesosutil.NewScalarResource("mem", task.RAM),
}
if !s.ignoreWatts {
resources = append(resources, mesosutil.NewScalarResource("watts", task.ClassToWatts[newTaskClass]))
}
return &mesos.TaskInfo{
Name: proto.String(taskName),
TaskId: &mesos.TaskID{
Value: proto.String("electron-" + taskName),
},
SlaveId: offer.SlaveId,
Resources: resources,
Command: &mesos.CommandInfo{
Value: proto.String(task.CMD),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(), // Run everything isolated
},
},
}
}
// Shut down scheduler if no more tasks to schedule
func (s *TopHeavy) shutDownIfNecessary() {
if len(s.smallTasks) <= 0 && len(s.largeTasks) <= 0 {
log.Println("Done scheduling all tasks")
close(s.Shutdown)
}
}
// create TaskInfo and log scheduling trace
func (s *TopHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer,
powerClass string, task def.Task) *mesos.TaskInfo {
log.Println("Co-Located with:")
coLocated(s.running[offer.GetSlaveId().GoString()])
taskToSchedule := s.newTask(offer, task, powerClass)
fmt.Println("Inst: ", *task.Instances)
s.schedTrace.Print(offer.GetHostname() + ":" + taskToSchedule.GetTaskId().GetValue())
*task.Instances--
return taskToSchedule
}
// Using BinPacking to pack small tasks into these offers.
func (s *TopHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
totalWatts := 0.0
totalCPU := 0.0
totalRAM := 0.0
taken := false
for i := 0; i < len(s.smallTasks); i++ {
task := s.smallTasks[i]
for *task.Instances > 0 {
powerClass := offerUtils.PowerClass(offer)
// Does the task fit?
// Short-circuit evaluation: if ignore watts is set to true, the watts check is not evaluated.
wattsToConsider := task.Watts
if !s.ignoreWatts {
wattsToConsider = task.ClassToWatts[powerClass]
}
if (s.ignoreWatts || (offerWatts >= (totalWatts + wattsToConsider))) &&
(offerCPU >= (totalCPU + task.CPU)) &&
(offerRAM >= (totalRAM + task.RAM)) {
taken = true
totalWatts += wattsToConsider
totalCPU += task.CPU
totalRAM += task.RAM
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task))
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.smallTasks = append(s.smallTasks[:i], s.smallTasks[i+1:]...)
s.shutDownIfNecessary()
}
} else {
break // Continue on to next task
}
}
}
if taken {
log.Printf("Starting on [%s]\n", offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
} else {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
// Using first fit to spread large tasks into these offers.
func (s *TopHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
tasks := []*mesos.TaskInfo{}
offerCPU, offerRAM, offerWatts := offerUtils.OfferAgg(offer)
offerTaken := false
for i := 0; i < len(s.largeTasks); i++ {
task := s.largeTasks[i]
powerClass := offerUtils.PowerClass(offer)
// Decision to take the offer or not
wattsToConsider := task.Watts
if !s.ignoreWatts {
wattsToConsider = task.ClassToWatts[powerClass]
}
if (s.ignoreWatts || (offerWatts >= wattsToConsider)) &&
(offerCPU >= task.CPU) && (offerRAM >= task.RAM) {
offerTaken = true
tasks = append(tasks, s.createTaskInfoAndLogSchedTrace(offer, powerClass, task))
log.Printf("Starting %s on [%s]\n", task.Name, offer.GetHostname())
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, mesosUtils.DefaultFilter)
if *task.Instances <= 0 {
// All instances of task have been scheduled, remove it
s.largeTasks = append(s.largeTasks[:i], s.largeTasks[i+1:]...)
s.shutDownIfNecessary()
}
break // Offer taken, move on
}
}
if !offerTaken {
// If there was no match for the task
fmt.Println("There is not enough resources to launch a task:")
cpus, mem, watts := offerUtils.OfferAgg(offer)
log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
}
}
}
func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
log.Printf("Received %d resource offers", len(offers))
// We need to separate the offers into
// offers from ClassA and ClassB and offers from ClassC.
// Offers from ClassA and ClassB would execute the large tasks.
// Offers from ClassC would execute the small tasks.
offersClassAB := []*mesos.Offer{}
offersClassC := []*mesos.Offer{}
for _, offer := range offers {
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
driver.DeclineOffer(offer.Id, mesosUtils.LongFilter)
log.Println("Number of tasks still running: ", s.tasksRunning)
continue
default:
}
if constants.PowerClasses["ClassA"][*offer.Hostname] ||
constants.PowerClasses["ClassB"][*offer.Hostname] {
offersClassAB = append(offersClassAB, offer)
} else if constants.PowerClasses["ClassC"][*offer.Hostname] {
offersClassC = append(offersClassC, offer)
}
}
log.Println("ClassAB Offers:")
for _, o := range offersClassAB {
log.Println(*o.Hostname)
}
log.Println("ClassC Offers:")
for _, o := range offersClassC {
log.Println(*o.Hostname)
}
// Packing tasks into offersClassC
s.pack(offersClassC, driver)
// Spreading tasks among offersClassAB
s.spread(offersClassAB, driver)
}
func (s *TopHeavy) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
log.Printf("Received task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
if *status.State == mesos.TaskState_TASK_RUNNING {
s.tasksRunning++
} else if IsTerminal(status.State) {
delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
s.tasksRunning--
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
close(s.Done)
default:
}
}
}
log.Printf("DONE: Task status [%s] for task [%s]", NameFor(status.State), *status.TaskId.Value)
}

View file

@ -0,0 +1,11 @@
package mesosUtils
import (
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
)
var (
DefaultFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1)}
LongFilter = &mesos.Filters{RefuseSeconds: proto.Float64(1000)}
)
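For context, RefuseSeconds tells the Mesos master how long to avoid re-offering the declined resources to this framework. A minimal caller-side sketch of how the two filters are used by the schedulers above (the declineSketch name is illustrative, and the sched/mesos imports are assumed):

// Sketch: decline with the short filter while tasks remain, and with the long filter once scheduling is done.
func declineSketch(driver sched.SchedulerDriver, offer *mesos.Offer, doneScheduling bool) {
    if doneScheduling {
        driver.DeclineOffer(offer.Id, mesosUtils.LongFilter) // hold the resources back for ~1000s
        return
    }
    driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) // allow a re-offer after ~1s
}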

View file

@ -0,0 +1,62 @@
package offerUtils
import (
mesos "github.com/mesos/mesos-go/mesosproto"
"strings"
)
func OfferAgg(offer *mesos.Offer) (float64, float64, float64) {
var cpus, mem, watts float64
for _, resource := range offer.Resources {
switch resource.GetName() {
case "cpus":
cpus += *resource.GetScalar().Value
case "mem":
mem += *resource.GetScalar().Value
case "watts":
watts += *resource.GetScalar().Value
}
}
return cpus, mem, watts
}
// Determine the power class of the host in the offer
func PowerClass(offer *mesos.Offer) string {
var powerClass string
for _, attr := range offer.GetAttributes() {
if attr.GetName() == "class" {
powerClass = attr.GetText().GetValue()
}
}
return powerClass
}
// Implements the sort.Sort interface to sort Offers based on CPU.
// TODO: Have a generic sorter that sorts based on a defined requirement (CPU, RAM, DISK or Watts)
type OffersSorter []*mesos.Offer
func (offersSorter OffersSorter) Len() int {
return len(offersSorter)
}
func (offersSorter OffersSorter) Swap(i, j int) {
offersSorter[i], offersSorter[j] = offersSorter[j], offersSorter[i]
}
func (offersSorter OffersSorter) Less(i, j int) bool {
// getting CPU resource availability of offersSorter[i]
cpu1, _, _ := OfferAgg(offersSorter[i])
// getting CPU resource availability of offersSorter[j]
cpu2, _, _ := OfferAgg(offersSorter[j])
return cpu1 < cpu2 // strict ordering, as expected by sort.Interface
}
// Is there a mismatch between the task's host requirement and the host corresponding to the offer.
func HostMismatch(offerHost string, taskHost string) bool {
if taskHost != "" && !strings.HasPrefix(offerHost, taskHost) {
return true
}
return false
}
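To show how these helpers compose, a sketch of a best-fit-by-CPU pick (not part of this commit; the pickOfferSketch name is illustrative and the caller is assumed to import sort, def and this package):

// Sketch: sort offers by ascending aggregate CPU and take the first one that satisfies the task.
func pickOfferSketch(offers []*mesos.Offer, task def.Task) *mesos.Offer {
    sort.Sort(offerUtils.OffersSorter(offers))
    for _, offer := range offers {
        if offerUtils.HostMismatch(*offer.Hostname, task.Host) {
            continue // host constraint not satisfied
        }
        cpus, mem, _ := offerUtils.OfferAgg(offer)
        if cpus >= task.CPU && mem >= task.RAM {
            return offer
        }
    }
    return nil // nothing suitable in this round of offers
}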

View file

@ -1,7 +1,7 @@
/*
A utility to calculate the running average.
One should implement Val() to be able to use this utility.
One should implement Val() and ID() to use this utility.
*/
package runAvg
@ -19,9 +19,9 @@ type Interface interface {
}
type runningAverageCalculator struct {
window list.List
windowSize int
currentSum float64
considerationWindow list.List
considerationWindowSize int
currentSum float64
}
// singleton instance
@ -31,14 +31,14 @@ var racSingleton *runningAverageCalculator
func getInstance(curSum float64, wSize int) *runningAverageCalculator {
if racSingleton == nil {
racSingleton = &runningAverageCalculator{
windowSize: wSize,
currentSum: curSum,
considerationWindowSize: wSize,
currentSum: curSum,
}
return racSingleton
} else {
// Updating window size if a new window size is given.
if wSize != racSingleton.windowSize {
racSingleton.windowSize = wSize
if wSize != racSingleton.considerationWindowSize {
racSingleton.considerationWindowSize = wSize
}
return racSingleton
}
@ -47,20 +47,20 @@ func getInstance(curSum float64, wSize int) *runningAverageCalculator {
// Compute the running average by adding 'data' to the window.
// Updating currentSum to get constant time complexity for every running average computation.
func (rac *runningAverageCalculator) calculate(data Interface) float64 {
if rac.window.Len() < rac.windowSize {
rac.window.PushBack(data)
if rac.considerationWindow.Len() < rac.considerationWindowSize {
rac.considerationWindow.PushBack(data)
rac.currentSum += data.Val()
} else {
// removing the element at the front of the window.
elementToRemove := rac.window.Front()
elementToRemove := rac.considerationWindow.Front()
rac.currentSum -= elementToRemove.Value.(Interface).Val()
rac.window.Remove(elementToRemove)
rac.considerationWindow.Remove(elementToRemove)
// adding new element to the window
rac.window.PushBack(data)
rac.considerationWindow.PushBack(data)
rac.currentSum += data.Val()
}
return rac.currentSum / float64(rac.window.Len())
return rac.currentSum / float64(rac.considerationWindow.Len())
}
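As a worked example: with considerationWindowSize = 3 and samples 10, 20 and 30 already in the window (currentSum = 60, average 20), a new sample of 40 evicts the 10 at the front, so currentSum becomes 60 - 10 + 40 = 90 and the running average is 90 / 3 = 30, with no re-scan of the window.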
/*
@ -68,9 +68,9 @@ If element with given ID present in the window, then remove it and return (remov
Else, return (nil, error)
*/
func (rac *runningAverageCalculator) removeFromWindow(id string) (interface{}, error) {
for element := rac.window.Front(); element != nil; element = element.Next() {
for element := rac.considerationWindow.Front(); element != nil; element = element.Next() {
if elementToRemove := element.Value.(Interface); elementToRemove.ID() == id {
rac.window.Remove(element)
rac.considerationWindow.Remove(element)
rac.currentSum -= elementToRemove.Val()
return elementToRemove, nil
}
@ -102,7 +102,7 @@ func Init() {
}
// Setting parameters to default values. Could also set racSingleton to nil but this leads to unnecessary overhead of creating
// another instance when Calc is called.
racSingleton.window.Init()
racSingleton.windowSize = 0
racSingleton.considerationWindow.Init()
racSingleton.considerationWindowSize = 0
racSingleton.currentSum = 0.0
}
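For illustration, a minimal sketch of a type satisfying this package's Interface (per the file comment, Val() and ID() are required); the taskPower name is hypothetical, and the Calc call in the trailing comment is an assumed signature based only on the reference to Calc above, not taken from this commit.

// Hypothetical Interface implementation (sketch only).
type taskPower struct {
    taskID string
    watts  float64
}

func (t taskPower) Val() float64 { return t.watts }
func (t taskPower) ID() string   { return t.taskID }

// Assumed usage (the exact Calc signature is a guess, for illustration):
//   avg := runAvg.Calc(taskPower{taskID: "electron-minife-1", watts: 50.0}, constants.ConsiderationWindowSize)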