From ae811251102a9775056aac510c98bfc04d228f14 Mon Sep 17 00:00:00 2001
From: Pradyumna Kaushik <pkaushi1@binghamton.edu>
Date: Tue, 17 Apr 2018 20:09:35 +0000
Subject: [PATCH] Merged in mapTaskDistrToSchedPolWhenSwitching (pull request
 #11)

MapTaskDistrToSchedPolWhenSwitching

Approved-by: Akash Kothawale <akothaw1@binghamton.edu>
---
 def/taskUtils.go                   | 92 ++++++++++++++++++++++++----
 scheduler.go                       |  5 +-
 schedulers/MaxGreedyMins.go        | 15 ++---
 schedulers/MaxMin.go               |  5 --
 schedulers/base.go                 | 24 ++++----
 schedulers/bin-packing.go          |  7 +--
 schedulers/first-fit.go            |  5 --
 schedulers/helpers.go              | 17 ++++++
 schedulers/schedPolicy.go          | 96 +++++++++++++++++++++++++-----
 schedulers/store.go                | 36 +++++++++++
 utilities/schedUtils/schedUtils.go | 18 ++++++
 11 files changed, 255 insertions(+), 65 deletions(-)

diff --git a/def/taskUtils.go b/def/taskUtils.go
index 71ae80b..f239bc6 100644
--- a/def/taskUtils.go
+++ b/def/taskUtils.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/mash/gokmeans"
+	"github.com/montanaflynn/stats"
 	"log"
 	"sort"
 )
@@ -69,16 +70,31 @@ func getObservations(tasks []Task, taskObservation func(task Task) []float64) []
 	return observations
 }
 
