diff --git a/CHANGELOG.md b/CHANGELOG.md
index 31c8acd..551bae9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,15 @@
 * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
 by the HTTP client. Instead they now return an error that conforms to the Timedout interface.
 Users can check for a Timedout error by using `realis.IsTimeout(err)`.
+* New API function VariableBatchStep has been added. It returns the current batch of an
+update that was configured with a Variable Batch update strategy.
+* Added new AutoPausedUpdateMonitor which monitors an update until it is in a `ROLL_FORWARD_PAUSED` state.
+* Added a variableBatchStep command to the sample client to be used for testing the new VariableBatchStep API.
+* JobUpdateStatus has changed function signature from:
+`JobUpdateStatus(updateKey aurora.JobUpdateKey, desiredStatuses map[aurora.JobUpdateStatus]bool, interval, timeout time.Duration) (aurora.JobUpdateStatus, error)`
+to
+`JobUpdateStatus(updateKey aurora.JobUpdateKey, desiredStatuses []aurora.JobUpdateStatus, interval, timeout time.Duration) (aurora.JobUpdateStatus, error)`
+* Added TerminalUpdateStates function which returns a slice containing all JobUpdateStatus values considered terminal states.
 
 1.21.0
 
diff --git a/auroraAPI.thrift b/auroraAPI.thrift
index 4e9e5a9..3e43f6f 100644
--- a/auroraAPI.thrift
+++ b/auroraAPI.thrift
@@ -726,6 +726,8 @@ struct QueueJobUpdateStrategy {
  */
 struct BatchJobUpdateStrategy {
   1: i32 groupSize
+  /* Update will pause automatically after each batch completes */
+  2: bool autopauseAfterBatch
 }
 
 /** Same as Batch strategy but each time an active group completes, the size of the next active
@@ -733,6 +735,8 @@ struct BatchJobUpdateStrategy {
  */
 struct VariableBatchJobUpdateStrategy {
   1: list<i32> groupSizes
+  /* Update will pause automatically after each batch completes */
+  2: bool autopauseAfterBatch
 }
 
 union JobUpdateStrategy {
diff --git a/examples/client.go b/examples/client.go
index f0dcedf..4f153e5 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -451,6 +451,13 @@ func main() {
 		}
 		fmt.Println(resp.String())
 
+	case "variableBatchStep":
+		step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Println(step)
+
 	case "taskConfig":
 		fmt.Println("Getting job info")
 		live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
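Note: the new `variableBatchStep` client command above also serves as the reference for calling the API directly. A minimal sketch of standalone usage, assuming an already-connected client; the helper name `printCurrentBatch` and the import paths are illustrative, not part of this change:

```go
package example

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// printCurrentBatch is a hypothetical helper wrapping the new VariableBatchStep
// API. It prints the zero-indexed batch the given update is currently in.
func printCurrentBatch(r realis.Realis, key aurora.JobUpdateKey) {
	step, err := r.VariableBatchStep(key)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("update %s is in batch %d\n", key.ID, step)
}
```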
diff --git a/gen-go/apache/aurora/auroraAPI.go b/gen-go/apache/aurora/auroraAPI.go
index 9bb9396..849c381 100644
--- a/gen-go/apache/aurora/auroraAPI.go
+++ b/gen-go/apache/aurora/auroraAPI.go
@@ -10137,8 +10137,10 @@ func (p *QueueJobUpdateStrategy) String() string {
 // 
 // Attributes:
 //  - GroupSize
+//  - AutopauseAfterBatch
 type BatchJobUpdateStrategy struct {
   GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"`
+  AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
 }
 
 func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
@@ -10149,6 +10151,10 @@ func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
 func (p *BatchJobUpdateStrategy) GetGroupSize() int32 {
   return p.GroupSize
 }
+
+func (p *BatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
+  return p.AutopauseAfterBatch
+}
 func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
   if _, err := iprot.ReadStructBegin(); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -10172,6 +10178,16 @@ func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
           return err
         }
       }
+    case 2:
+      if fieldTypeId == thrift.BOOL {
+        if err := p.ReadField2(iprot); err != nil {
+          return err
+        }
+      } else {
+        if err := iprot.Skip(fieldTypeId); err != nil {
+          return err
+        }
+      }
     default:
       if err := iprot.Skip(fieldTypeId); err != nil {
         return err
@@ -10196,11 +10212,21 @@ func (p *BatchJobUpdateStrategy) ReadField1(iprot thrift.TProtocol) error {
   return nil
 }
 
+func (p *BatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
+  if v, err := iprot.ReadBool(); err != nil {
+  return thrift.PrependError("error reading field 2: ", err)
+} else {
+  p.AutopauseAfterBatch = v
+}
+  return nil
+}
+
 func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
   if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
   if p != nil {
     if err := p.writeField1(oprot); err != nil { return err }
+    if err := p.writeField2(oprot); err != nil { return err }
   }
   if err := oprot.WriteFieldStop(); err != nil {
     return thrift.PrependError("write field stop error: ", err) }
@@ -10219,6 +10245,16 @@ func (p *BatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (err error)
   return err
 }
 
+func (p *BatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
+  if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
+  if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
+  if err := oprot.WriteFieldEnd(); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
+  return err
+}
+
 func (p *BatchJobUpdateStrategy) String() string {
   if p == nil {
     return "<nil>"
@@ -10231,8 +10267,10 @@ func (p *BatchJobUpdateStrategy) String() string {
 // 
 // Attributes:
 //  - GroupSizes
+//  - AutopauseAfterBatch
 type VariableBatchJobUpdateStrategy struct {
   GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"`
+  AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
 }
 
 func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
@@ -10243,6 +10281,10 @@ func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
 func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 {
   return p.GroupSizes
 }
+
+func (p *VariableBatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
+  return p.AutopauseAfterBatch
+}
 func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
   if _, err := iprot.ReadStructBegin(); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -10266,6 +10308,16 @@ func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
           return err
         }
       }
+    case 2:
+      if fieldTypeId == thrift.BOOL {
+        if err := p.ReadField2(iprot); err != nil {
+          return err
+        }
+      } else {
+        if err := iprot.Skip(fieldTypeId); err != nil {
+          return err
+        }
+      }
     default:
       if err := iprot.Skip(fieldTypeId); err != nil {
         return err
@@ -10303,11 +10355,21 @@ var _elem25 int32
   return nil
 }
 
+func (p *VariableBatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
+  if v, err := iprot.ReadBool(); err != nil {
+  return thrift.PrependError("error reading field 2: ", err)
+} else {
+  p.AutopauseAfterBatch = v
+}
+  return nil
+}
+
 func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
   if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
   if p != nil {
     if err := p.writeField1(oprot); err != nil { return err }
+    if err := p.writeField2(oprot); err != nil { return err }
   }
   if err := oprot.WriteFieldStop(); err != nil {
     return thrift.PrependError("write field stop error: ", err) }
@@ -10334,6 +10396,16 @@ func (p *VariableBatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (er
   return err
 }
 
+func (p *VariableBatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
+  if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
+  if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
+  if err := oprot.WriteFieldEnd(); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
+  return err
+}
+
 func (p *VariableBatchJobUpdateStrategy) String() string {
   if p == nil {
     return "<nil>"
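The regenerated code only adds a plain optional boolean to each strategy struct, so enabling auto pause is a matter of setting one field when building the update strategy. An illustrative construction through the generated types (the union field name `VarBatchStrategy` is the one used by monitors.go below):

```go
package main

import (
	"fmt"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Request a variable batch update (1 instance, then 2, then 3) that
	// pauses automatically after each batch completes.
	strategy := &aurora.JobUpdateStrategy{
		VarBatchStrategy: &aurora.VariableBatchJobUpdateStrategy{
			GroupSizes:          []int32{1, 2, 3},
			AutopauseAfterBatch: true,
		},
	}
	fmt.Println(strategy.VarBatchStrategy.GetAutopauseAfterBatch()) // true
}
```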
diff --git a/monitors.go b/monitors.go
index 69167df..575e177 100644
--- a/monitors.go
+++ b/monitors.go
@@ -36,15 +36,9 @@ func (m *Monitor) JobUpdate(
 	timeout int) (bool, error) {
 
 	updateQ := aurora.JobUpdateQuery{
-		Key:   &updateKey,
-		Limit: 1,
-		UpdateStatuses: []aurora.JobUpdateStatus{
-			aurora.JobUpdateStatus_ROLLED_FORWARD,
-			aurora.JobUpdateStatus_ROLLED_BACK,
-			aurora.JobUpdateStatus_ABORTED,
-			aurora.JobUpdateStatus_ERROR,
-			aurora.JobUpdateStatus_FAILED,
-		},
+		Key:            &updateKey,
+		Limit:          1,
+		UpdateStatuses: TerminalUpdateStates(),
 	}
 	updateSummaries, err := m.JobUpdateQuery(
 		updateQ,
@@ -75,22 +69,13 @@ func (m *Monitor) JobUpdate(
 }
 
 // JobUpdateStatus polls the scheduler every certain amount of time to see if the update has entered a specified state.
-func (m *Monitor) JobUpdateStatus(
-	updateKey aurora.JobUpdateKey,
-	desiredStatuses map[aurora.JobUpdateStatus]bool,
-	interval time.Duration,
-	timeout time.Duration) (aurora.JobUpdateStatus, error) {
-
-	desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
-
-	for k := range desiredStatuses {
-		desiredStatusesSlice = append(desiredStatusesSlice, k)
-	}
-
+func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
+	desiredStatuses []aurora.JobUpdateStatus,
+	interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
 	updateQ := aurora.JobUpdateQuery{
 		Key:            &updateKey,
 		Limit:          1,
-		UpdateStatuses: desiredStatusesSlice,
+		UpdateStatuses: desiredStatuses,
 	}
 
 	summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
@@ -129,11 +114,74 @@ func (m *Monitor) JobUpdateQuery(
 	}
 }
 
-// Instances will monitor a Job until all instances enter one of the LIVE_STATES
-func (m *Monitor) Instances(
-	key *aurora.JobKey,
-	instances int32,
-	interval, timeout int) (bool, error) {
+// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
+// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
+// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
+// the update is in using information from the update configuration.
+func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
+	key.Job = &aurora.JobKey{
+		Role:        key.Job.Role,
+		Environment: key.Job.Environment,
+		Name:        key.Job.Name,
+	}
+	query := aurora.JobUpdateQuery{
+		UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+		Limit:          1,
+		Key:            &key,
+	}
+
+	response, err := m.Client.JobUpdateDetails(query)
+	if err != nil {
+		return -1, errors.Wrap(err, "unable to get information about update")
+	}
+
+	// TODO (rdelvalle): check for possible nil values when going down the list of structs
+	updateDetails := response.Result_.GetJobUpdateDetailsResult_.DetailsList
+	if len(updateDetails) == 0 {
+		return -1, errors.Errorf("details for update could not be found")
+	}
+
+	updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
+
+	var batchSizes []int32
+	switch {
+	case updateStrategy.IsSetVarBatchStrategy():
+		batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
+		if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	case updateStrategy.IsSetBatchStrategy():
+		batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
+		if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	default:
+		return -1, errors.Errorf("update is not using a batch update strategy")
+	}
+
+	query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
+	summary, err := m.JobUpdateQuery(query, interval, timeout)
+	if err != nil {
+		return -1, err
+	}
+
+	if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
+		return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
+	}
+
+	updatingInstances := make(map[int32]struct{})
+	for _, e := range updateDetails[0].InstanceEvents {
+		// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
+		if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
+			updatingInstances[e.GetInstanceId()] = struct{}{}
+		}
+	}
+
+	return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
+}
+
+// Monitor a Job until all instances enter one of the LIVE_STATES
+func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
 	return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
 }
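Taken together with ResumeJobUpdate (exercised in the e2e test below), the new monitor supports a pause/validate/resume loop. A rough sketch under stated assumptions: `r` is an already-connected client and `key` identifies an update started with `AutopauseAfterBatch: true`; the function name is illustrative:

```go
package example

import (
	"log"
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// resumeThroughBatches drives an auto-paused update to completion.
// AutoPausedUpdateMonitor returns an error once the update reaches a terminal
// state (including a successful ROLLED_FORWARD), which ends the loop.
func resumeThroughBatches(r realis.Realis, key aurora.JobUpdateKey) {
	m := realis.Monitor{Client: r}
	for {
		batch, err := m.AutoPausedUpdateMonitor(key, time.Second*5, time.Minute*10)
		if err != nil {
			log.Printf("monitor finished: %v", err)
			return
		}
		log.Printf("paused after batch %d; run checks here before resuming", batch)
		if _, err := r.ResumeJobUpdate(&key, "batch validated"); err != nil {
			log.Fatal(err)
		}
	}
}
```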
diff --git a/realis.go b/realis.go
index c41016a..8f2767d 100644
--- a/realis.go
+++ b/realis.go
@@ -829,7 +829,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	m := Monitor{Client: r}
 	_, err := m.JobUpdateStatus(
 		updateKey,
-		map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true},
+		[]aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
 		time.Second*5,
 		time.Minute)
 
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 187c262..999f2b9 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -983,3 +983,52 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
 		})
 	}
 }
+
+func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
+	// Create a single job
+	job := realis.NewJob().
+		Environment("prod").
+		Role("vagrant").
+		Name("BatchAwareAutoPauseTest").
+		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
+		ExecutorData(string(thermosPayload)).
+		CPU(.01).
+		RAM(4).
+		Disk(10).
+		InstanceCount(6).
+		IsService(true)
+	updateGroups := []int32{1, 2, 3}
+	strategy := realis.NewDefaultUpdateJob(job.TaskConfig()).
+		VariableBatchStrategy(aurora.VariableBatchJobUpdateStrategy{
+			GroupSizes:          updateGroups,
+			AutopauseAfterBatch: true,
+		}).
+		InstanceCount(6).
+		WatchTime(1000)
+
+	resp, err := r.StartJobUpdate(strategy, "")
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.NotNil(t, resp.GetResult_())
+	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
+	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
+
+	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
+
+	for i := range updateGroups {
+		curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)
+		if mErr != nil {
+			// Update may already be in a terminal state so don't check for error
+			_, err := r.AbortJobUpdate(key, "Monitor timed out.")
+			assert.NoError(t, err)
+		}
+
+		assert.Equal(t, i, curStep)
+
+		_, err = r.ResumeJobUpdate(&key, "auto resuming test")
+		require.NoError(t, err)
+	}
+
+	_, err = r.KillJob(job.JobKey())
+	assert.NoError(t, err)
+}
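The test relies on package-level fixtures (`r`, `monitor`, `thermosPayload`) that are wired up in the repo's existing TestMain and are not part of this diff. A rough sketch of what that setup looks like; the scheduler address, credentials, and payload path are placeholders, not values confirmed by this change:

```go
package example

import (
	"io/ioutil"

	realis "github.com/paypal/gorealis"
)

// Hypothetical stand-ins for the e2e test fixtures used above.
var (
	r              realis.Realis   // connected client shared by the e2e tests
	monitor        *realis.Monitor // monitor wrapping the same client
	thermosPayload []byte          // thermos executor payload used as ExecutorData
)

func setup() error {
	var err error
	r, err = realis.NewRealisClient(
		realis.SchedulerUrl("http://192.168.33.7:8081"), // placeholder vagrant scheduler
		realis.BasicAuth("aurora", "secret"),            // placeholder credentials
	)
	if err != nil {
		return err
	}
	monitor = &realis.Monitor{Client: r}
	thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json")
	return err
}
```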
diff --git a/util.go b/util.go
index 8431347..989f8e8 100644
--- a/util.go
+++ b/util.go
@@ -25,6 +25,18 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
 // ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
 var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
 
+// TerminalUpdateStates returns a slice containing all the terminal states an update may end up in.
+// This is a function in order to avoid having a slice that can be accidentally mutated.
+func TerminalUpdateStates() []aurora.JobUpdateStatus {
+	return []aurora.JobUpdateStatus{
+		aurora.JobUpdateStatus_ROLLED_FORWARD,
+		aurora.JobUpdateStatus_ROLLED_BACK,
+		aurora.JobUpdateStatus_ABORTED,
+		aurora.JobUpdateStatus_ERROR,
+		aurora.JobUpdateStatus_FAILED,
+	}
+}
+
 // AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse.
 var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
@@ -87,3 +99,22 @@ func validateAuroraURL(location string) (string, error) {
 
 	return u.String(), nil
 }
+
+func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
+	for i, size := range batchSizes {
+		updatingInstances -= size
+		if updatingInstances <= 0 {
+			return i
+		}
+	}
+
+	// Overflow batches
+	batchCount := len(batchSizes) - 1
+	lastBatchIndex := len(batchSizes) - 1
+	batchCount += int(updatingInstances / batchSizes[lastBatchIndex])
+
+	if updatingInstances%batchSizes[lastBatchIndex] != 0 {
+		batchCount++
+	}
+	return batchCount
+}
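The overflow arithmetic deserves a worked example: once the declared batch sizes are exhausted, the last declared size is reused for the remaining instances. A standalone walkthrough (the function body is copied from util.go above so the sketch runs on its own; the expected values match the tests below):

```go
package main

import "fmt"

// calculateCurrentBatch is copied from util.go above for a standalone run.
func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
	for i, size := range batchSizes {
		updatingInstances -= size
		if updatingInstances <= 0 {
			return i
		}
	}
	batchCount := len(batchSizes) - 1
	lastBatchIndex := len(batchSizes) - 1
	batchCount += int(updatingInstances / batchSizes[lastBatchIndex])
	if updatingInstances%batchSizes[lastBatchIndex] != 0 {
		batchCount++
	}
	return batchCount
}

func main() {
	// 10 instances, declared batches [2]: the loop consumes batch 0 (10-2=8),
	// then the remaining 8 spill into 8/2 = 4 overflow batches -> index 4.
	fmt.Println(calculateCurrentBatch(10, []int32{2})) // 4

	// 5 instances, declared batches [1, 2]: 5-1=4 (batch 0), 4-2=2 (batch 1),
	// then 2 spills into 2/2 = 1 overflow batch -> index 2.
	fmt.Println(calculateCurrentBatch(5, []int32{1, 2})) // 2
}
```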
diff --git a/util_test.go b/util_test.go
index e015535..b8341b2 100644
--- a/util_test.go
+++ b/util_test.go
@@ -63,3 +63,40 @@ func TestAuroraURLValidator(t *testing.T) {
 		assert.NoError(t, err)
 	})
 }
+
+func TestCurrentBatchCalculator(t *testing.T) {
+	t.Run("singleBatchOverflow", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(10, []int32{2})
+		assert.Equal(t, 4, curBatch)
+	})
+
+	t.Run("noInstancesUpdating", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(0, []int32{2})
+		assert.Equal(t, 0, curBatch)
+	})
+
+	t.Run("evenMatchSingleBatch", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(2, []int32{2})
+		assert.Equal(t, 0, curBatch)
+	})
+
+	t.Run("moreInstancesThanBatches", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(5, []int32{1, 2})
+		assert.Equal(t, 2, curBatch)
+	})
+
+	t.Run("moreInstancesThanBatchesDecreasing", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(5, []int32{2, 1})
+		assert.Equal(t, 3, curBatch)
+	})
+
+	t.Run("unevenFit", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(2, []int32{1, 2})
+		assert.Equal(t, 1, curBatch)
+	})
+
+	t.Run("halfWay", func(t *testing.T) {
+		curBatch := calculateCurrentBatch(1, []int32{1, 2})
+		assert.Equal(t, 0, curBatch)
+	})
+}