Ensured that the cluster is capped only when a newly computed cap value differs from the one currently in effect. Otherwise we don't unnecessarily cap the cluster. This reduces overhead.
This commit is contained in:
parent 59e08f0a1a
commit 95c419785f
1 changed file with 22 additions and 15 deletions
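The gist of the change, as a minimal standalone sketch in Go (the helper names and sample values here are hypothetical and not part of the patch): a capping call is only issued when the rounded candidate cap differs from the rounded value already in effect, mirroring the int(math.Floor(v+0.5)) rounding idiom the scheduler uses.

package main

import (
	"fmt"
	"math"
)

// roundCap rounds a cap value to the nearest integer, the same way the
// scheduler does with int(math.Floor(v + 0.5)).
func roundCap(v float64) int {
	return int(math.Floor(v + 0.5))
}

// applyCapIfChanged is a hypothetical stand-in for the ticker body: it issues
// the capping call only when the rounded candidate differs from the rounded
// value currently applied, and returns the value now in effect.
func applyCapIfChanged(current, proposed float64, capFn func(int) error) float64 {
	if roundCap(proposed) == roundCap(current) {
		return current // same integer cap as before, skip the redundant call
	}
	if proposed > 0.0 {
		if err := capFn(roundCap(proposed)); err != nil {
			fmt.Println(err)
		}
	}
	return proposed
}

func main() {
	capFn := func(watts int) error {
		fmt.Printf("capping cluster to %d\n", watts)
		return nil
	}
	current := 0.0
	// Caps at 75, 76 and 90; the final 90.2 rounds to the 90 already applied and is skipped.
	for _, proposed := range []float64{75.4, 75.6, 90.0, 90.2} {
		current = applyCapIfChanged(current, proposed, capFn)
	}
}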
@@ -1,22 +1,22 @@
 package schedulers

 import (
+    "bitbucket.org/sunybingcloud/electron/constants"
     "bitbucket.org/sunybingcloud/electron/def"
     "bitbucket.org/sunybingcloud/electron/pcp"
+    "bitbucket.org/sunybingcloud/electron/rapl"
     "fmt"
     "github.com/golang/protobuf/proto"
     mesos "github.com/mesos/mesos-go/mesosproto"
     "github.com/mesos/mesos-go/mesosutil"
     sched "github.com/mesos/mesos-go/scheduler"
     "log"
+    "math"
     "os"
     "sort"
     "strings"
     "sync"
     "time"
-    "bitbucket.org/sunybingcloud/electron/constants"
-    "bitbucket.org/sunybingcloud/electron/rapl"
-    "math"
 )

 // Decides if to take an offer or not
@@ -156,21 +156,29 @@ func (s *BPMaxMinProacCC) newTask(offer *mesos.Offer, task def.Task) *mesos.Task
 // go routine to cap the entire cluster in regular intervals of time.
 var bpMaxMinProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+var bpMaxMinProacCCNewCapValue = 0.0 // newly computed cap value

 func (s *BPMaxMinProacCC) startCapping() {
     go func() {
         for {
             select {
             case <-s.ticker.C:
-                // Need to cap the cluster to the bpMaxMinProacCCCapValue
+                // Need to cap the cluster only if new cap value different from old cap value.
+                // This way we don't unnecessarily cap the cluster.
                 bpMaxMinProacCCMutex.Lock()
-                if bpMaxMinProacCCCapValue > 0.0 {
-                    for _, host := range constants.Hosts {
-                        // Rounding the cap value to the nearest int
-                        if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil {
-                            log.Println(err)
+                if s.isCapping {
+                    if int(math.Floor(bpMaxMinProacCCNewCapValue+0.5)) != int(math.Floor(bpMaxMinProacCCCapValue+0.5)) {
+                        // updating cap value
+                        bpMaxMinProacCCCapValue = bpMaxMinProacCCNewCapValue
+                        if bpMaxMinProacCCCapValue > 0.0 {
+                            for _, host := range constants.Hosts {
+                                // Rounding cap value to nearest int
+                                if err := rapl.Cap(host, "rapl", int(math.Floor(bpMaxMinProacCCCapValue+0.5))); err != nil {
+                                    log.Println(err)
+                                }
+                            }
+                            log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCCapValue+0.5)))
                         }
                     }
-                    log.Printf("Capped the cluster to %d", int(math.Floor(bpMaxMinProacCCCapValue+0.5)))
                 }
                 bpMaxMinProacCCMutex.Unlock()
             }
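For reference, the shape of the capping loop above reduced to a self-contained sketch: a goroutine wakes on every tick and re-caps only when the rounded candidate differs from the value currently applied. The variable names, the stub output, and the done channel (added only so the example terminates) are stand-ins, not the scheduler's own code.

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

var (
	mu       sync.Mutex
	capValue = 0.0 // value currently applied to the cluster
	newValue = 0.0 // latest candidate computed by the scheduler
)

// startCapping mimics the structure of the goroutine above: on every tick it
// re-caps only when the rounded candidate differs from the applied value.
func startCapping(ticker *time.Ticker, done <-chan struct{}) {
	go func() {
		for {
			select {
			case <-ticker.C:
				mu.Lock()
				if int(math.Floor(newValue+0.5)) != int(math.Floor(capValue+0.5)) {
					capValue = newValue
					fmt.Printf("capped the cluster to %d\n", int(math.Floor(capValue+0.5)))
				}
				mu.Unlock()
			case <-done:
				return
			}
		}
	}()
}

func main() {
	ticker := time.NewTicker(50 * time.Millisecond)
	done := make(chan struct{})
	startCapping(ticker, done)

	// 80.3 rounds to the 80 already applied, so only 80 and 65 trigger a re-cap.
	for _, v := range []float64{80.0, 80.3, 65.0} {
		mu.Lock()
		newValue = v
		mu.Unlock()
		time.Sleep(120 * time.Millisecond)
	}
	close(done)
	ticker.Stop()
}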
@@ -256,7 +264,7 @@ func (s *BPMaxMinProacCC) CheckFit(i int,
         tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
         if err == nil {
             bpMaxMinProacCCMutex.Lock()
-            bpMaxMinProacCCCapValue = tempCap
+            bpMaxMinProacCCNewCapValue = tempCap
             bpMaxMinProacCCMutex.Unlock()
         } else {
             log.Println("Failed to determine new cluster-wide cap:")
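This is the other half of the split described in the commit message: CheckFit now only publishes the candidate into bpMaxMinProacCCNewCapValue under bpMaxMinProacCCMutex instead of overwriting bpMaxMinProacCCCapValue directly. The ticker goroutine in startCapping remains the one place shown here that calls rapl.Cap, and it does so only when the rounded candidate actually differs from the value in effect, so computing a cap on the scheduling path no longer implies a capping call.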
@@ -334,7 +342,7 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []

         // Attempt to schedule a single instance of the heaviest workload available first
         // Start from the back until one fits
-        for i:= len(s.tasks)-1; i >= 0; i-- {
+        for i := len(s.tasks) - 1; i >= 0; i-- {

             task := s.tasks[i]
             // Check host if it exists
@@ -379,7 +387,6 @@ func (s *BPMaxMinProacCC) ResourceOffers(driver sched.SchedulerDriver, offers []
             }
         }
-

         if offerTaken {
             log.Printf("Starting on [%s]\n", offer.GetHostname())
             driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
@@ -405,8 +412,8 @@ func (s *BPMaxMinProacCC) StatusUpdate(driver sched.SchedulerDriver, status *mes
         // Need to remove the task from the window
         s.capper.TaskFinished(*status.TaskId.Value)
         // Determining the new cluster wide recap value
-        //tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
-        tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+        tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+        //tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
         if err == nil {
             // If new determined recap value is different from the current recap value, then we need to recap.
             if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpMaxMinProacCCRecapValue+0.5)) {