Adding VariableBatchStep api which allows users to find the current step at which an update using the Variable Batch strategy is currently in.

This commit is contained in:
Renan DelValle 2019-08-29 18:06:43 -07:00
parent f43a257726
commit a4b89feb0e
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
2 changed files with 55 additions and 0 deletions

View file

@ -451,6 +451,13 @@ func main() {
} }
fmt.Println(resp.String()) fmt.Println(resp.String())
case "variableBatchStep":
step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
if err != nil {
log.Fatal(err)
}
fmt.Println(step)
case "taskConfig": case "taskConfig":
fmt.Println("Getting job info") fmt.Println("Getting job info")
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)

View file

@ -68,6 +68,7 @@ type Realis interface {
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
VariableBatchStep(key aurora.JobUpdateKey) (int, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
@ -1059,3 +1060,50 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
} }
return resp, nil return resp, nil
} }
// VariableBatchStep returns the current batch the update is in during a variable batch update.
func (r *realisClient) VariableBatchStep(key aurora.JobUpdateKey) (int, error) {
// Query Aurora for an Update that is either paused or rolling forward
resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
UpdateStatuses: []aurora.JobUpdateStatus{
aurora.JobUpdateStatus_ROLLING_FORWARD,
aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED,
},
Key: &key,
Limit: 1,
})
if err != nil {
return 0, errors.Wrap(err, "unable to determine current batch the update is in")
}
jobUpdates := response.JobUpdateDetails(resp)
if len(jobUpdates) == 0 {
return 0, errors.New("update provided could not be found in Aurora")
}
strategy := jobUpdates[0].GetUpdate().GetInstructions().GetSettings().GetUpdateStrategy()
if !strategy.IsSetVarBatchStrategy() {
return 0, errors.New("update strategy used by update config is not supported by this function")
}
// Using a map to create a set to guard against multiple INSTANCE_UPDATING events for the same shard.
updatingInstances := make(map[int32]struct{})
for _, e := range jobUpdates[0].GetInstanceEvents() {
// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
if e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
updatingInstances[e.GetInstanceId()] = struct{}{}
}
}
updatingInstancesCount := int32(len(updatingInstances))
for i, groupSize := range strategy.GetVarBatchStrategy().GetGroupSizes() {
updatingInstancesCount -= groupSize
if updatingInstancesCount <= 0 {
return i, nil
}
}
return 0, errors.New("unable to determine current batch the update is in")
}