-// Size tasks based on the power consumption.
-// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement.
-func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 {
-	size := 0.0
+// Sizing each task cluster using the average MMMPU requirement of the task in the cluster.
+func clusterSizeAvgMMMPU(tasks []Task, taskObservation func(task Task) []float64) float64 {
+	mmmpuValues := []float64{}
+	// Total sum of the Median of Median Max Power Usage values for all tasks.
+	total := 0.0
 	for _, task := range tasks {
-		for _, observation := range taskObservation(task) {
-			size += observation
+		observations := taskObservation(task)
+		if len(observations) > 0 {
+			// taskObservation would give us the mmpu values. We would need to take the median of these
+			// values to obtain the Median of Median Max Power Usage value.
+			if medianValue, err := stats.Median(observations); err == nil {
+				mmmpuValues = append(mmmpuValues, medianValue)
+				total += medianValue
+			} else {
+				// skip this value
+				// there is an error in the task config.
+				log.Println(err)
+			}
+		} else {
+			// There is only one observation for the task.
+			mmmpuValues = append(mmmpuValues, observations[0])
 		}
 	}
-	return size
+
+	return total / float64(len(mmmpuValues))
 }
 
 // Order clusters in increasing order of task heaviness.
@@ -96,12 +112,12 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio
 	}
 
 	for i := 0; i < numberOfClusters-1; i++ {
-		// Sizing the current cluster.
-		sizeI := clusterSize(clusters[i], taskObservation)
+		// Sizing the current cluster based on average Median of Median Max Power Usage of tasks.
+		sizeI := clusterSizeAvgMMMPU(clusters[i], taskObservation)
 
 		// Comparing with the other clusters.
 		for j := i + 1; j < numberOfClusters; j++ {
-			sizeJ := clusterSize(clusters[j], taskObservation)
+			sizeJ := clusterSizeAvgMMMPU(clusters[j], taskObservation)
 			if sizeI > sizeJ {
 				sizedClusters[i].SizeScore++
 			} else {
@@ -159,3 +175,59 @@ func GetResourceRequirement(taskID string) (TaskResources, error) {
 		return TaskResources{}, errors.New("Invalid TaskID: " + taskID)
 	}
 }
+
+// Determine the distribution of light power consuming and heavy power consuming tasks in a given window.
+func GetTaskDistributionInWindow(windowSize int, tasks []Task) (float64, error) {
+	getTotalInstances := func(ts []Task, taskExceedingWindow struct {
+		taskName       string
+		instsToDiscard int
+	}) int {
+		total := 0
+		for _, t := range ts {
+			if t.Name == taskExceedingWindow.taskName {
+				total += (*t.Instances - taskExceedingWindow.instsToDiscard)
+				continue
+			}
+			total += *t.Instances
+		}
+		return total
+	}
+
+	getTasksInWindow := func() (tasksInWindow []Task, taskExceedingWindow struct {
+		taskName       string
+		instsToDiscard int
+	}) {
+		tasksTraversed := 0
+		// Name of task, only few instances of which fall within the window.
+		lastTaskName := ""
+		for _, task := range tasks {
+			tasksInWindow = append(tasksInWindow, task)
+			tasksTraversed += *task.Instances
+			lastTaskName = task.Name
+			if tasksTraversed >= windowSize {
+				taskExceedingWindow.taskName = lastTaskName
+				taskExceedingWindow.instsToDiscard = tasksTraversed - windowSize
+				break
+			}
+		}
+
+		return
+	}
+
+	// Retrieving the tasks that are in the window.
+	tasksInWIndow, taskExceedingWindow := getTasksInWindow()
+	// Classifying the tasks based on Median of Median Max Power Usage values.
+	taskClusters := ClassifyTasks(tasksInWIndow, 2)
+	// First we'll need to check if the tasks in the window could be classified into 2 clusters.
+	// If yes, then we proceed with determining the distribution.
+	// Else, we throw an error stating that the distribution is even as only one cluster could be formed.
+	if len(taskClusters[1].Tasks) == 0 {
+		return -1.0, errors.New("Only one cluster could be formed.")
+	}
+
+	// The first cluster would corresponding to the light power consuming tasks.
+	// The second cluster would corresponding to the high power consuming tasks.
+	lpcTasksTotalInst := getTotalInstances(taskClusters[0].Tasks, taskExceedingWindow)
+	hpcTasksTotalInst := getTotalInstances(taskClusters[1].Tasks, taskExceedingWindow)
+	return float64(lpcTasksTotalInst) / float64(hpcTasksTotalInst), nil
+}
diff --git a/scheduler.go b/scheduler.go
index c532ae4..14eef78 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -28,6 +28,7 @@ var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the sched
 var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluaggable scheduling policies.")
 var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.")
 var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.")
+var fixFirstSchedPol = flag.String("fixFirstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled.")
 
 // Short hand args
 func init() {
@@ -42,6 +43,7 @@ func init() {
 	flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)")
 	flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.")
 	flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).")
+	flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the schedulin gpolicy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).")
 }
 
 func listAllSchedulingPolicies() {
@@ -135,7 +137,8 @@ func main() {
 		schedulers.WithDone(done),
 		schedulers.WithPCPLog(pcpLog),
 		schedulers.WithLoggingChannels(logMType, logMsg),
-		schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch))
+		schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch),
+		schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol))
 	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
 		Master: *master,
 		Framework: &mesos.FrameworkInfo{
diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go
index aeef812..310cb42 100644
--- a/schedulers/MaxGreedyMins.go
+++ b/schedulers/MaxGreedyMins.go
@@ -77,7 +77,6 @@ func (s *MaxGreedyMins) CheckFit(
 }
 
 func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
-	log.Println("Max-GreedyMins scheduling...")
 	baseSchedRef := spc.(*BaseScheduler)
 	if baseSchedRef.schedPolSwitchEnabled {
 		SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -112,8 +111,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
 			// If scheduling policy switching enabled, then
 			// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
 			if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
-				log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
-					s.numTasksScheduled)
 				break // Offers will automatically get declined.
 			}
 			task := baseSchedRef.tasks[i]
@@ -141,11 +138,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
 
 		// Pack the rest of the offer with the smallest tasks
 		for i := 0; i < len(baseSchedRef.tasks); i++ {
-			// If scheduling policy switching enabled, then
-			// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
-			if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
-				break // Offers will automatically get declined.
-			}
 			task := baseSchedRef.tasks[i]
 			wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer)
 			if err != nil {
@@ -159,6 +151,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
 			}
 
 			for *task.Instances > 0 {
+				// If scheduling policy switching enabled, then
+				// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
+				if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
+					break // Offers will automatically get declined.
+				}
 				// TODO: Fix this so index doesn't need to be passed
 				taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer,
 					&totalCPU, &totalRAM, &totalWatts)
@@ -183,6 +180,4 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched
 			driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
 		}
 	}
-
-	s.switchIfNecessary(spc)
 }
diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go
index 841c96d..672124f 100644
--- a/schedulers/MaxMin.go
+++ b/schedulers/MaxMin.go
@@ -76,7 +76,6 @@ func (s *MaxMin) CheckFit(
 }
 
 func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
