From 45eac75e471cd15048e7191bc2ef213e7dfe62ff Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 5 May 2020 20:46:11 -0700 Subject: [PATCH] Adding support for using different update strategies. --- jobUpdate.go | 55 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/jobUpdate.go b/jobUpdate.go index b4fbfea..1ed2ce5 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -31,32 +31,35 @@ type JobUpdate struct { func NewJobUpdate() *JobUpdate { newTask := NewTask() - req := aurora.JobUpdateRequest{} - req.TaskConfig = newTask.TaskConfig() - req.Settings = newUpdateSettings() - - return &JobUpdate{task: newTask, request: &req} + return &JobUpdate{ + task: newTask, + request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()}, + } } +// Creates an update with default values using an AuroraTask as the underlying task configuration. +// This function has a high level understanding of Aurora Tasks and thus will support copying a task that is configured +// to use Thermos. func JobUpdateFromAuroraTask(task *AuroraTask) *JobUpdate { newTask := task.Clone() - req := aurora.JobUpdateRequest{} - req.TaskConfig = newTask.TaskConfig() - req.Settings = newUpdateSettings() - - return &JobUpdate{task: newTask, request: &req} + return &JobUpdate{ + task: newTask, + request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()}, + } } - +// JobUpdateFromConfig creates an update with default values using an aurora.TaskConfig +// primitive as the underlying task configuration. +// This function should not be used unless the implications of using a primitive value are understood. +// For example, the primitive has no concept of Thermos. func JobUpdateFromConfig(task *aurora.TaskConfig) *JobUpdate { // Perform a deep copy to avoid unexpected behavior newTask := TaskFromThrift(task) - req := aurora.JobUpdateRequest{} - req.TaskConfig = newTask.TaskConfig() - req.Settings = newUpdateSettings() - - return &JobUpdate{task: newTask, request: &req} + return &JobUpdate{ + task: newTask, + request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()}, + } } // Set instance count the job will have after the update. @@ -106,6 +109,26 @@ func (j *JobUpdate) PulseIntervalTimeout(timeout time.Duration) *JobUpdate { j.request.Settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(int32(timeout.Seconds() * 1000)) return j } +func (j *JobUpdate) BatchUpdateStrategy(autoPause bool, batchSize int32) *JobUpdate { + j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{ + BatchStrategy: &aurora.BatchJobUpdateStrategy{GroupSize: batchSize, AutopauseAfterBatch: autoPause}, + } + return j +} + +func (j *JobUpdate) QueueUpdateStrategy(groupSize int32) *JobUpdate { + j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{ + QueueStrategy: &aurora.QueueJobUpdateStrategy{GroupSize: groupSize}, + } + return j +} + +func (j *JobUpdate) VariableBatchStrategy(autoPause bool, batchSizes... int32) *JobUpdate { + j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{ + VarBatchStrategy: &aurora.VariableBatchJobUpdateStrategy{GroupSizes: batchSizes, AutopauseAfterBatch: autoPause}, + } + return j +} func newUpdateSettings() *aurora.JobUpdateSettings {