diff --git a/def/taskUtils.go b/def/taskUtils.go index 71ae80b..f239bc6 100644 --- a/def/taskUtils.go +++ b/def/taskUtils.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/mash/gokmeans" + "github.com/montanaflynn/stats" "log" "sort" ) @@ -69,16 +70,31 @@ func getObservations(tasks []Task, taskObservation func(task Task) []float64) [] return observations } -// Size tasks based on the power consumption. -// TODO: Size the cluster in a better way other than just taking an aggregate of the watts resource requirement. -func clusterSize(tasks []Task, taskObservation func(task Task) []float64) float64 { - size := 0.0 +// Sizing each task cluster using the average MMMPU requirement of the task in the cluster. +func clusterSizeAvgMMMPU(tasks []Task, taskObservation func(task Task) []float64) float64 { + mmmpuValues := []float64{} + // Total sum of the Median of Median Max Power Usage values for all tasks. + total := 0.0 for _, task := range tasks { - for _, observation := range taskObservation(task) { - size += observation + observations := taskObservation(task) + if len(observations) > 0 { + // taskObservation would give us the mmpu values. We would need to take the median of these + // values to obtain the Median of Median Max Power Usage value. + if medianValue, err := stats.Median(observations); err == nil { + mmmpuValues = append(mmmpuValues, medianValue) + total += medianValue + } else { + // skip this value + // there is an error in the task config. + log.Println(err) + } + } else { + // There is only one observation for the task. + mmmpuValues = append(mmmpuValues, observations[0]) } } - return size + + return total / float64(len(mmmpuValues)) } // Order clusters in increasing order of task heaviness. @@ -96,12 +112,12 @@ func labelAndOrder(clusters map[int][]Task, numberOfClusters int, taskObservatio } for i := 0; i < numberOfClusters-1; i++ { - // Sizing the current cluster. - sizeI := clusterSize(clusters[i], taskObservation) + // Sizing the current cluster based on average Median of Median Max Power Usage of tasks. + sizeI := clusterSizeAvgMMMPU(clusters[i], taskObservation) // Comparing with the other clusters. for j := i + 1; j < numberOfClusters; j++ { - sizeJ := clusterSize(clusters[j], taskObservation) + sizeJ := clusterSizeAvgMMMPU(clusters[j], taskObservation) if sizeI > sizeJ { sizedClusters[i].SizeScore++ } else { @@ -159,3 +175,59 @@ func GetResourceRequirement(taskID string) (TaskResources, error) { return TaskResources{}, errors.New("Invalid TaskID: " + taskID) } } + +// Determine the distribution of light power consuming and heavy power consuming tasks in a given window. +func GetTaskDistributionInWindow(windowSize int, tasks []Task) (float64, error) { + getTotalInstances := func(ts []Task, taskExceedingWindow struct { + taskName string + instsToDiscard int + }) int { + total := 0 + for _, t := range ts { + if t.Name == taskExceedingWindow.taskName { + total += (*t.Instances - taskExceedingWindow.instsToDiscard) + continue + } + total += *t.Instances + } + return total + } + + getTasksInWindow := func() (tasksInWindow []Task, taskExceedingWindow struct { + taskName string + instsToDiscard int + }) { + tasksTraversed := 0 + // Name of task, only few instances of which fall within the window. + lastTaskName := "" + for _, task := range tasks { + tasksInWindow = append(tasksInWindow, task) + tasksTraversed += *task.Instances + lastTaskName = task.Name + if tasksTraversed >= windowSize { + taskExceedingWindow.taskName = lastTaskName + taskExceedingWindow.instsToDiscard = tasksTraversed - windowSize + break + } + } + + return + } + + // Retrieving the tasks that are in the window. + tasksInWIndow, taskExceedingWindow := getTasksInWindow() + // Classifying the tasks based on Median of Median Max Power Usage values. + taskClusters := ClassifyTasks(tasksInWIndow, 2) + // First we'll need to check if the tasks in the window could be classified into 2 clusters. + // If yes, then we proceed with determining the distribution. + // Else, we throw an error stating that the distribution is even as only one cluster could be formed. + if len(taskClusters[1].Tasks) == 0 { + return -1.0, errors.New("Only one cluster could be formed.") + } + + // The first cluster would corresponding to the light power consuming tasks. + // The second cluster would corresponding to the high power consuming tasks. + lpcTasksTotalInst := getTotalInstances(taskClusters[0].Tasks, taskExceedingWindow) + hpcTasksTotalInst := getTotalInstances(taskClusters[1].Tasks, taskExceedingWindow) + return float64(lpcTasksTotalInst) / float64(hpcTasksTotalInst), nil +} diff --git a/scheduler.go b/scheduler.go index c532ae4..14eef78 100644 --- a/scheduler.go +++ b/scheduler.go @@ -28,6 +28,7 @@ var schedPolicyName = flag.String("schedPolicy", "first-fit", "Name of the sched var listSchedPolicies = flag.Bool("listSchedPolicies", false, "List the names of the pluaggable scheduling policies.") var enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.") var schedPolConfigFile = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.") +var fixFirstSchedPol = flag.String("fixFirstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled.") // Short hand args func init() { @@ -42,6 +43,7 @@ func init() { flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluaggable scheduling policies. (shorthand)") flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime.") flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).") + flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the schedulin gpolicy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).") } func listAllSchedulingPolicies() { @@ -135,7 +137,8 @@ func main() { schedulers.WithDone(done), schedulers.WithPCPLog(pcpLog), schedulers.WithLoggingChannels(logMType, logMsg), - schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch)) + schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch), + schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol)) driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{ Master: *master, Framework: &mesos.FrameworkInfo{ diff --git a/schedulers/MaxGreedyMins.go b/schedulers/MaxGreedyMins.go index aeef812..310cb42 100644 --- a/schedulers/MaxGreedyMins.go +++ b/schedulers/MaxGreedyMins.go @@ -77,7 +77,6 @@ func (s *MaxGreedyMins) CheckFit( } func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Println("Max-GreedyMins scheduling...") baseSchedRef := spc.(*BaseScheduler) if baseSchedRef.schedPolSwitchEnabled { SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) @@ -112,8 +111,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // If scheduling policy switching enabled, then // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { - log.Printf("Stopped scheduling... Completed scheduling %d tasks.", - s.numTasksScheduled) break // Offers will automatically get declined. } task := baseSchedRef.tasks[i] @@ -141,11 +138,6 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched // Pack the rest of the offer with the smallest tasks for i := 0; i < len(baseSchedRef.tasks); i++ { - // If scheduling policy switching enabled, then - // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. - if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { - break // Offers will automatically get declined. - } task := baseSchedRef.tasks[i] wattsConsideration, err := def.WattsToConsider(task, baseSchedRef.classMapWatts, offer) if err != nil { @@ -159,6 +151,11 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched } for *task.Instances > 0 { + // If scheduling policy switching enabled, then + // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. + if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { + break // Offers will automatically get declined. + } // TODO: Fix this so index doesn't need to be passed taken, taskToSchedule := s.CheckFit(spc, i, task, wattsConsideration, offer, &totalCPU, &totalRAM, &totalWatts) @@ -183,6 +180,4 @@ func (s *MaxGreedyMins) ConsumeOffers(spc SchedPolicyContext, driver sched.Sched driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } - - s.switchIfNecessary(spc) } diff --git a/schedulers/MaxMin.go b/schedulers/MaxMin.go index 841c96d..672124f 100644 --- a/schedulers/MaxMin.go +++ b/schedulers/MaxMin.go @@ -76,7 +76,6 @@ func (s *MaxMin) CheckFit( } func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Println("Max-Min scheduling...") baseSchedRef := spc.(*BaseScheduler) if baseSchedRef.schedPolSwitchEnabled { SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) @@ -116,8 +115,6 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { - log.Printf("Stopped scheduling... Completed scheduling %d tasks.", - s.numTasksScheduled) break // Offers will automatically get declined. } // We need to pick a min task or a max task @@ -173,6 +170,4 @@ func (s *MaxMin) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDri driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } - - s.switchIfNecessary(spc) } diff --git a/schedulers/base.go b/schedulers/base.go index 191c745..ec81227 100644 --- a/schedulers/base.go +++ b/schedulers/base.go @@ -58,6 +58,10 @@ type BaseScheduler struct { // Whether switching of scheduling policies at runtime has been enabled schedPolSwitchEnabled bool + // Name of the first scheduling policy to be deployed, if provided. + // This scheduling policy would be deployed first regardless of the distribution of tasks in the TaskQueue. + // Note: Scheduling policy switching needs to be enabled. + nameOfFstSchedPolToDeploy string // Size of window of tasks that can be scheduled in the next offer cycle. // The window size can be adjusted to make the most use of every resource offer. @@ -186,19 +190,10 @@ func (s *BaseScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*m s.HostNameToSlaveID[offer.GetHostname()] = *offer.SlaveId.Value } } - // If no resource offers have been received yet, and if scheduling policy switching has been enabled, - // then we would need to compute the size of the scheduling window for the current scheduling policy. - // Initially the size of the scheduling window is 0. So, based on the total available resources on the cluster, - // the size of the window is determined and the scheduling policy is then applied for the corresponding number - // of tasks. - // Subsequently, the size of the scheduling window is determined at the end of each offer cycle. - if s.schedPolSwitchEnabled && !s.hasReceivedResourceOffers { - s.schedWindowSize, s.numTasksInSchedWindow = s.schedWindowResStrategy.Apply(func() interface{} { - return s.tasks - }) - } - log.Printf("SchedWindowSize: %d, NumberOfTasksInWindow: %d", s.schedWindowSize, s.numTasksInSchedWindow) - + // Switch just before consuming the resource offers. + s.curSchedPolicy.SwitchIfNecessary(s) + s.Log(elecLogDef.GENERAL, fmt.Sprintf("SchedWindowSize[%d], #TasksInWindow[%d]", + s.schedWindowSize, s.numTasksInSchedWindow)) s.curSchedPolicy.ConsumeOffers(s, driver, offers) s.hasReceivedResourceOffers = true } @@ -403,8 +398,9 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) { s.Log(lmt, msg) } -func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) { +func (s *BaseScheduler) LogSchedPolicySwitch(taskDist float64, name string, nextPolicy SchedPolicyState) { if s.curSchedPolicy != nextPolicy { s.Log(elecLogDef.SPS, name) + s.Log(elecLogDef.GENERAL, fmt.Sprintf("Switching... TaskDistribution[%d] ==> %s", taskDist, name)) } } diff --git a/schedulers/bin-packing.go b/schedulers/bin-packing.go index dd6f9be..fb85e55 100644 --- a/schedulers/bin-packing.go +++ b/schedulers/bin-packing.go @@ -34,7 +34,6 @@ type BinPackSortedWatts struct { } func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Println("BPSW scheduling...") baseSchedRef := spc.(*BaseScheduler) if baseSchedRef.schedPolSwitchEnabled { SortNTasks(baseSchedRef.tasks, baseSchedRef.numTasksInSchedWindow, def.SortByWatts) @@ -78,8 +77,6 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { - log.Printf("Stopped scheduling... Completed scheduling %d tasks.", - s.numTasksScheduled) break // Offers will automatically get declined. } // Does the task fit @@ -107,7 +104,7 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. } } } else { - break // Continue on to next offer. + break // Continue on to next task } } } @@ -123,6 +120,4 @@ func (s *BinPackSortedWatts) ConsumeOffers(spc SchedPolicyContext, driver sched. driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } - - s.switchIfNecessary(spc) } diff --git a/schedulers/first-fit.go b/schedulers/first-fit.go index 0a93858..8538aa8 100644 --- a/schedulers/first-fit.go +++ b/schedulers/first-fit.go @@ -34,7 +34,6 @@ type FirstFit struct { } func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerDriver, offers []*mesos.Offer) { - log.Println("FirstFit scheduling...") baseSchedRef := spc.(*BaseScheduler) baseSchedRef.LogOffersReceived(offers) @@ -57,8 +56,6 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD // If scheduling policy switching enabled, then // stop scheduling if the #baseSchedRef.schedWindowSize tasks have been scheduled. if baseSchedRef.schedPolSwitchEnabled && (s.numTasksScheduled >= baseSchedRef.schedWindowSize) { - log.Printf("Stopped scheduling... Completed scheduling %d tasks.", - s.numTasksScheduled) break // Offers will automatically get declined. } task := baseSchedRef.tasks[i] @@ -104,6 +101,4 @@ func (s *FirstFit) ConsumeOffers(spc SchedPolicyContext, driver sched.SchedulerD driver.DeclineOffer(offer.Id, mesosUtils.DefaultFilter) } } - - s.switchIfNecessary(spc) } diff --git a/schedulers/helpers.go b/schedulers/helpers.go index 91a1d79..4a5ada2 100644 --- a/schedulers/helpers.go +++ b/schedulers/helpers.go @@ -124,6 +124,23 @@ func WithSchedPolSwitchEnabled(enableSchedPolicySwitch bool) schedulerOptions { } } +func WithNameOfFirstSchedPolToFix(nameOfFirstSchedPol string) schedulerOptions { + return func(s ElectronScheduler) error { + if nameOfFirstSchedPol == "" { + lmt := elecLogDef.WARNING + msgColor := elecLogDef.LogMessageColors[lmt] + msg := msgColor.Sprintf("First scheduling policy to deploy not mentioned. This is now going to be determined at runtime.") + s.(*BaseScheduler).Log(lmt, msg) + return nil + } + if _, ok := SchedPolicies[nameOfFirstSchedPol]; !ok { + return errors.New("Invalid name of scheduling policy.") + } + s.(*BaseScheduler).nameOfFstSchedPolToDeploy = nameOfFirstSchedPol + return nil + } +} + // Launch tasks. func LaunchTasks(offerIDs []*mesos.OfferID, tasksToLaunch []*mesos.TaskInfo, driver sched.SchedulerDriver) { driver.LaunchTasks(offerIDs, tasksToLaunch, mesosUtils.DefaultFilter) diff --git a/schedulers/schedPolicy.go b/schedulers/schedPolicy.go index 1dec900..8f6a098 100644 --- a/schedulers/schedPolicy.go +++ b/schedulers/schedPolicy.go @@ -1,9 +1,9 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/def" mesos "github.com/mesos/mesos-go/api/v0/mesosproto" sched "github.com/mesos/mesos-go/api/v0/scheduler" - "math/rand" ) type SchedPolicyContext interface { @@ -14,6 +14,13 @@ type SchedPolicyContext interface { type SchedPolicyState interface { // Define the particular scheduling policy's methodology of resource offer consumption. ConsumeOffers(SchedPolicyContext, sched.SchedulerDriver, []*mesos.Offer) + // Get information about the scheduling policy. + GetInfo() (info struct { + taskDist float64 + varCpuShare float64 + }) + // Switch scheduling policy if necessary. + SwitchIfNecessary(SchedPolicyContext) } type baseSchedPolicyState struct { @@ -29,26 +36,87 @@ type baseSchedPolicyState struct { VarianceCpuSharePerTask float64 `json:"varCpuShare"` } -func (bsps *baseSchedPolicyState) switchIfNecessary(spc SchedPolicyContext) { +func (bsps *baseSchedPolicyState) SwitchIfNecessary(spc SchedPolicyContext) { baseSchedRef := spc.(*BaseScheduler) - // Switch scheduling policy only if feature enabled from CLI + // Switching scheduling policy only if feature enabled from CLI if baseSchedRef.schedPolSwitchEnabled { + // Name of the scheduling policy to switch to. + switchToPolicyName := "" // Need to recompute size of the scheduling window for the next offer cycle. // The next scheduling policy will schedule at max schedWindowSize number of tasks. baseSchedRef.schedWindowSize, baseSchedRef.numTasksInSchedWindow = baseSchedRef.schedWindowResStrategy.Apply(func() interface{} { return baseSchedRef.tasks }) - // Switching to a random scheduling policy. - // TODO: Switch based on some criteria. - index := rand.Intn(len(SchedPolicies)) - for k, v := range SchedPolicies { - if index == 0 { - baseSchedRef.LogSchedPolicySwitch(k, v) - spc.SwitchSchedPol(v) - // Resetting the number of tasks scheduled. - bsps.numTasksScheduled = 0 - break + // Determine the distribution of tasks in the new scheduling window. + taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks) + // If no resource offers have been received yet, and + // the name of the first scheduling policy to be deployed is provided, + // we switch to this policy regardless of the task distribution. + if !baseSchedRef.hasReceivedResourceOffers && (baseSchedRef.nameOfFstSchedPolToDeploy != "") { + switchToPolicyName = baseSchedRef.nameOfFstSchedPolToDeploy + } else if err != nil { + // All the tasks in the window were only classified into 1 cluster. + // Max-Min and Max-GreedyMins would work the same way as Bin-Packing for this situation. + // So, we have 2 choices to make. First-Fit or Bin-Packing. + // If choose Bin-Packing, then there might be a performance degradation due to increase in + // resource contention. So, First-Fit might be a better option to cater to the worst case + // where all the tasks are power intensive tasks. + // TODO: Another possibility is to do the exact opposite and choose Bin-Packing. + // TODO[2]: Determine scheduling policy based on the distribution of tasks in the whole queue. + switchToPolicyName = bp + } else { + // The tasks in the scheduling window were classified into 2 clusters, meaning that there is + // some variety in the kind of tasks. + // We now select the scheduling policy which is most appropriate for this distribution of tasks. + first := schedPoliciesToSwitch[0] + last := schedPoliciesToSwitch[len(schedPoliciesToSwitch)-1] + if taskDist < first.sp.GetInfo().taskDist { + switchToPolicyName = first.spName + } else if taskDist > last.sp.GetInfo().taskDist { + switchToPolicyName = last.spName + } else { + low := 0 + high := len(schedPoliciesToSwitch) - 1 + for low <= high { + mid := (low + high) / 2 + if taskDist < schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { + high = mid - 1 + } else if taskDist > schedPoliciesToSwitch[mid].sp.GetInfo().taskDist { + low = mid + 1 + } else { + switchToPolicyName = schedPoliciesToSwitch[mid].spName + break + } + } + // We're here if low == high+1. + // If haven't yet found the closest match. + if switchToPolicyName == "" { + lowDiff := schedPoliciesToSwitch[low].sp.GetInfo().taskDist - taskDist + highDiff := taskDist - schedPoliciesToSwitch[high].sp.GetInfo().taskDist + if lowDiff > highDiff { + switchToPolicyName = schedPoliciesToSwitch[high].spName + } else if highDiff > lowDiff { + switchToPolicyName = schedPoliciesToSwitch[low].spName + } else { + // index doens't matter as the values at high and low are equidistant + // from taskDist. + switchToPolicyName = schedPoliciesToSwitch[high].spName + } + } } - index-- } + // Switching scheduling policy. + baseSchedRef.LogSchedPolicySwitch(taskDist, switchToPolicyName, SchedPolicies[switchToPolicyName]) + baseSchedRef.SwitchSchedPol(SchedPolicies[switchToPolicyName]) + // Resetting the number of tasks scheduled. + bsps.numTasksScheduled = 0 } } + +func (bsps *baseSchedPolicyState) GetInfo() (info struct { + taskDist float64 + varCpuShare float64 +}) { + info.taskDist = bsps.TaskDistribution + info.varCpuShare = bsps.VarianceCpuSharePerTask + return info +} diff --git a/schedulers/store.go b/schedulers/store.go index 1e40ba6..ff3b844 100644 --- a/schedulers/store.go +++ b/schedulers/store.go @@ -1,10 +1,12 @@ package schedulers import ( + "bitbucket.org/sunybingcloud/electron/utilities" "encoding/json" sched "github.com/mesos/mesos-go/api/v0/scheduler" "github.com/pkg/errors" "os" + "sort" ) // Names of different scheduling policies. @@ -23,6 +25,15 @@ var SchedPolicies map[string]SchedPolicyState = map[string]SchedPolicyState{ mm: &MaxMin{}, } +// Scheduling policies to choose when switching +var schedPoliciesToSwitch map[int]struct { + spName string + sp SchedPolicyState +} = make(map[int]struct { + spName string + sp SchedPolicyState +}) + // Initialize scheduling policy characteristics using the provided config file. func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error { var schedPolConfig map[string]baseSchedPolicyState @@ -52,6 +63,31 @@ func InitSchedPolicyCharacteristics(schedPoliciesConfigFilename string) error { t.VarianceCpuSharePerTask = schedPolConfig[schedPolName].VarianceCpuSharePerTask } } + + // Initialize schedPoliciesToSwitch to allow binary searching for scheduling policy switching. + spInformation := map[string]float64{} + for spName, sp := range SchedPolicies { + spInformation[spName] = sp.GetInfo().taskDist + } + spInformationPairList := utilities.GetPairList(spInformation) + // Sorting spInformationPairList in non-increasing order of taskDist. + sort.SliceStable(spInformationPairList, func(i, j int) bool { + return spInformationPairList[i].Value < spInformationPairList[j].Value + }) + // Initializing scheduling policies that are setup for switching. + index := 0 + for _, spInformationPair := range spInformationPairList { + if spInformationPair.Value != 0 { + schedPoliciesToSwitch[index] = struct { + spName string + sp SchedPolicyState + }{ + spName: spInformationPair.Key, + sp: SchedPolicies[spInformationPair.Key], + } + index++ + } + } } return nil diff --git a/utilities/schedUtils/schedUtils.go b/utilities/schedUtils/schedUtils.go index 6a2b0b7..889141a 100644 --- a/utilities/schedUtils/schedUtils.go +++ b/utilities/schedUtils/schedUtils.go @@ -77,5 +77,23 @@ func (s *fillNextOfferCycle) apply(taskQueue []def.Task) (int, int) { break } } + // Hacking... + // 2^window is window<=7 + // if newSchedWindow <= 7 { + // newSchedWindow = int(math.Pow(2.0, float64(newSchedWindow))) + // } + // Another hack. Getting rid of window to see whether the idle power consumption can be amortized. + // Setting window as the length of the entire queue. + // Also setting numberOfTasksTraversed to the number of tasks in the entire queue. + // TODO: Create another resizing strategy that sizes the window to the length of the entire pending queue. + flattenedLength := 0 + numTasks := 0 + for _, ts := range taskQueue { + numTasks++ + flattenedLength += *ts.Instances + } + newSchedWindow = flattenedLength + numberOfTasksTraversed = numTasks + return newSchedWindow, numberOfTasksTraversed }