Made sure that capping happens only if a different cap value has been determined. Otherwise we skip re-capping the cluster, avoiding unnecessary RAPL calls and reducing overhead.
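The idea, distilled into a minimal self-contained sketch. The names below (capValue, newCapValue, raplCap) are illustrative stand-ins, not the scheduler's real identifiers; the actual change uses bpswClassMapWattsProacCCCapValue, bpswClassMapWattsProacCCNewCapValue, and rapl.Cap, as shown in the diff.

package main

import (
    "fmt"
    "math"
)

// raplCap is a hypothetical stand-in for rapl.Cap(host, "rapl", value).
func raplCap(host string, watts int) error {
    fmt.Printf("capped %s to %d\n", host, watts)
    return nil
}

var capValue = 0.0    // cap currently applied to the cluster
var newCapValue = 0.0 // most recently computed cap

// capIfChanged re-caps the hosts only when the rounded new cap differs
// from the rounded current one, skipping redundant capping calls.
func capIfChanged(hosts []string) {
    if int(math.Floor(newCapValue+0.5)) == int(math.Floor(capValue+0.5)) {
        return // effective cap unchanged; nothing to do
    }
    capValue = newCapValue
    if capValue > 0.0 {
        for _, host := range hosts {
            if err := raplCap(host, int(math.Floor(capValue+0.5))); err != nil {
                fmt.Println(err)
            }
        }
    }
}

func main() {
    hosts := []string{"host1", "host2"}
    newCapValue = 75.4
    capIfChanged(hosts) // caps: 75 differs from the initial 0
    newCapValue = 75.2
    capIfChanged(hosts) // no-op: 75.2 rounds to 75 as well
}

Note that the comparison is made on the rounded integers, matching what the capping call actually receives, so sub-watt fluctuations in the computed value do not trigger a cluster-wide re-cap.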
parent d9f626eefb
commit 2744bbe0f4
1 changed file with 47 additions and 17 deletions
@@ -165,22 +165,52 @@ func (s *BPSWClassMapWattsProacCC) Disconnected(sched.SchedulerDriver) {
 }
 
 // go routine to cap the entire cluster in regular intervals of time.
-var bpswClassMapWattsCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+var bpswClassMapWattsProacCCCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+var bpswClassMapWattsProacCCNewCapValue = 0.0 // newly computed cap value
+//func (s *BPSWClassMapWattsProacCC) startCapping() {
+//    go func() {
+//        for {
+//            select {
+//            case <-s.ticker.C:
+//                // Need to cap the cluster to the bpswClassMapWattsCapValue.
+//                bpswClassMapWattsProacCCMutex.Lock()
+//                if s.isCapping && bpswClassMapWattsProacCCCapValue > 0.0 {
+//                    for _, host := range constants.Hosts {
+//                        // Rounding capValue to nearest int.
+//                        if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsProacCCCapValue+0.5))); err != nil {
+//                            log.Println(err)
+//                        }
+//                    }
+//                    log.Printf("Capped the cluster to %d", int(math.Floor(bpswClassMapWattsProacCCCapValue+0.5)))
+//                }
+//                bpswClassMapWattsProacCCMutex.Unlock()
+//            }
+//        }
+//    }()
+//}
+
 func (s *BPSWClassMapWattsProacCC) startCapping() {
     go func() {
         for {
             select {
             case <-s.ticker.C:
-                // Need to cap the cluster to the bpswClassMapWattsCapValue.
+                // Need to cap the cluster only if new cap value different from old cap value.
+                // This way we don't unnecessarily cap the cluster.
                 bpswClassMapWattsProacCCMutex.Lock()
-                if bpswClassMapWattsCapValue > 0.0 {
-                    for _, host := range constants.Hosts {
-                        // Rounding capValue to nearest int.
-                        if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsCapValue+0.5))); err != nil {
-                            log.Println(err)
+                if s.isCapping {
+                    if int(math.Floor(bpswClassMapWattsProacCCNewCapValue+0.5)) != int(math.Floor(bpswClassMapWattsProacCCCapValue+0.5)) {
+                        // updating cap value
+                        bpswClassMapWattsProacCCCapValue = bpswClassMapWattsProacCCNewCapValue
+                        if bpswClassMapWattsProacCCCapValue > 0.0 {
+                            for _, host := range constants.Hosts {
+                                // Rounding cap value to nearest int
+                                if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsProacCCCapValue+0.5))); err != nil {
+                                    log.Println(err)
+                                }
+                            }
+                            log.Printf("Capped the cluster to %d", int(math.Floor(bpswClassMapWattsProacCCCapValue+0.5)))
                         }
                     }
-                    log.Printf("Capped the cluster to %d", int(math.Floor(bpswClassMapWattsCapValue+0.5)))
                 }
                 bpswClassMapWattsProacCCMutex.Unlock()
             }
@@ -189,7 +219,7 @@ func (s *BPSWClassMapWattsProacCC) startCapping() {
 }
 
 // go routine to recap the entire cluster in regular intervals of time.
-var bpswClassMapWattsRecapValue = 0.0 // The cluster-wide cap value when recapping
+var bpswClassMapWattsProacCCRecapValue = 0.0 // The cluster-wide cap value when recapping
 func (s *BPSWClassMapWattsProacCC) startRecapping() {
     go func() {
         for {
@@ -197,14 +227,14 @@ func (s *BPSWClassMapWattsProacCC) startRecapping() {
             case <-s.recapTicker.C:
                 bpswClassMapWattsProacCCMutex.Lock()
                 // If stopped performing cluster wide capping, then we need to recap
-                if s.isRecapping && bpswClassMapWattsRecapValue > 0.0 {
+                if s.isRecapping && bpswClassMapWattsProacCCRecapValue > 0.0 {
                     for _, host := range constants.Hosts {
                         // Rounding capValue to the nearest int
-                        if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsRecapValue+0.5))); err != nil {
+                        if err := rapl.Cap(host, "rapl", int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5))); err != nil {
                             log.Println(err)
                         }
                     }
-                    log.Printf("Recapping the cluster to %d", int(math.Floor(bpswClassMapWattsRecapValue+0.5)))
+                    log.Printf("Recapping the cluster to %d", int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5)))
                 }
                 // Setting recapping to false
                 s.isRecapping = false
@@ -309,7 +339,7 @@ func (s *BPSWClassMapWattsProacCC) ResourceOffers(driver sched.SchedulerDriver,
             tempCap, err := s.capper.FCFSDeterminedCap(s.totalPower, &task)
             if err == nil {
                 bpswClassMapWattsProacCCMutex.Lock()
-                bpswClassMapWattsCapValue = tempCap
+                bpswClassMapWattsProacCCNewCapValue = tempCap
                 bpswClassMapWattsProacCCMutex.Unlock()
             } else {
                 log.Println("Failed to determine new cluster-wide cap:")
@@ -370,16 +400,16 @@ func (s *BPSWClassMapWattsProacCC) StatusUpdate(driver sched.SchedulerDriver, st
         // Need to remove the task from the window
         s.capper.TaskFinished(*status.TaskId.Value)
         // Determining the new cluster wide recap value
+        //tempCap, err := s.capper.Recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
         tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
-        //tempCap, err := s.capper.CleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
         if err == nil {
             // If new determined cap value is different from the current recap value, then we need to recap
-            if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpswClassMapWattsRecapValue+0.5)) {
-                bpswClassMapWattsRecapValue = tempCap
+            if int(math.Floor(tempCap+0.5)) != int(math.Floor(bpswClassMapWattsProacCCRecapValue+0.5)) {
+                bpswClassMapWattsProacCCRecapValue = tempCap
                 bpswClassMapWattsProacCCMutex.Lock()
                 s.isRecapping = true
                 bpswClassMapWattsProacCCMutex.Unlock()
-                log.Printf("Determined re-cap value: %f\n", bpswClassMapWattsRecapValue)
+                log.Printf("Determined re-cap value: %f\n", bpswClassMapWattsProacCCRecapValue)
             } else {
                 bpswClassMapWattsProacCCMutex.Lock()
                 s.isRecapping = false