Merged in hostsConstantUpdation (pull request #15)

Dynamically update constants.Hosts and constants.PowerClasses based on incoming Mesos offers.

Approved-by: Renan DelValle <rdelval1@binghamton.edu>
Approved-by: Pradyumna Kaushik <pkaushi1@binghamton.edu>
ajain13 2017-03-26 00:37:41 +00:00 committed by Renan DelValle
commit 84c14f0c2f
24 changed files with 112 additions and 95 deletions
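
In short, the hardcoded stratos host list and per-class host tables are replaced by empty, set-like maps that are filled in at runtime as Mesos offers arrive; every scheduler's ResourceOffers loop now calls offerUtils.UpdateEnvironment(offer) to register new hosts and their power classes. A minimal sketch of the new declarations (the authoritative version is in the constants/constants.go hunk below):

```go
// Sketch of constants/constants.go after this change (see the full diff below).
package constants

// Both start empty and are populated at runtime from incoming Mesos offers
// by offerUtils.UpdateEnvironment (added at the end of this diff).
var Hosts = make(map[string]struct{})                   // set of known hostnames
var PowerClasses = make(map[string]map[string]struct{}) // power class -> set of hosts in that class
```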


@ -8,9 +8,8 @@ To Do:
* Add ability to use constraints
* Running average calculations https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* Make parameters corresponding to each scheduler configurable (possible to have a config template for each scheduler?)
* TODO : Adding type of scheduler to be used, to be picked from a config file, along with it's configurable parameters.
* Adding type of scheduler to be used, to be picked from a config file, along with it's configurable parameters.
* Write test code for each scheduler (This should be after the design change)
* Some of the constants in constants/constants.go can vary based on the environment.
Possible to setup the constants at runtime based on the environment?
* Log fix for declining offer -- different reason when insufficient resources as compared to when there are no
longer any tasks to schedule.
@ -18,6 +17,8 @@ To Do:
* Make def.Task an interface for further modularization and flexibility.
* Convert def#WattsToConsider(...) to be a receiver of def.Task and change the name of it to Watts(...).
* Have a generic sorter for task resources instead of having one for each kind of resource.
* Handle the case where the power class is not configured on a node. As of now, the assumption is that the power class is configured
for all the nodes.
**Requires [Performance Co-Pilot](http://pcp.io/) tool pmdumptext to be installed on the
machine on which electron is launched for logging to work and PCP collector agents installed
@ -28,9 +29,9 @@ How to run (Use the --help option to get information about other command-line op
`./electron -workload <workload json>`
To run electron with ignoreWatts, run the following command,
To run electron with Watts as Resource, run the following command,
`./electron -workload <workload json> -ignoreWatts`
`./electron -workload <workload json> -wattsAsAResource`
Workload schema:
@ -41,30 +42,31 @@ Workload schema:
"name": "minife",
"cpu": 3.0,
"ram": 4096,
"watts": 50,
"image": "gouravr/minife:v5",
"cmd": "cd src && mpirun -np 1 miniFE.x -nx 100 -ny 100 -nz 100",
"inst": 9,
"class_to_watts" : {
"A": 30.2475289996,
"B": 35.6491229228,
"C": 24.0476734352
}
"watts": 63.141,
"class_to_watts": {
"A": 93.062,
"B": 65.552,
"C": 57.897,
"D": 60.729
},
"image": "rdelvalle/minife:electron1",
"cmd": "cd src && mpirun -np 3 miniFE.x -nx 100 -ny 100 -nz 100",
"inst": 10
},
{
"name": "dgemm",
"cpu": 3.0,
"ram": 4096,
"watts": 50,
"image": "gouravr/dgemm:v2",
"ram": 32,
"watts": 85.903,
"class_to_watts": {
"A": 114.789,
"B": 89.133,
"C": 82.672,
"D": 81.944
},
"image": "rdelvalle/dgemm:electron1",
"cmd": "/./mt-dgemm 1024",
"inst": 9,
"class_to_watts" : {
"A": 35.2475289996,
"B": 25.6491229228,
"C": 29.0476734352
}
"inst": 10
}
]
```


@ -7,34 +7,14 @@ TODO: Clean this up and use Mesos Attributes instead.
*/
package constants
var Hosts = []string{"stratos-001.cs.binghamton.edu", "stratos-002.cs.binghamton.edu",
"stratos-003.cs.binghamton.edu", "stratos-004.cs.binghamton.edu",
"stratos-005.cs.binghamton.edu", "stratos-006.cs.binghamton.edu",
"stratos-007.cs.binghamton.edu", "stratos-008.cs.binghamton.edu"}
var Hosts = make(map[string]struct{})
/*
Classification of the nodes in the cluster based on their Thermal Design Power (TDP).
The power classes are labelled in the decreasing order of the corresponding TDP, with class A nodes
having the highest TDP and class C nodes having the lowest TDP.
*/
var PowerClasses = map[string]map[string]bool{
"A": map[string]bool{
"stratos-005.cs.binghamton.edu": true,
"stratos-006.cs.binghamton.edu": true,
},
"B": map[string]bool{
"stratos-007.cs.binghamton.edu": true,
},
"C": map[string]bool{
"stratos-008.cs.binghamton.edu": true,
},
"D": map[string]bool {
"stratos-001.cs.binghamton.edu": true,
"stratos-002.cs.binghamton.edu": true,
"stratos-003.cs.binghamton.edu": true,
"stratos-004.cs.binghamton.edu": true,
},
}
var PowerClasses = make(map[string]map[string]struct{})
/*
Margin with respect to the required power for a job.
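
Since constants.Hosts and constants.PowerClasses are now set-like maps with empty-struct values, callers test membership with the comma-ok idiom and range over keys only (the `for host, _ := range` form used in the schedulers below is equivalent to `for host := range`). A minimal, hypothetical sketch of both patterns; the helper names are illustrative and not part of the repository:

```go
package example

import "bitbucket.org/sunybingcloud/electron/constants"

// classAHosts returns the hosts currently registered under power class "A".
// Ranging over a missing class yields a nil map, which is safe to iterate.
func classAHosts() []string {
	hosts := []string{}
	for host := range constants.PowerClasses["A"] {
		hosts = append(hosts, host)
	}
	return hosts
}

// isKnownHost reports whether a host has already been seen in an offer.
func isKnownHost(hostname string) bool {
	_, ok := constants.Hosts[hostname]
	return ok
}
```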


@ -43,7 +43,7 @@ func TasksFromJSON(uri string) ([]Task, error) {
func (tsk *Task) UpdateHost(newHost string) bool {
// Validation
isCorrectHost := false
for _, existingHost := range constants.Hosts {
for existingHost, _ := range constants.Hosts {
if newHost == existingHost {
isCorrectHost = true
}


@ -3,6 +3,7 @@ package pcp
import (
"bitbucket.org/sunybingcloud/electron/constants"
"bitbucket.org/sunybingcloud/electron/rapl"
"bitbucket.org/sunybingcloud/electron/utilities"
"bufio"
"container/ring"
"log"
@ -14,7 +15,6 @@ import (
"strings"
"syscall"
"time"
"bitbucket.org/sunybingcloud/electron/utilities"
)
func round(num float64) int {
@ -202,7 +202,7 @@ func StartPCPLogAndProgressiveExtremaCap(quit chan struct{}, logging *bool, pref
// If cannot find any victim, then all nodes have been capped to the maximum and we stop capping at this point.
}
}
if (!canCapAlreadyCappedVictim) {
if !canCapAlreadyCappedVictim {
log.Println("No Victim left to cap.")
}
}


@ -37,15 +37,6 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
// Write to logfile
logFile.WriteString(scanner.Text() + "\n")
/*
headers := strings.Split(scanner.Text(), ",")
for _, hostMetric := range headers {
split := strings.Split(hostMetric, ":")
fmt.Printf("Host %s: Metric: %s\n", split[0], split[1])
}
*/
// Throw away first set of results
scanner.Scan()
@ -57,15 +48,7 @@ func Start(quit chan struct{}, logging *bool, prefix string) {
logFile.WriteString(scanner.Text() + "\n")
}
/*
fmt.Printf("Second: %d\n", seconds)
for i, val := range strings.Split(scanner.Text(), ",") {
fmt.Printf("host metric: %s val: %s\n", headers[i], val)
}*/
seconds++
// fmt.Println("--------------------------------")
}
}(logging)


@ -110,7 +110,7 @@ func (capper ClusterwideCapper) CleverRecap(totalPower map[string]float64,
wattsUsages := make(map[string][]float64)
hostOfFinishedTask := ""
indexOfFinishedTask := -1
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
wattsUsages[host] = []float64{0.0}
}
for host, tasks := range taskMonitor {


@ -43,7 +43,7 @@ func main() {
}
if *hiThreshold < *loThreshold {
fmt.Println("High threshold is of a lower value than low threhold.")
fmt.Println("High threshold is of a lower value than low threshold.")
os.Exit(1)
}
@ -74,12 +74,12 @@ func main() {
return
}
//go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
go pcp.Start(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix)
//go pcp.StartPCPLogAndExtremaDynamicCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
//go pcp.StartPCPLogAndProgressiveExtremaCap(scheduler.PCPLog, &scheduler.RecordPCP, logPrefix, *hiThreshold, *loThreshold)
time.Sleep(1 * time.Second) // Take a second between starting PCP log and continuing
// Attempt to handle signint to not leave pmdumptext running
// Attempt to handle SIGINT to not leave pmdumptext running
// Catch interrupt
go func() {
c := make(chan os.Signal, 1)
@ -120,4 +120,4 @@ func main() {
log.Printf("Framework stopped with status %s and error: %s\n", status.String(), err.Error())
}
log.Println("Exiting...")
}
}


@ -127,6 +127,7 @@ func (s *BinPackSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerDr
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}


@ -210,6 +210,7 @@ func (s *BinPackedPistonCapper) ResourceOffers(driver sched.SchedulerDriver, off
// retrieving the total power for each host in the offers
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
if _, ok := s.totalPower[*offer.Hostname]; !ok {
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.totalPower[*offer.Hostname] = offerWatts


@ -120,6 +120,7 @@ func (s *BinPackSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offers
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -18,12 +18,12 @@ import (
)
/*
Tasks are categorized into small and large tasks based on the watts requirement.
All the small tasks are packed into offers from agents belonging to power class C and power class D, using BinPacking.
All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit.
Tasks are categorized into small and large tasks based on watts requirements.
All the large tasks are packed into offers from agents belonging to power classes A and B, using Bin-Packing.
All the small tasks are spread among offers from agents belonging to power classes C and D, using First-Fit.
BinPacking has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence,
co-scheduling them has a great impact on the total power utilization.
Bin-Packing has the most effect when co-scheduling of tasks is increased. Large tasks typically utilize more resources and hence,
co-scheduling them has a great impact on the total power utilization.
*/
func (s *BottomHeavy) takeOfferBinPack(offer *mesos.Offer, totalCPU, totalRAM, totalWatts,
@ -159,7 +159,7 @@ func (s *BottomHeavy) createTaskInfoAndLogSchedTrace(offer *mesos.Offer, task de
return taskToSchedule
}
// Using BinPacking to pack small tasks into this offer.
// Using BinPacking to pack large tasks into the given offers.
func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
@ -221,7 +221,7 @@ func (s *BottomHeavy) pack(offers []*mesos.Offer, driver sched.SchedulerDriver)
}
}
// Using first fit to spread large tasks into these offers.
// Using First-Fit to spread small tasks among the given offers.
func (s *BottomHeavy) spread(offers []*mesos.Offer, driver sched.SchedulerDriver) {
for _, offer := range offers {
select {
@ -282,6 +282,7 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes
offersLightPowerClasses := []*mesos.Offer{}
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
@ -292,13 +293,19 @@ func (s *BottomHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mes
default:
}
if constants.PowerClasses["A"][*offer.Hostname] ||
constants.PowerClasses["B"][*offer.Hostname] {
if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
} else if constants.PowerClasses["C"][*offer.Hostname] ||
constants.PowerClasses["D"][*offer.Hostname] {
}
if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
}
log.Println("Packing Large tasks into ClassAB offers:")


@ -164,6 +164,7 @@ func (s *BPSWMaxMinWatts) ResourceOffers(driver sched.SchedulerDriver, offers []
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -261,6 +261,7 @@ func (s *BPSWMaxMinPistonCapping) ResourceOffers(driver sched.SchedulerDriver, o
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -164,7 +164,7 @@ func (s *BPSWMaxMinProacCC) startCapping() {
// updating cap value
bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue
if bpMaxMinProacCCCapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding cap value to nearest int
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCCapValue+0.5)))); err != nil {
log.Println(err)
@ -190,7 +190,7 @@ func (s *BPSWMaxMinProacCC) startRecapping() {
bpMaxMinProacCCMutex.Lock()
// If stopped performing cluster-wide capping, then we need to recap.
if s.isRecapping && bpMaxMinProacCCRecapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding the recap value to the nearest int
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(bpMaxMinProacCCRecapValue+0.5)))); err != nil {
log.Println(err)
@ -300,6 +300,7 @@ func (s *BPSWMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offerWatts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offerWatts
// setting total power if the first time


@ -119,6 +119,7 @@ func (s *FirstFit) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -164,7 +164,7 @@ func (s *FirstFitProacCC) startCapping() {
// Need to cap the cluster to the fcfsCurrentCapValue.
fcfsMutex.Lock()
if fcfsCurrentCapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding curreCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsCurrentCapValue+0.5)))); err != nil {
log.Println(err)
@ -188,7 +188,7 @@ func (s *FirstFitProacCC) startRecapping() {
fcfsMutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
if s.isRecapping && fcfsRecapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding curreCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(fcfsRecapValue+0.5)))); err != nil {
log.Println(err)
@ -233,6 +233,7 @@ func (s *FirstFitProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offer_watts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if the first time.


@ -126,6 +126,7 @@ func (s *FirstFitSortedOffers) ResourceOffers(driver sched.SchedulerDriver, offe
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}


@ -177,7 +177,7 @@ func (s *FirstFitSortedWattsProacCC) startCapping() {
// Need to cap the cluster to the rankedCurrentCapValue.
rankedMutex.Lock()
if rankedCurrentCapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedCurrentCapValue+0.5)))); err != nil {
log.Println(err)
@ -201,7 +201,7 @@ func (s *FirstFitSortedWattsProacCC) startRecapping() {
rankedMutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
if s.isRecapping && rankedRecapValue > 0.0 {
for _, host := range constants.Hosts {
for host, _ := range constants.Hosts {
// Rounding currentCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", float64(int(math.Floor(rankedRecapValue+0.5)))); err != nil {
log.Println(err)
@ -246,6 +246,7 @@ func (s *FirstFitSortedWattsProacCC) ResourceOffers(driver sched.SchedulerDriver
// retrieving the available power for all the hosts in the offers.
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
_, _, offer_watts := offerUtils.OfferAgg(offer)
s.availablePower[*offer.Hostname] = offer_watts
// setting total power if the first time.


@ -128,6 +128,7 @@ func (s *FirstFitSortedWattsSortedOffers) ResourceOffers(driver sched.SchedulerD
log.Println("Sorted Offers:")
for i := 0; i < len(offers); i++ {
offer := offers[i]
offerUtils.UpdateEnvironment(offer)
offerCPU, _, _ := offerUtils.OfferAgg(offer)
log.Printf("Offer[%s].CPU = %f\n", offer.GetHostname(), offerCPU)
}


@ -122,6 +122,7 @@ func (s *FirstFitSortedWatts) ResourceOffers(driver sched.SchedulerDriver, offer
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -112,6 +112,7 @@ func (s *FirstFitWattsOnly) ResourceOffers(driver sched.SchedulerDriver, offers
log.Printf("Received %d resource offers", len(offers))
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")


@ -18,7 +18,7 @@ func coLocated(tasks map[string]bool) {
// Get the powerClass of the given hostname
func hostToPowerClass(hostName string) string {
for powerClass, hosts := range constants.PowerClasses {
if ok := hosts[hostName]; ok {
if _, ok := hosts[hostName]; ok {
return powerClass
}
}


@ -19,8 +19,8 @@ import (
/*
Tasks are categorized into small and large tasks based on the watts requirement.
All the large tasks are packed into offers from agents belonging to power class A and power class B, using BinPacking.
All the small tasks are spread among the offers from agents belonging to power class C and power class D, using FirstFit.
All the small tasks are packed into offers from agents belonging to power class C and power class D, using BinPacking.
All the large tasks are spread among the offers from agents belonging to power class A and power class B, using FirstFit.
This was done to give a little more room for the large tasks (power intensive) for execution and reduce the possibility of
starvation of power intensive tasks.
@ -281,6 +281,7 @@ func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
offersLightPowerClasses := []*mesos.Offer{}
for _, offer := range offers {
offerUtils.UpdateEnvironment(offer)
select {
case <-s.Shutdown:
log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
@ -291,11 +292,16 @@ func (s *TopHeavy) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.
default:
}
if constants.PowerClasses["A"][*offer.Hostname] ||
constants.PowerClasses["B"][*offer.Hostname] {
if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
} else if constants.PowerClasses["C"][*offer.Hostname] ||
constants.PowerClasses["D"][*offer.Hostname] {
}
if _, ok := constants.PowerClasses["B"][*offer.Hostname]; ok {
offersHeavyPowerClasses = append(offersHeavyPowerClasses, offer)
}
if _, ok := constants.PowerClasses["C"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
if _, ok := constants.PowerClasses["D"][*offer.Hostname]; ok {
offersLightPowerClasses = append(offersLightPowerClasses, offer)
}
}


@ -1,7 +1,10 @@
package offerUtils
import (
"bitbucket.org/sunybingcloud/electron-archive/utilities/offerUtils"
"bitbucket.org/sunybingcloud/electron/constants"
mesos "github.com/mesos/mesos-go/mesosproto"
"log"
"strings"
)
@ -60,3 +63,26 @@ func HostMismatch(offerHost string, taskHost string) bool {
}
return false
}
// If the host in the offer is a new host, add the host to the set of Hosts and
// register the powerclass of this host.
func UpdateEnvironment(offer *mesos.Offer) {
var host = offer.GetHostname()
// If this host is not present in the set of hosts.
if _, ok := constants.Hosts[host]; !ok {
log.Printf("New host detected. Adding host [%s]", host)
// Add this host.
constants.Hosts[host] = struct{}{}
// Get the power class of this host.
class := offerUtils.PowerClass(offer)
log.Printf("Registering the power class... Host [%s] --> PowerClass [%s]", host, class)
// If new power class, register the power class.
if _, ok := constants.PowerClasses[class]; !ok {
constants.PowerClasses[class] = make(map[string]struct{})
}
// If the host of this class is not yet present in PowerClasses[class], add it.
if _, ok := constants.PowerClasses[class][host]; !ok {
constants.PowerClasses[class][host] = struct{}{}
}
}
}
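
To tie the pieces together, a condensed, illustrative sketch of how the schedulers in this PR use the function above: every ResourceOffers loop calls UpdateEnvironment on each offer before any lookup that depends on constants.Hosts or constants.PowerClasses. The ExampleScheduler type is a stand-in, and the scheduler-side import paths are assumptions based on the repository layout, not code from the repository:

```go
package schedulers

import (
	"log"

	"bitbucket.org/sunybingcloud/electron/constants"
	"bitbucket.org/sunybingcloud/electron/utilities/offerUtils" // assumed path for the offerUtils package
	mesos "github.com/mesos/mesos-go/mesosproto"
	sched "github.com/mesos/mesos-go/scheduler"
)

// ExampleScheduler is a stand-in for the schedulers touched in this PR.
type ExampleScheduler struct{}

func (s *ExampleScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
	for _, offer := range offers {
		// Register the host and its power class before using them.
		offerUtils.UpdateEnvironment(offer)

		// Subsequent power-class lookups now work for hosts that were never
		// hardcoded, e.g. routing offers by class as BottomHeavy/TopHeavy do.
		if _, ok := constants.PowerClasses["A"][*offer.Hostname]; ok {
			log.Printf("Offer from a class A host: %s", offer.GetHostname())
		}
	}
}
```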