diff --git a/monitors.go b/monitors.go
index 4e45cbe..575e177 100644
--- a/monitors.go
+++ b/monitors.go
@@ -114,24 +114,115 @@ func (m *Monitor) JobUpdateQuery(
 	}
 }
 
-// PausedUpdateMonitor polls Aurora for information about a job update until the job update has entered into
-// a ROLL_FORWARD_PAUSED state.
-func (m *Monitor) PausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
-	status, err := m.JobUpdateStatus(
-		key,
-		append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED),
-		interval,
-		timeout)
+// firstJobUpdateDetails returns the first entry of the details list carried by response. It returns an error when
+// the response, one of the structs leading to the list, or the entry itself is missing.
+func firstJobUpdateDetails(response *aurora.Response) (*aurora.JobUpdateDetails, error) {
+	if response == nil || response.Result_ == nil || response.Result_.GetJobUpdateDetailsResult_ == nil {
+		return nil, errors.New("details for update could not be found")
+	}
+
+	detailsList := response.Result_.GetJobUpdateDetailsResult_.DetailsList
+	if len(detailsList) == 0 || detailsList[0] == nil {
+		return nil, errors.New("details for update could not be found")
+	}
+
+	return detailsList[0], nil
+}
+
+// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the
+// update being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
+// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
+// the update is in using information from the update configuration.
+//
+// It returns the value produced by calculateCurrentBatch, or -1 and an error when the update details cannot be
+// read, the update is not an auto pausing batch update, or the wait ends in a state other than ROLL_FORWARD_PAUSED.
+func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
+	if key.Job == nil {
+		return -1, errors.New("job update key is missing its job key")
+	}
+
+	// key was passed by value, but key.Job is a pointer; point it at a fresh copy of the job key fields.
+	key.Job = &aurora.JobKey{
+		Role:        key.Job.Role,
+		Environment: key.Job.Environment,
+		Name:        key.Job.Name,
+	}
+	query := aurora.JobUpdateQuery{
+		UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+		Limit:          1,
+		Key:            &key,
+	}
 
+	response, err := m.Client.JobUpdateDetails(query)
 	if err != nil {
-		return false, err
+		return -1, errors.Wrap(err, "unable to get information about update")
 	}
 
-	if status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
-		return false, errors.Errorf("update is in a terminal state %v", status)
+	details, err := firstJobUpdateDetails(response)
+	if err != nil {
+		return -1, err
 	}
 
-	return true, nil
+	// Check each pointer on the way to the update strategy so an incomplete response yields an error
+	// instead of a nil pointer dereference.
+	update := details.Update
+	if update == nil || update.Instructions == nil || update.Instructions.Settings == nil ||
+		update.Instructions.Settings.UpdateStrategy == nil {
+		return -1, errors.New("update strategy for update could not be found")
+	}
+	updateStrategy := update.Instructions.Settings.UpdateStrategy
+
+	var batchSizes []int32
+	switch {
+	case updateStrategy.IsSetVarBatchStrategy():
+		batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
+		if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	case updateStrategy.IsSetBatchStrategy():
+		batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
+		if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	default:
+		return -1, errors.Errorf("update is not using a batch update strategy")
+	}
+
+	query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
+	summary, err := m.JobUpdateQuery(query, interval, timeout)
+	if err != nil {
+		return -1, err
+	}
+	if len(summary) == 0 {
+		return -1, errors.New("summary for update could not be found")
+	}
+
+	if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
+		return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
+	}
+
+	// The details fetched above predate the wait, so their instance events can miss everything that happened
+	// while waiting. Fetch them again now that the update has been observed in ROLL_FORWARD_PAUSED.
+	response, err = m.Client.JobUpdateDetails(query)
+	if err != nil {
+		return -1, errors.Wrap(err, "unable to get information about update")
+	}
+
+	details, err = firstJobUpdateDetails(response)
+	if err != nil {
+		return -1, err
+	}
+
+	// Using a map to create a set to guard against multiple INSTANCE_UPDATING events for the same shard.
+	updatingInstances := make(map[int32]struct{})
+	for _, e := range details.InstanceEvents {
+		// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
+		if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
+			updatingInstances[e.GetInstanceId()] = struct{}{}
+		}
+	}
+
+	return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
 }
 
 // Monitor a Job until all instances enter one of the LIVE_STATES
diff --git a/realis.go b/realis.go
index 28127b2..8f2767d 100644
--- a/realis.go
+++ b/realis.go
@@ -68,7 +68,6 @@ type Realis interface {
 	RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 	StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
-	VariableBatchStep(key aurora.JobUpdateKey) (int, error)
 
 	PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
@@ -1060,49 +1059,3 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 	}
 	return resp, nil
 }
-
-// VariableBatchStep returns the current batch the update is in during a variable batch update.
-func (r *realisClient) VariableBatchStep(key aurora.JobUpdateKey) (int, error) {
-	// Query Aurora for an Update that is either paused or rolling forward
-	resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
-		UpdateStatuses: []aurora.JobUpdateStatus{
-			aurora.JobUpdateStatus_ROLLING_FORWARD,
-			aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED,
-		},
-		Key:   &key,
-		Limit: 1,
-	})
-
-	if err != nil {
-		return 0, errors.Wrap(err, "unable to determine current batch the update is in")
-	}
-
-	jobUpdates := response.JobUpdateDetails(resp)
-	if len(jobUpdates) == 0 {
-		return 0, errors.New("update provided could not be found in Aurora")
-	}
-
-	strategy := jobUpdates[0].GetUpdate().GetInstructions().GetSettings().GetUpdateStrategy()
-	if !strategy.IsSetVarBatchStrategy() {
-		return 0, errors.New("update strategy used by update config is not supported by this function")
-	}
-
-	// Using a map to create a set to guard against multiple INSTANCE_UPDATING events for the same shard.
-	updatingInstances := make(map[int32]struct{})
-	for _, e := range jobUpdates[0].GetInstanceEvents() {
-		// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
-		if e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
-			updatingInstances[e.GetInstanceId()] = struct{}{}
-		}
-	}
-
-	updatingInstancesCount := int32(len(updatingInstances))
-	for i, groupSize := range strategy.GetVarBatchStrategy().GetGroupSizes() {
-		updatingInstancesCount -= groupSize
-		if updatingInstancesCount <= 0 {
-			return i, nil
-		}
-	}
-
-	return 0, errors.New("unable to determine current batch the update is in")
-}
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 54c1939..999f2b9 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -1013,19 +1013,16 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
 	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
 	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
 
-	var ok bool
-	var mErr error
 	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
 
 	for i := range updateGroups {
-		if ok, mErr = monitor.PausedUpdateMonitor(key, time.Second*5, time.Second*240); !ok || mErr != nil {
+		curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)
+		if mErr != nil {
 			// Update may already be in a terminal state so don't check for error
 			_, err := r.AbortJobUpdate(key, "Monitor timed out.")
 			assert.NoError(t, err)
 		}
 
-		curStep, err := r.VariableBatchStep(key)
-		assert.NoError(t, err)
 		assert.Equal(t, i, curStep)
 
 		_, err = r.ResumeJobUpdate(&key, "auto resuming test")