-	log.Println("Max-Min scheduling...")
 	baseSchedRef := spc.(*BaseScheduler)
 	if baseSchedRef.schedPolSwitchEnabled {
 		SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -116,8 +115,6 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
 			// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
 			if baseSchedRef.schedPolSwitchEnabled &&
 				(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
-				log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
-					s.numTasksScheduled)
 				break // Offers will automatically get declined.
 			}
 			// We need to pick a min task or a max task
@@ -173,6 +170,4 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri
 			driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
 		}
 	}
-
-	s.switchIfNecessary(spc)
 }
diff --git a/schedulers/base.go b/schedulers/base.go
index 191c745..ec81227 100644
--- a/schedulers/base.go
+++ b/schedulers/base.go
@@ -58,6 +58,10 @@ type BaseScheduler struct {
 
 	// Whether switching of scheduling policies at runtime has been enabled
 	schedPolSwitchEnabled bool
+	// Name of the first scheduling policy to be deployed, if provided.
+	// This scheduling policy would be deployed first regardless of the distribution of tasks in the TaskQueue.
+	// Note: Scheduling policy switching needs to be enabled.
+	nameOfFstSchedPolToDeploy string
 
 	// Size of window of tasks that can be scheduled in the next offer cycle.
 	// The window size can be adjusted to make the most use of every resource offer.
@@ -186,19 +190,10 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m
 			s.HostNameToSlaveID[offer.GetHostname()] = *offer.SlaveId.Value
 		}
 	}
-	// If no resource offers have been received yet, and if scheduling policy switching has been enabled,
-	// then we would need to compute the size of the scheduling window for the current scheduling policy.
-	// Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster,
-	// the size of the window is determined and the scheduling policy is then applied for the corresponding number
-	// of tasks.
-	// Subsequently, the size of the scheduling window is determined at the end of each offer cycle.
-	if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers {
-		s.schedWindowSize, s.numTasksInSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} {
-			return s.tasks
-		})
-	}
-	log.Printf("SchedWindowSize: %d, NumberOfTasksInWindow: %d", s.schedWindowSize, s.numTasksInSchedWindow)
-
+	// Switch just before consuming the resource offers.
+	s.curSchedPolicy.SwitchIfNecessary(s)
+	s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]",
+		s.schedWindowSize, s.numTasksInSchedWindow))
 	s.curSchedPolicy.ConsumeOffers(s, driver, offers)
 	s.hasReceivedResourceOffers = true
 }
@@ -403,8 +398,9 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
 	s.Log(lmt, msg)
 }
 
-func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
+func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, nextPolicy SchedPolicyState) {
 	if s.curSchedPolicy != nextPolicy {
 		s.Log(elecLogDef.SPS, name)
+		s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%d] ==> %s", taskDist, name))
 	}
 }
diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go
index dd6f9be..fb85e55 100644
--- a/schedulers/bin-packing.go
+++ b/schedulers/bin-packing.go
@@ -34,7 +34,6 @@ type BinPackSortedWatts struct {
 }
 
 func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
-	log.Println("BPSW scheduling...")
 	baseSchedRef := spc.(*BaseScheduler)
 	if baseSchedRef.schedPolSwitchEnabled {
 		SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts)
@@ -78,8 +77,6 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
 				// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
 				if baseSchedRef.schedPolSwitchEnabled &&
 					(s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
-					log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
-						s.numTasksScheduled)
 					break // Offers will automatically get declined.
 				}
 				// Does the task fit
@@ -107,7 +104,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
 						}
 					}
 				} else {
-					break // Continue on to next offer.
+					break // Continue on to next task
 				}
 			}
 		}
@@ -123,6 +120,4 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.
 			driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
 		}
 	}
-
-	s.switchIfNecessary(spc)
 }
diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go
index 0a93858..8538aa8 100644
--- a/schedulers/first-fit.go
+++ b/schedulers/first-fit.go
@@ -34,7 +34,6 @@ type FirstFit struct {
 }
 
 func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) {
-	log.Println("FirstFit scheduling...")
 	baseSchedRef := spc.(*BaseScheduler)
 	baseSchedRef.LogOffersReceived(offers)
 
@@ -57,8 +56,6 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
 			// If scheduling policy switching enabled, then
 			// stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled.
 			if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) {
-				log.Printf("Stopped scheduling... Completed scheduling %d tasks.",
-					s.numTasksScheduled)
 				break // Offers will automatically get declined.
 			}
 			task := baseSchedRef.tasks[i]
@@ -104,6 +101,4 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD
 			driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter)
 		}
 	}
-
-	s.switchIfNecessary(spc)
 }
