Added another line that needs to be uncommented to choose cleverRecap.

This commit is contained in:
Pradyumna Kaushik 2016-11-22 17:04:30 -05:00 committed by Renan DelValle
parent a158f4a341
commit 6c858c4e88

View file

@ -82,7 +82,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
}
// mutex
var mutex sync.Mutex
var fcfsMutex sync.Mutex
func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
@ -154,58 +154,58 @@ func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
// Need to stop the capping process.
s.ticker.Stop()
s.recapTicker.Stop()
mutex.Lock()
fcfsMutex.Lock()
s.isCapping = false
mutex.Unlock()
fcfsMutex.Unlock()
log.Println("Framework disconnected with master")
}
// go routine to cap the entire cluster in regular intervals of time.
var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
func (s *ProactiveClusterwideCapFCFS) startCapping() {
go func() {
for {
select {
case <-s.ticker.C:
// Need to cap the cluster to the currentCapValue.
mutex.Lock()
if currentCapValue > 0.0 {
// Need to cap the cluster to the fcfsCurrentCapValue.
fcfsMutex.Lock()
if fcfsCurrentCapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding curreCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Capped the cluster to %d", int(math.Floor(currentCapValue+0.5)))
log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5)))
}
mutex.Unlock()
fcfsMutex.Unlock()
}
}
}()
}
// go routine to cap the entire cluster in regular intervals of time.
var recapValue = 0.0 // The cluster wide cap value when recapping.
var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping.
func (s *ProactiveClusterwideCapFCFS) startRecapping() {
go func() {
for {
select {
case <-s.recapTicker.C:
mutex.Lock()
fcfsMutex.Lock()
// If stopped performing cluster wide capping then we need to explicitly cap the entire cluster.
//if !s.isCapping && s.isRecapping && recapValue > 0.0 {
if s.isRecapping && recapValue > 0.0 {
//if !s.isCapping && s.isRecapping && fcfsRecapValue > 0.0 {
if s.isRecapping && fcfsRecapValue > 0.0 {
for _, host := range constants.Hosts {
// Rounding curreCapValue to the nearest int.
if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil {
if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
log.Println(err)
}
}
log.Printf("Recapped the cluster to %d", int(math.Floor(recapValue+0.5)))
log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5)))
}
// setting recapping to false
s.isRecapping = false
mutex.Unlock()
fcfsMutex.Unlock()
}
}
}()
@ -216,10 +216,10 @@ func (s *ProactiveClusterwideCapFCFS) stopCapping() {
if s.isCapping {
log.Println("Stopping the cluster wide capping.")
s.ticker.Stop()
mutex.Lock()
fcfsMutex.Lock()
s.isCapping = false
s.isRecapping = true
mutex.Unlock()
fcfsMutex.Unlock()
}
}
@ -229,9 +229,9 @@ func (s *ProactiveClusterwideCapFCFS) stopRecapping() {
if !s.isCapping && s.isRecapping {
log.Println("Stopping the cluster wide re-capping.")
s.recapTicker.Stop()
mutex.Lock()
fcfsMutex.Lock()
s.isRecapping = false
mutex.Unlock()
fcfsMutex.Unlock()
}
}
@ -270,7 +270,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
For each task in s.tasks,
1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
2. If the tasks fits the offer, then I need to detemrine the cluster wide cap.
3. currentCapValue is updated with the determined cluster wide cap.
3. fcfsCurrentCapValue is updated with the determined cluster wide cap.
Cluster wide capping is currently performed at regular intervals of time.
TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
@ -288,18 +288,18 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
if s.takeOffer(offer, task) {
// Capping the cluster if haven't yet started,
if !s.isCapping {
mutex.Lock()
fcfsMutex.Lock()
s.isCapping = true
mutex.Unlock()
fcfsMutex.Unlock()
s.startCapping()
}
taken = true
tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
if err == nil {
mutex.Lock()
currentCapValue = tempCap
mutex.Unlock()
fcfsMutex.Lock()
fcfsCurrentCapValue = tempCap
fcfsMutex.Unlock()
} else {
log.Printf("Failed to determine new cluster wide cap: ")
log.Println(err)
@ -350,21 +350,22 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
s.capper.taskFinished(*status.TaskId.Value)
// Determining the new cluster wide cap.
tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
//tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
if err == nil {
// if new determined cap value is different from the current recap value then we need to recap.
if int(math.Floor(tempCap+0.5)) != int(math.Floor(recapValue+0.5)) {
recapValue = tempCap
mutex.Lock()
if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) {
fcfsRecapValue = tempCap
fcfsMutex.Lock()
s.isRecapping = true
mutex.Unlock()
log.Printf("Determined re-cap value: %f\n", recapValue)
fcfsMutex.Unlock()
log.Printf("Determined re-cap value: %f\n", fcfsRecapValue)
} else {
mutex.Lock()
fcfsMutex.Lock()
s.isRecapping = false
mutex.Unlock()
fcfsMutex.Unlock()
}
} else {
// Not updating currentCapValue
// Not updating fcfsCurrentCapValue
log.Println(err)
}
@ -372,7 +373,7 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
if s.tasksRunning == 0 {
select {
case <-s.Shutdown:
// Need to stop the capping process.
// Need to stop the recapping process.
s.stopRecapping()
close(s.Done)
default: