diff --git a/examples/client.go b/examples/client.go index f0dcedf..4f153e5 100644 --- a/examples/client.go +++ b/examples/client.go @@ -451,6 +451,13 @@ func main() { } fmt.Println(resp.String()) + case "variableBatchStep": + step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId}) + if err != nil { + log.Fatal(err) + } + fmt.Println(step) + case "taskConfig": fmt.Println("Getting job info") live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) diff --git a/realis.go b/realis.go index c41016a..bd0efb2 100644 --- a/realis.go +++ b/realis.go @@ -68,6 +68,7 @@ type Realis interface { RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) + VariableBatchStep(key aurora.JobUpdateKey) (int, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) @@ -1059,3 +1060,50 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string } return resp, nil } + +// VariableBatchStep returns the current batch the update is in during a variable batch update. +func (r *realisClient) VariableBatchStep(key aurora.JobUpdateKey) (int, error) { + + // Query Aurora for an Update that is either paused or rolling forward + resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ + UpdateStatuses: []aurora.JobUpdateStatus{ + aurora.JobUpdateStatus_ROLLING_FORWARD, + aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED, + }, + Key: &key, + Limit: 1, + }) + + if err != nil { + return 0, errors.Wrap(err, "unable to determine current batch the update is in") + } + + jobUpdates := response.JobUpdateDetails(resp) + if len(jobUpdates) == 0 { + return 0, errors.New("update provided could not be found in Aurora") + } + + strategy := jobUpdates[0].GetUpdate().GetInstructions().GetSettings().GetUpdateStrategy() + if !strategy.IsSetVarBatchStrategy() { + return 0, errors.New("update strategy used by update config is not supported by this function") + } + + // Using a map to create a set to guard against multiple INSTANCE_UPDATING events for the same shard. + updatingInstances := make(map[int32]struct{}) + for _, e := range jobUpdates[0].GetInstanceEvents() { + // We only care about INSTANCE_UPDATING actions because we only care that they've been attempted + if e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING { + updatingInstances[e.GetInstanceId()] = struct{}{} + } + } + + updatingInstancesCount := int32(len(updatingInstances)) + for i, groupSize := range strategy.GetVarBatchStrategy().GetGroupSizes() { + updatingInstancesCount -= groupSize + if updatingInstancesCount <= 0 { + return i, nil + } + } + + return 0, errors.New("unable to determine current batch the update is in") +}