diff --git a/schedulers/helpers.go b/schedulers/helpers.go
index 91a1d79..4a5ada2 100644
--- a/schedulers/helpers.go
+++ b/schedulers/helpers.go
@@ -124,6 +124,23 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions {
 	}
 }
 
+func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions {
+	return func(s ElectronScheduler) error {
+		if nameOfFirstSchedPol == "" {
+			lmt := elecLogDef.WARNING
+			msgColor := elecLogDef.LogMessageColors[lmt]
+			msg := msgColor.Sprintf("First scheduling policy to deploy not mentioned. This is now going to be determined at runtime.")
+			s.(*BaseScheduler).Log(lmt, msg)
+			return nil
+		}
+		if _, ok := SchedPolicies[nameOfFirstSchedPol]; !ok {
+			return errors.New("Invalid name of scheduling policy.")
+		}
+		s.(*BaseScheduler).nameOfFstSchedPolToDeploy = nameOfFirstSchedPol
+		return nil
+	}
+}
+
 // Launch tasks.
 func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) {
 	driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter)
diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go
index 1dec900..8f6a098 100644
--- a/schedulers/schedPolicy.go
+++ b/schedulers/schedPolicy.go
@@ -1,9 +1,9 @@
 package schedulers
 
 import (
+	"bitbucket.org/sunybingcloud/electron/def"
 	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
 	sched "github.com/mesos/mesos-go/api/v0/scheduler"
-	"math/rand"
 )
 
 type SchedPolicyContext interface {
@@ -14,6 +14,13 @@ type SchedPolicyContext interface {
 type SchedPolicyState interface {
 	// Define the particular scheduling policy's methodology of resource offer consumption.
 	ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer)
+	// Get information about the scheduling policy.
+	GetInfo() (info struct {
+		taskDist    float64
+		varCpuShare float64
+	})
+	// Switch scheduling policy if necessary.
+	SwitchIfNecessary(SchedPolicyContext)
 }
 
 type baseSchedPolicyState struct {
@@ -29,26 +36,87 @@ type baseSchedPolicyState struct {
 	VarianceCpuSharePerTask float64 `json:"varCpuShare"`
 }
 
-func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) {
+func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) {
 	baseSchedRef := spc.(*BaseScheduler)
-	// Switch scheduling policy only if feature enabled from CLI
+	// Switching scheduling policy only if feature enabled from CLI
 	if baseSchedRef.schedPolSwitchEnabled {
+		// Name of the scheduling policy to switch to.
+		switchToPolicyName := ""
 		// Need to recompute size of the scheduling window for the next offer cycle.
 		// The next scheduling policy will schedule at max schedWindowSize number of tasks.
 		baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow =
 			baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks })
-		// Switching to a random scheduling policy.
-		// TODO: Switch based on some criteria.
-		index := rand.Intn(len(SchedPolicies))
-		for k, v := range SchedPolicies {
-			if index == 0 {
-				baseSchedRef.LogSchedPolicySwitch(k, v)
-				spc.SwitchSchedPol(v)
-				// Resetting the number of tasks scheduled.
-				bsps.numTasksScheduled = 0
-				break
+		// Determine the distribution of tasks in the new scheduling window.
+		taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
+		// If no resource offers have been received yet, and
+		// 	the name of the first scheduling policy to be deployed is provided,
+		// 	we switch to this policy regardless of the task distribution.
+		if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") {
+			switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy
+		} else if err != nil {
+			// All the tasks in the window were only classified into 1 cluster.
+			// Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation.
+			// So, we have 2 choices to make. First-Fit or Bin-Packing.
+			// If choose Bin-Packing, then there might be a performance degradation due to increase in
+			// 	resource contention. So, First-Fit might be a better option to cater to the worst case
+			// 	where all the tasks are power intensive tasks.
+			// TODO: Another possibility is to do the exact opposite and choose Bin-Packing.
+			// TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue.
+			switchToPolicyName = bp
+		} else {
+			// The tasks in the scheduling window were classified into 2 clusters, meaning that there is
+			// 	some variety in the kind of tasks.
+			// We now select the scheduling policy which is most appropriate for this distribution of tasks.
+			first := schedPoliciesToSwitch[0]
+			last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1]
+			if taskDist < first.sp.GetInfo().taskDist {
+				switchToPolicyName = first.spName
+			} else if taskDist > last.sp.GetInfo().taskDist {
+				switchToPolicyName = last.spName
+			} else {
+				low := 0
+				high := len(schedPoliciesToSwitch) - 1
+				for low <= high {
+					mid := (low + high) / 2
+					if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
+						high = mid - 1
+					} else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist {
+						low = mid + 1
+					} else {
+						switchToPolicyName = schedPoliciesToSwitch[mid].spName
+						break
+					}
+				}
+				// We're here if low == high+1.
+				// If haven't yet found the closest match.
+				if switchToPolicyName == "" {
+					lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist
+					highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist
+					if lowDiff > highDiff {
+						switchToPolicyName = schedPoliciesToSwitch[high].spName
+					} else if highDiff > lowDiff {
+						switchToPolicyName = schedPoliciesToSwitch[low].spName
+					} else {
+						// index doens't matter as the values at high and low are equidistant
+						// 	from taskDist.
+						switchToPolicyName = schedPoliciesToSwitch[high].spName
+					}
+				}
 			}
-			index--
 		}
