Integrating batch calculations into an AutoPause monitor that provides some guarantees to the user.
parent b73954cd7c
commit a0fa99df8a
3 changed files with 60 additions and 64 deletions
monitors.go (70 changes)
@@ -114,24 +114,70 @@ func (m *Monitor) JobUpdateQuery(
 	}
 }
 
-// PausedUpdateMonitor polls Aurora for information about a job update until the job update has entered into
-// a ROLL_FORWARD_PAUSED state.
-func (m *Monitor) PausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
-	status, err := m.JobUpdateStatus(
-		key,
-		append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED),
-		interval,
-		timeout)
+// AutoPausedUpdateMonitor is a special monitor for auto-pause-enabled batch updates. This monitor ensures that
+// the update being monitored is capable of auto pausing and has auto pausing enabled. After verifying this
+// information, the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current
+// batch the update is in using information from the update configuration.
+func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
+	key.Job = &aurora.JobKey{
+		Role:        key.Job.Role,
+		Environment: key.Job.Environment,
+		Name:        key.Job.Name,
+	}
+	query := aurora.JobUpdateQuery{
+		UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+		Limit:          1,
+		Key:            &key,
+	}
+
+	response, err := m.Client.JobUpdateDetails(query)
 	if err != nil {
-		return false, err
+		return -1, errors.Wrap(err, "unable to get information about update")
 	}
 
-	if status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
-		return false, errors.Errorf("update is in a terminal state %v", status)
+	// TODO (rdelvalle): check for possible nil values when going down the list of structs
+	updateDetails := response.Result_.GetJobUpdateDetailsResult_.DetailsList
+	if len(updateDetails) == 0 {
+		return -1, errors.Errorf("details for update could not be found")
 	}
 
-	return true, nil
+	updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
+
+	var batchSizes []int32
+	switch {
+	case updateStrategy.IsSetVarBatchStrategy():
+		batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
+		if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	case updateStrategy.IsSetBatchStrategy():
+		batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
+		if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
+			return -1, errors.Errorf("update does not have auto pause enabled")
+		}
+	default:
+		return -1, errors.Errorf("update is not using a batch update strategy")
+	}
+
+	query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
+	summary, err := m.JobUpdateQuery(query, interval, timeout)
+	if err != nil {
+		return -1, err
+	}
+
+	if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
+		return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
+	}
+
+	updatingInstances := make(map[int32]struct{})
+	for _, e := range updateDetails[0].InstanceEvents {
+		// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
+		if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
+			updatingInstances[e.GetInstanceId()] = struct{}{}
+		}
+	}
+
+	return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
 }
 
 // Monitor a Job until all instances enter one of the LIVE_STATES
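The new code path ends by handing the batch arithmetic to calculateCurrentBatch, which is not part of this hunk. A minimal sketch of that helper, assuming it mirrors the loop removed from VariableBatchStep in realis.go below (the real implementation may treat counts that exceed the configured groups differently):

// Sketch only: the real calculateCurrentBatch is defined outside this diff.
// It is assumed to mirror the loop that VariableBatchStep used to run inline.
func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
	for i, size := range batchSizes {
		// Subtract each group's size until the instances seen updating are
		// accounted for; the group that absorbs the remainder is the batch
		// the update is currently in.
		updatingInstances -= size
		if updatingInstances <= 0 {
			return i
		}
	}
	// Assumed fallback: more updating instances than the groups cover.
	return len(batchSizes) - 1
}

For example, with group sizes {1, 2, 3} and four instances seen updating, groups 0 and 1 only account for three instances, so the helper returns batch index 2.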
realis.go (47 changes)
@@ -68,7 +68,6 @@ type Realis interface {
 	RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 	StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
-	VariableBatchStep(key aurora.JobUpdateKey) (int, error)
 	PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)

@@ -1060,49 +1059,3 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 	}
 	return resp, nil
 }
-
-// VariableBatchStep returns the current batch the update is in during a variable batch update.
-func (r *realisClient) VariableBatchStep(key aurora.JobUpdateKey) (int, error) {
-	// Query Aurora for an Update that is either paused or rolling forward
-	resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
-		UpdateStatuses: []aurora.JobUpdateStatus{
-			aurora.JobUpdateStatus_ROLLING_FORWARD,
-			aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED,
-		},
-		Key:   &key,
-		Limit: 1,
-	})
-
-	if err != nil {
-		return 0, errors.Wrap(err, "unable to determine current batch the update is in")
-	}
-
-	jobUpdates := response.JobUpdateDetails(resp)
-	if len(jobUpdates) == 0 {
-		return 0, errors.New("update provided could not be found in Aurora")
-	}
-
-	strategy := jobUpdates[0].GetUpdate().GetInstructions().GetSettings().GetUpdateStrategy()
-	if !strategy.IsSetVarBatchStrategy() {
-		return 0, errors.New("update strategy used by update config is not supported by this function")
-	}
-
-	// Using a map to create a set to guard against multiple INSTANCE_UPDATING events for the same shard.
-	updatingInstances := make(map[int32]struct{})
-	for _, e := range jobUpdates[0].GetInstanceEvents() {
-		// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
-		if e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
-			updatingInstances[e.GetInstanceId()] = struct{}{}
-		}
-	}
-
-	updatingInstancesCount := int32(len(updatingInstances))
-	for i, groupSize := range strategy.GetVarBatchStrategy().GetGroupSizes() {
-		updatingInstancesCount -= groupSize
-		if updatingInstancesCount <= 0 {
-			return i, nil
-		}
-	}
-
-	return 0, errors.New("unable to determine current batch the update is in")
-}
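Since VariableBatchStep also disappears from the Realis interface above, callers now get the batch position from the monitor instead of the client. A minimal migration sketch, assuming the usual gorealis import paths (waitForBatchPause is a hypothetical wrapper, and the 5s/240s values are the ones the test below uses):

package example

import (
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// waitForBatchPause replaces the old PausedUpdateMonitor + VariableBatchStep
// pair: one call verifies auto pause is enabled, waits for the update to
// enter ROLL_FORWARD_PAUSED, and returns the zero-based batch index.
func waitForBatchPause(monitor *realis.Monitor, key aurora.JobUpdateKey) (int, error) {
	return monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)
}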
@@ -1013,19 +1013,16 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
 	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
 	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
 
-	var ok bool
-	var mErr error
 	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
 
 	for i := range updateGroups {
-		if ok, mErr = monitor.PausedUpdateMonitor(key, time.Second*5, time.Second*240); !ok || mErr != nil {
+		curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)
+		if mErr != nil {
 			// Update may already be in a terminal state so don't check for error
 			_, err := r.AbortJobUpdate(key, "Monitor timed out.")
 			assert.NoError(t, err)
 		}
 
-		curStep, err := r.VariableBatchStep(key)
-		assert.NoError(t, err)
 		assert.Equal(t, i, curStep)
 
 		_, err = r.ResumeJobUpdate(&key, "auto resuming test")
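Note that the updated test compares the monitor's return value directly against the loop index, so the batch numbering is zero-based. With hypothetical group sizes of {1, 2, 3} (the real updateGroups fixture is defined outside this hunk), the instance counts seen after each auto pause would be 1, 3, and 6, which the batch calculation maps to steps 0, 1, and 2, matching assert.Equal(t, i, curStep).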