diff --git a/schedulers/proactiveclusterwidecappingfcfs.go b/schedulers/proactiveclusterwidecappingfcfs.go
index ae116f8..679686a 100644
--- a/schedulers/proactiveclusterwidecappingfcfs.go
+++ b/schedulers/proactiveclusterwidecappingfcfs.go
@@ -10,12 +10,21 @@ import (
 	"github.com/mesos/mesos-go/mesosutil"
 	sched "github.com/mesos/mesos-go/scheduler"
 	"log"
-	"strings"
-	"sync"
-	"time"
 	"math"
+	"strings"
+	"time"
 )
 
+// Decides whether or not to take an offer.
+func (_ *ProactiveClusterwideCapFCFS) takeOffer(offer *mesos.Offer, task def.Task) bool {
+	offer_cpu, offer_mem, _ := OfferAgg(offer)
+
+	if offer_cpu >= task.CPU && offer_mem >= task.RAM {
+		return true
+	}
+	return false
+}
+
 // electronScheduler implements the Scheduler interface.
 type ProactiveClusterwideCapFCFS struct {
 	tasksCreated int
@@ -23,10 +32,14 @@ type ProactiveClusterwideCapFCFS struct {
 	tasks        []def.Task
 	metrics      map[string]def.Metric
 	running      map[string]map[string]bool
+	taskMonitor    map[string][]def.Task // store tasks that are currently running.
+	availablePower map[string]float64    // available power for each node in the cluster.
+	totalPower     map[string]float64    // total power for each node in the cluster.
 	ignoreWatts  bool
 	capper       *clusterwideCapper
 	ticker       *time.Ticker
-	isCapping    bool
+	isCapping    bool // indicates whether cluster wide capping is currently being performed.
+	//lock       *sync.Mutex
 
 	// First set of PCP values are garbage values, signal to logger to start recording when we're
 	// about to schedule the new task.
@@ -53,10 +66,14 @@ func NewProactiveClusterwideCapFCFS(tasks []def.Task, ignoreWatts bool) *Proacti
 		Done:      make(chan struct{}),
 		PCPLog:    make(chan struct{}),
 		running:   make(map[string]map[string]bool),
+		taskMonitor:    make(map[string][]def.Task),
+		availablePower: make(map[string]float64),
+		totalPower:     make(map[string]float64),
 		RecordPCP: false,
 		capper:    getClusterwideCapperInstance(),
-		ticker:    time.NewTicker(10 * time.Second),
+		ticker:    time.NewTicker(5 * time.Second),
 		isCapping: false,
+		//lock:    new(sync.Mutex),
 	}
 	return s
 }
@@ -81,6 +98,7 @@ func (s *ProactiveClusterwideCapFCFS) newTask(offer *mesos.Offer, task def.Task)
 	task.SetTaskID(*proto.String(taskName))
 	// Add task to the list of tasks running on the node.
 	s.running[offer.GetSlaveId().GoString()][taskName] = true
+	s.taskMonitor[offer.GetSlaveId().GoString()] = append(s.taskMonitor[offer.GetSlaveId().GoString()], task) // Append, so tasks already running on the node are not overwritten.
 
 	resources := []*mesos.Resource{
 		mesosutil.NewScalarResource("cpus", task.CPU),
@@ -123,51 +141,76 @@ func (s *ProactiveClusterwideCapFCFS) Reregistered(_ sched.SchedulerDriver, mast
 }
 
 func (s *ProactiveClusterwideCapFCFS) Disconnected(sched.SchedulerDriver) {
+	// Need to stop the capping process.
+	s.ticker.Stop()
+	s.isCapping = false
 	log.Println("Framework disconnected with master")
 }
 
 // go routine to cap the entire cluster in regular intervals of time.
 var currentCapValue = 0.0 // initial value to indicate that we haven't capped the cluster yet.
-func (s *ProactiveClusterwideCapFCFS) startCapping(mutex sync.Mutex) {
+func (s *ProactiveClusterwideCapFCFS) startCapping() {
 	go func() {
 		for {
 			select {
 			case <-s.ticker.C:
 				// Need to cap the cluster to the currentCapValue.
 				if currentCapValue > 0.0 {
-					mutex.Lock()
+					//mutex.Lock()
+					//s.lock.Lock()
					for _, host := range constants.Hosts {
+						// Rounding currentCapValue to the nearest int.
 						if err := rapl.Cap(host, "rapl", int(math.Floor(currentCapValue + 0.5))); err != nil {
 							fmt.Println(err)
 						} else {
-							fmt.Printf("Successfully capped %s to %d\\% at %\n", host, currentCapValue)
+							fmt.Printf("Successfully capped %s to %f%%\n", host, currentCapValue)
 						}
 					}
-					mutex.Unlock()
+					//mutex.Unlock()
+					//s.lock.Unlock()
 				}
 			}
 		}
 	}()
 }
 
+// Stop the cluster wide capping.
+func (s *ProactiveClusterwideCapFCFS) stopCapping() {
+	if s.isCapping {
+		log.Println("Stopping the cluster wide capping.")
+		s.ticker.Stop()
+		s.isCapping = false
+	}
+}
+
 // TODO: Need to reduce the time complexity: looping over offers twice (Possible to do it just once?).
 func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
 	log.Printf("Received %d resource offers", len(offers))
 
 	// retrieving the available power for all the hosts in the offers.
-	available_power := make(map[string]float64)
 	for _, offer := range offers {
 		_, _, offer_watts := OfferAgg(offer)
-		available_power[*offer.Hostname] = offer_watts
+		s.availablePower[*offer.Hostname] = offer_watts
+		// Setting the total power of the node, if seeing the node for the first time.
+		if _, ok := s.totalPower[*offer.Hostname]; !ok {
+			s.totalPower[*offer.Hostname] = offer_watts
+		}
+	}
+
+	for host, tpower := range s.totalPower {
+		fmt.Printf("TotalPower[%s] = %f\n", host, tpower)
+	}
+	for host, apower := range s.availablePower {
+		fmt.Printf("AvailablePower[%s] = %f\n", host, apower)
 	}
 
 	for _, offer := range offers {
 		select {
 		case <-s.Shutdown:
-			log.Println("Done scheduling tasks: declining offerf on [", offer.GetHostname(), "]")
+			log.Println("Done scheduling tasks: declining offer on [", offer.GetHostname(), "]")
 			driver.DeclineOffer(offer.Id, longFilter)
 
-			log.Println("Number og tasks still running: ", s.tasksRunning)
+			log.Println("Number of tasks still running: ", s.tasksRunning)
 			continue
 		default:
 		}
 
@@ -176,46 +219,64 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 		  Clusterwide Capping strategy
 
 		  For each task in s.tasks,
-		    1. I need to check whether the mesos offer can be taken or not (based on CPU and RAM).
-		    2. If the tasks fits the offer then I need to detemrine the cluster wide cap.
-		    3. First need to cap the cluster to the determine cap value and then launch the task on the host corresponding to the offer.
+		    1. Need to check whether the offer can be taken or not (based on CPU and RAM requirements).
+		    2. If the task fits the offer, then determine the cluster wide cap.
+		    3. currentCapValue is updated with the determined cluster wide cap.
 
-		  Capping the cluster for every task would create a lot of overhead. Hence, clusterwide capping is performed at regular intervals.
+		  Cluster wide capping is currently performed at regular intervals of time.
 		  TODO: We can choose to cap the cluster only if the clusterwide cap varies more than the current clusterwide cap.
 		    Although this sounds like a better approach, it only works when the resource requirements of neighbouring tasks are similar.
 		*/
-		offer_cpu, offer_ram, _ := OfferAgg(offer)
+		//offer_cpu, offer_ram, _ := OfferAgg(offer)
 
 		taken := false
-		var mutex sync.Mutex
+		//var mutex sync.Mutex
 
-		// If haven't started cluster wide capping then doing so,
-		if !s.isCapping {
-			s.startCapping(mutex)
-			s.isCapping = true
-		}
-
-		for _, task := range s.tasks {
+		for i, task := range s.tasks {
 			// Don't take offer if it doesn't match our task's host requirement.
 			if !strings.HasPrefix(*offer.Hostname, task.Host) {
 				continue
 			}
 
 			// Does the task fit.
-			if (s.ignoreWatts || offer_cpu >= task.CPU || offer_ram >= task.RAM) {
+			if s.takeOffer(offer, task) {
+				// Start the cluster wide capping if it hasn't been started yet.
+				if !s.isCapping {
+					s.startCapping()
+					s.isCapping = true
+				}
 				taken = true
-				mutex.Lock()
-				tempCap, err := s.capper.fcfsDetermineCap(available_power, &task)
+				//mutex.Lock()
+				//s.lock.Lock()
+				//tempCap, err := s.capper.fcfsDetermineCap(s.availablePower, &task)
+				tempCap, err := s.capper.fcfsDetermineCap(s.totalPower, &task)
+
 				if err == nil {
 					currentCapValue = tempCap
 				} else {
-					fmt.Printf("Failed to determine cluster wide cap: ")
+					fmt.Printf("Failed to determine new cluster wide cap: ")
 					fmt.Println(err)
 				}
-				mutex.Unlock()
+				//mutex.Unlock()
+				//s.lock.Unlock()
 				fmt.Printf("Starting on [%s]\n", offer.GetHostname())
 				to_schedule := []*mesos.TaskInfo{s.newTask(offer, task)}
 				driver.LaunchTasks([]*mesos.OfferID{offer.Id}, to_schedule, defaultFilter)
+				fmt.Printf("Inst: %d\n", *task.Instances)
+				*task.Instances--
+				if *task.Instances <= 0 {
+					// All instances of the task have been scheduled. Remove it from the list of tasks to schedule.
+					s.tasks[i] = s.tasks[len(s.tasks)-1]
+					s.tasks = s.tasks[:len(s.tasks)-1]
+
+					if len(s.tasks) <= 0 {
+						log.Println("Done scheduling all tasks")
+						// Need to stop the cluster wide capping as there aren't any more tasks to schedule.
+						s.stopCapping()
+						close(s.Shutdown)
+					}
+				}
+				break // Offer taken, move on.
 			} else {
 				// Task doesn't fit the offer. Move onto the next offer.
 			}
@@ -223,7 +284,7 @@ func (s *ProactiveClusterwideCapFCFS) ResourceOffers(driver sched.SchedulerDrive
 		// If no task fit the offer, then declining the offer.
 		if !taken {
-			fmt.Println("There is not enough resources to launch a task:")
+			fmt.Printf("There are not enough resources to launch a task on Host: %s\n", offer.GetHostname())
 			cpus, mem, watts := OfferAgg(offer)
 
 			log.Printf("<CPU: %f, RAM: %f, Watts: %f>\n", cpus, mem, watts)
@@ -241,10 +302,17 @@ func (s *ProactiveClusterwideCapFCFS) StatusUpdate(driver sched.SchedulerDriver,
 		delete(s.running[status.GetSlaveId().GoString()], *status.TaskId.Value)
 		// Need to remove the task from the window of tasks.
 		s.capper.taskFinished(*status.TaskId.Value)
+		//currentCapValue, _ = s.capper.recap(s.availablePower, s.taskMonitor, *status.TaskId.Value)
+		// Determining the new cluster wide cap.
+		currentCapValue, _ = s.capper.recap(s.totalPower, s.taskMonitor, *status.TaskId.Value)
+		log.Printf("Recapping the cluster to %f\n", currentCapValue)
+
 		s.tasksRunning--
 		if s.tasksRunning == 0 {
 			select {
 			case <-s.Shutdown:
+				// Need to stop the capping process.
+				s.stopCapping()
 				close(s.Done)
 			default:
 			}
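
For reference, below is a minimal standalone sketch (not part of the patch) of the ticker-driven capping loop that startCapping implements. Because the patch comments out the mutexes, currentCapValue is read by the capping goroutine while ResourceOffers and StatusUpdate write it; the sketch reinstates a lock around that shared value. capNode stands in for rapl.Cap, and the host names and tick interval are illustrative; only the ticker pattern and the rounding idiom int(math.Floor(x + 0.5)) are taken from the patch.

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

var (
	currentCapValue float64    // cluster wide cap: written by the scheduler callbacks, read by the capping goroutine.
	capMutex        sync.Mutex // guards currentCapValue across goroutines.
)

// capNode is a stand-in for rapl.Cap(host, "rapl", percentage).
func capNode(host string, percentage int) error {
	fmt.Printf("Capped %s to %d%%\n", host, percentage)
	return nil
}

// startCapping caps every host to the current cap value on each tick.
func startCapping(ticker *time.Ticker, hosts []string) {
	go func() {
		for range ticker.C {
			capMutex.Lock()
			capValue := currentCapValue
			capMutex.Unlock()
			if capValue > 0.0 {
				for _, host := range hosts {
					// Round the cap to the nearest int, as the patch does with math.Floor(x + 0.5).
					if err := capNode(host, int(math.Floor(capValue+0.5))); err != nil {
						fmt.Println(err)
					}
				}
			}
		}
	}()
}

func main() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	startCapping(ticker, []string{"node-001", "node-002"}) // illustrative host names.

	// A scheduler callback would update the cap like this after fcfsDetermineCap or recap.
	capMutex.Lock()
	currentCapValue = 75.5
	capMutex.Unlock()

	time.Sleep(3 * time.Second) // let a few ticks fire before exiting.
}

One caveat worth noting: Ticker.Stop does not close the ticker's channel, so after stopCapping the capping goroutine simply blocks on its next receive rather than exiting; a separate done channel would be needed to terminate it cleanly.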