+		// Switching scheduling policy.
+		baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName])
+		baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName])
+		// Resetting the number of tasks scheduled.
+		bsps.numTasksScheduled = 0
 	}
 }
+
+func (bsps *baseSchedPolicyState) GetInfo() (info struct {
+	taskDist    float64
+	varCpuShare float64
+}) {
+	info.taskDist = bsps.TaskDistribution
+	info.varCpuShare = bsps.VarianceCpuSharePerTask
+	return info
+}
diff --git a/schedulers/store.go b/schedulers/store.go
index 1e40ba6..ff3b844 100644
--- a/schedulers/store.go
+++ b/schedulers/store.go
@@ -1,10 +1,12 @@
 package schedulers
 
 import (
+	"bitbucket.org/sunybingcloud/electron/utilities"
 	"encoding/json"
 	sched "github.com/mesos/mesos-go/api/v0/scheduler"
 	"github.com/pkg/errors"
 	"os"
+	"sort"
 )
 
 // Names of different scheduling policies.
@@ -23,6 +25,15 @@ var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{
 	mm:  &MaxMin{},
 }
 
+// Scheduling policies to choose when switching
+var schedPoliciesToSwitch map[int]struct {
+	spName string
+	sp     SchedPolicyState
+} = make(map[int]struct {
+	spName string
+	sp     SchedPolicyState
+})
+
 // Initialize scheduling policy characteristics using the provided config file.
 func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
 	var schedPolConfig map[string]baseSchedPolicyState
@@ -52,6 +63,31 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error {
 				t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask
 			}
 		}
+
+		// Initialize schedPoliciesToSwitch to allow binary searching for scheduling policy switching.
+		spInformation := map[string]float64{}
+		for spName, sp := range SchedPolicies {
+			spInformation[spName] = sp.GetInfo().taskDist
+		}
+		spInformationPairList := utilities.GetPairList(spInformation)
+		// Sorting spInformationPairList in non-increasing order of taskDist.
+		sort.SliceStable(spInformationPairList, func(i, j int) bool {
+			return spInformationPairList[i].Value < spInformationPairList[j].Value
+		})
+		// Initializing scheduling policies that are setup for switching.
+		index := 0
+		for _, spInformationPair := range spInformationPairList {
+			if spInformationPair.Value != 0 {
+				schedPoliciesToSwitch[index] = struct {
+					spName string
+					sp     SchedPolicyState
+				}{
+					spName: spInformationPair.Key,
+					sp:     SchedPolicies[spInformationPair.Key],
+				}
+				index++
+			}
+		}
 	}
 
 	return nil
diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go
index 6a2b0b7..889141a 100644
--- a/utilities/schedUtils/schedUtils.go
+++ b/utilities/schedUtils/schedUtils.go
@@ -77,5 +77,23 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) (int, int) {
 			break
 		}
 	}
+	// Hacking...
+	// 2^window is window<=7
+	//	if newSchedWindow <= 7 {
+	//		newSchedWindow = int(math.Pow(2.0, float64(newSchedWindow)))
+	//	}
+	// Another hack. Getting rid of window to see whether the idle power consumption can be amortized.
+	// Setting window as the length of the entire queue.
+	// Also setting numberOfTasksTraversed to the number of tasks in the entire queue.
+	// TODO: Create another resizing strategy that sizes the window to the length of the entire pending queue.
+	flattenedLength := 0
+	numTasks := 0
+	for _, ts := range taskQueue {
+		numTasks++
+		flattenedLength += *ts.Instances
+	}
+	newSchedWindow = flattenedLength
+	numberOfTasksTraversed = numTasks
+
 	return newSchedWindow, numberOfTasksTraversed
 }