Added another line that needs to be uncommented to choose cleverRecap.
parent ec4f4e0f03
commit 4bc81707e0
1 changed file with 37 additions and 36 deletions
@@ -82,7 +82,7 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
 }
 
 // mutex
-var mutex sync.Mutex
+var fcfsMutex sync.Mutex
 
 func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task) *mesos.TaskInfo {
     taskName := fmt.Sprintf("%s-%d", task.Name, *task.Instances)
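
Note on the rename above: the likely motivation is that Go forbids two files in the same package from declaring a package-level variable with the same name, so a generic `mutex` here would collide with an identical declaration in any sibling scheduler file of the same package. A minimal sketch of the collision (file names are illustrative, not from this repository); this pair fails to compile by design:

// schedulers/a.go
package schedulers

import "sync"

var mutex sync.Mutex // declared once here...

// schedulers/b.go
package schedulers

import "sync"

var mutex sync.Mutex // ...redeclared here: "mutex redeclared in this block" compile error

Prefixing each scheduler's globals (fcfsMutex, fcfsCurrentCapValue, fcfsRecapValue) sidesteps this entirely.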
@@ -154,58 +154,58 @@ func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
     // Need to stop the capping process.
     s.ticker.Stop()
     s.recapTicker.Stop()
-    mutex.Lock()
+    fcfsMutex.Lock()
     s.isCapping = false
-    mutex.Unlock()
+    fcfsMutex.Unlock()
     log.Println("Framework disconnected with master")
 }
 
 // go routine to cap the entire cluster in regular intervals of time.
-var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
+var fcfsCurrentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
 func (s *ProactiveClusterwideCapFCFS) startCapping() {
     go func() {
         for {
             select {
             case <-s.ticker.C:
-                // Need to cap the cluster to the currentCapValue.
-                mutex.Lock()
-                if currentCapValue > 0.0 {
+                // Need to cap the cluster to the fcfsCurrentCapValue.
+                fcfsMutex.Lock()
+                if fcfsCurrentCapValue > 0.0 {
                     for _, host := range constants.Hosts {
                         // Rounding currentCapValue to the nearest int.
-                        if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue+0.5))); err != nil {
+                        if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsCurrentCapValue+0.5))); err != nil {
                             log.Println(err)
                         }
                     }
-                    log.Printf("Capped the cluster to %d", int(math.Floor(currentCapValue+0.5)))
+                    log.Printf("Capped the cluster to %d", int(math.Floor(fcfsCurrentCapValue+0.5)))
                 }
-                mutex.Unlock()
+                fcfsMutex.Unlock()
             }
         }
     }()
 }
 
 // go routine to recap the entire cluster in regular intervals of time.
-var recapValue = 0.0 // The cluster wide cap value when recapping.
+var fcfsRecapValue = 0.0 // The cluster wide cap value when recapping.
 func (s *ProactiveClusterwideCapFCFS) startRecapping() {
     go func() {
         for {
             select {
             case <-s.recapTicker.C:
-                mutex.Lock()
+                fcfsMutex.Lock()
                 // If we stopped performing cluster wide capping, then we need to explicitly cap the entire cluster.
-                //if !s.isCapping && s.isRecapping && recapValue > 0.0 {
-                if s.isRecapping && recapValue > 0.0 {
+                //if !s.isCapping && s.isRecapping && fcfsRecapValue > 0.0 {
+                if s.isRecapping && fcfsRecapValue > 0.0 {
                     for _, host := range constants.Hosts {
                         // Rounding recapValue to the nearest int.
-                        if err := rapl.Cap(host, "rapl", int(math.Floor(recapValue+0.5))); err != nil {
+                        if err := rapl.Cap(host, "rapl", int(math.Floor(fcfsRecapValue+0.5))); err != nil {
                             log.Println(err)
                         }
                     }
-                    log.Printf("Recapped the cluster to %d", int(math.Floor(recapValue+0.5)))
+                    log.Printf("Recapped the cluster to %d", int(math.Floor(fcfsRecapValue+0.5)))
                 }
                 // setting recapping to false
                 s.isRecapping = false
-                mutex.Unlock()
+                fcfsMutex.Unlock()
             }
         }
     }()
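
Both capping loops round the float cap with `int(math.Floor(x + 0.5))`, the pre-Go-1.10 idiom for rounding a non-negative value to the nearest integer; since Go 1.10, `math.Round` does the same thing directly. A minimal, self-contained illustration:

package main

import (
    "fmt"
    "math"
)

func main() {
    capValue := 93.6
    // Idiom used in this file: floor(x + 0.5) rounds to nearest integer.
    fmt.Println(int(math.Floor(capValue + 0.5))) // 94
    // Equivalent for non-negative values since Go 1.10:
    fmt.Println(int(math.Round(capValue))) // 94
}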
@@ -216,10 +216,10 @@ func (s *ProactiveClusterwideCapFCFS) stopCapping() {
     if s.isCapping {
         log.Println("Stopping the cluster wide capping.")
         s.ticker.Stop()
-        mutex.Lock()
+        fcfsMutex.Lock()
         s.isCapping = false
         s.isRecapping = true
-        mutex.Unlock()
+        fcfsMutex.Unlock()
     }
 }
 
@@ -229,9 +229,9 @@ func (s *ProactiveClusterwideCapFCFS) stopRecapping() {
     if !s.isCapping && s.isRecapping {
         log.Println("Stopping the cluster wide re-capping.")
         s.recapTicker.Stop()
-        mutex.Lock()
+        fcfsMutex.Lock()
         s.isRecapping = false
-        mutex.Unlock()
+        fcfsMutex.Unlock()
     }
 }
 
@@ -270,7 +270,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
    For each task in s.tasks,
      1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
      2. If the task fits the offer, then I need to determine the cluster wide cap.
-     3. currentCapValue is updated with the determined cluster wide cap.
+     3. fcfsCurrentCapValue is updated with the determined cluster wide cap.
 
    Cluster wide capping is currently performed at regular intervals of time.
    TODO: We can choose to cap the cluster only if the newly determined clusterwide cap differs enough from the current clusterwide cap.
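
On the TODO above: one way to avoid redundant RAPL calls is to re-cap only when the newly determined value differs from the last applied one by more than some threshold. A hedged sketch of that idea; the threshold value and all names are illustrative, not from this commit:

package main

import (
    "fmt"
    "math"
)

// capChangeThreshold is an assumed tuning knob, not part of the commit.
const capChangeThreshold = 1.0 // watts

// shouldRecap reports whether a newly determined cap differs enough from
// the last applied cap to justify issuing fresh RAPL calls to every host.
func shouldRecap(lastApplied, newCap float64) bool {
    return math.Abs(newCap-lastApplied) > capChangeThreshold
}

func main() {
    fmt.Println(shouldRecap(100.0, 100.4)) // false: change too small to act on
    fmt.Println(shouldRecap(100.0, 95.0))  // true: worth re-capping the cluster
}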
@@ -288,18 +288,18 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
         if s.takeOffer(offer, task) {
             // Start capping the cluster if we haven't already.
             if !s.isCapping {
-                mutex.Lock()
+                fcfsMutex.Lock()
                 s.isCapping = true
-                mutex.Unlock()
+                fcfsMutex.Unlock()
                 s.startCapping()
             }
             taken = true
             tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
 
             if err == nil {
-                mutex.Lock()
-                currentCapValue = tempCap
-                mutex.Unlock()
+                fcfsMutex.Lock()
+                fcfsCurrentCapValue = tempCap
+                fcfsMutex.Unlock()
             } else {
                 log.Printf("Failed to determine new cluster wide cap: ")
                 log.Println(err)
@@ -350,21 +350,22 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
         s.capper.taskFinished(*status.TaskId.Value)
         // Determining the new cluster wide cap.
         tempCap, err := s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+        //tempCap, err := s.capper.cleverRecap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
         if err == nil {
             // If the newly determined cap value differs from the current recap value, then we need to recap.
-            if int(math.Floor(tempCap+0.5)) != int(math.Floor(recapValue+0.5)) {
-                recapValue = tempCap
-                mutex.Lock()
+            if int(math.Floor(tempCap+0.5)) != int(math.Floor(fcfsRecapValue+0.5)) {
+                fcfsRecapValue = tempCap
+                fcfsMutex.Lock()
                 s.isRecapping = true
-                mutex.Unlock()
-                log.Printf("Determined re-cap value: %f\n", recapValue)
+                fcfsMutex.Unlock()
+                log.Printf("Determined re-cap value: %f\n", fcfsRecapValue)
             } else {
-                mutex.Lock()
+                fcfsMutex.Lock()
                 s.isRecapping = false
-                mutex.Unlock()
+                fcfsMutex.Unlock()
             }
         } else {
-            // Not updating currentCapValue
+            // Not updating fcfsCurrentCapValue
             log.Println(err)
         }
 
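
The commented-out line added above is the commit's headline change: switching from recap to cleverRecap currently means editing the source. Since the diff shows both methods taking the same arguments, a method value could select the strategy once instead. A sketch under that assumption; the flag and the stubbed-down types are illustrative, not from this repository:

package main

import "fmt"

// Stand-ins for the scheduler's capper and task monitor, just enough
// to show the method-value toggle compiling and running.
type taskMonitor map[string][]string

type capper struct{}

func (c *capper) recap(totalPower map[string]float64, tm taskMonitor, taskID string) (float64, error) {
    return 100.0, nil // placeholder cap value
}

func (c *capper) cleverRecap(totalPower map[string]float64, tm taskMonitor, taskID string) (float64, error) {
    return 95.0, nil // placeholder cap value
}

func main() {
    c := &capper{}
    useCleverRecap := true // hypothetical switch instead of commented-out code

    // Both methods share a signature, so a method value picks the strategy once.
    recapStrategy := c.recap
    if useCleverRecap {
        recapStrategy = c.cleverRecap
    }

    tempCap, err := recapStrategy(nil, nil, "task-42")
    fmt.Println(tempCap, err) // 95 <nil>
}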
@@ -372,7 +373,7 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
     if s.tasksRunning == 0 {
         select {
         case <-s.Shutdown:
-            // Need to stop the capping process.
+            // Need to stop the recapping process.
             s.stopRecapping()
             close(s.Done)
         default:
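
The select with a default arm in this last hunk is Go's non-blocking channel check: when no shutdown signal is pending, the default case runs immediately and StatusUpdate never stalls. A self-contained illustration of the pattern:

package main

import "fmt"

func main() {
    shutdown := make(chan struct{})

    // Nothing pending on the channel, so the default arm fires at once.
    select {
    case <-shutdown:
        fmt.Println("shutting down")
    default:
        fmt.Println("keep scheduling")
    }

    close(shutdown)
    select {
    case <-shutdown: // a closed channel always receives immediately
        fmt.Println("shutting down")
    default:
        fmt.Println("keep scheduling")
    }
}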