Adding autopause APIs to future (#110)

* Updating thrift definitions to add autopause for batch based update strategies.

* Adding batch calculator utility and test cases for it.

* Adding PauseUpdateMonitor which allows users to poll Aurora for information on an active Update being carried out until it enters the ROLL_FORWARD_PAUSED state.

* Tests for PauseUpdateMonitor and VariableBatchStep added to the end to end tests.

* Adding TerminalUpdateStates function which returns a slice containing all terminal states for an update. Changed signature of JobUpdateStatus from using a map for desired states to a slice. A map is no longer necessary with the new version of thrift and only adds complexity.
This commit is contained in:
Renan I. Del Valle 2020-01-13 16:03:40 -08:00
parent fe692040aa
commit 976dc26dcc
9 changed files with 285 additions and 28 deletions

View file

@ -3,6 +3,15 @@
* CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
by the HTTP client. Instead they now return an error that conforms to the Timedout interface. by the HTTP client. Instead they now return an error that conforms to the Timedout interface.
Users can check for a Timedout error by using `realis.IsTimeout(err)`. Users can check for a Timedout error by using `realis.IsTimeout(err)`.
* New API function VariableBatchStep has been added which returns the current batch
a Variable Batch Update configured Update is currently in.
* Added new PauseUpdateMonitor which monitors an update until it is in a `ROLL_FORWARD_PAUSED` state.
* Added variableBatchStep command to the sample client to be used for testing the new VariableBatchStep API.
* JobUpdateStatus has changed function signature from:
`JobUpdateStatus(updateKey aurora.JobUpdateKey, desiredStatuses map[aurora.JobUpdateStatus]bool, interval, timeout time.Duration) (aurora.JobUpdateStatus, error)`
to
`JobUpdateStatus(updateKey aurora.JobUpdateKey, desiredStatuses []aurora.JobUpdateStatus, interval, timeout time.Duration) (aurora.JobUpdateStatus, error)`
* Added TerminalUpdateStates function which returns a slice containing all UpdateStates which are considered terminal states.
1.21.0 1.21.0

View file

@ -726,6 +726,8 @@ struct QueueJobUpdateStrategy {
*/ */
struct BatchJobUpdateStrategy { struct BatchJobUpdateStrategy {
1: i32 groupSize 1: i32 groupSize
/* Update will pause automatically after each batch completes */
2: bool autopauseAfterBatch
} }
/** Same as Batch strategy but each time an active group completes, the size of the next active /** Same as Batch strategy but each time an active group completes, the size of the next active
@ -733,6 +735,8 @@ struct BatchJobUpdateStrategy {
*/ */
struct VariableBatchJobUpdateStrategy { struct VariableBatchJobUpdateStrategy {
1: list<i32> groupSizes 1: list<i32> groupSizes
/* Update will pause automatically after each batch completes */
2: bool autopauseAfterBatch
} }
union JobUpdateStrategy { union JobUpdateStrategy {

View file

@ -451,6 +451,13 @@ func main() {
} }
fmt.Println(resp.String()) fmt.Println(resp.String())
case "variableBatchStep":
step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
if err != nil {
log.Fatal(err)
}
fmt.Println(step)
case "taskConfig": case "taskConfig":
fmt.Println("Getting job info") fmt.Println("Getting job info")
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)

View file

@ -10137,8 +10137,10 @@ func (p *QueueJobUpdateStrategy) String() string {
// //
// Attributes: // Attributes:
// - GroupSize // - GroupSize
// - AutopauseAfterBatch
type BatchJobUpdateStrategy struct { type BatchJobUpdateStrategy struct {
GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"` GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"`
AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
} }
func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy { func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
@ -10149,6 +10151,10 @@ func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
func (p *BatchJobUpdateStrategy) GetGroupSize() int32 { func (p *BatchJobUpdateStrategy) GetGroupSize() int32 {
return p.GroupSize return p.GroupSize
} }
// GetAutopauseAfterBatch reports whether this batch update strategy is
// configured to pause automatically after each batch completes (thrift field 2).
// Appears to be thrift-generated accessor code; keep in sync with the generator.
func (p *BatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
	return p.AutopauseAfterBatch
}
func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error { func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil { if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@ -10172,6 +10178,16 @@ func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
return err return err
} }
} }
case 2:
if fieldTypeId == thrift.BOOL {
if err := p.ReadField2(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
default: default:
if err := iprot.Skip(fieldTypeId); err != nil { if err := iprot.Skip(fieldTypeId); err != nil {
return err return err
@ -10196,11 +10212,21 @@ func (p *BatchJobUpdateStrategy) ReadField1(iprot thrift.TProtocol) error {
return nil return nil
} }
// ReadField2 deserializes the autopauseAfterBatch bool (thrift field 2) from
// the wire protocol into p, wrapping any protocol error with field context.
// Appears to be thrift-generated code; keep in sync with the generator.
func (p *BatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
	if v, err := iprot.ReadBool(); err != nil {
		return thrift.PrependError("error reading field 2: ", err)
	} else {
		p.AutopauseAfterBatch = v
	}
	return nil
}
func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error { func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil { if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil { if p != nil {
if err := p.writeField1(oprot); err != nil { return err } if err := p.writeField1(oprot); err != nil { return err }
if err := p.writeField2(oprot); err != nil { return err }
} }
if err := oprot.WriteFieldStop(); err != nil { if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) } return thrift.PrependError("write field stop error: ", err) }
@ -10219,6 +10245,16 @@ func (p *BatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (err error)
return err return err
} }
// writeField2 serializes the autopauseAfterBatch bool as thrift field 2
// (begin marker, value, end marker), wrapping each protocol error with
// field context. Appears to be thrift-generated code.
func (p *BatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
	if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
	if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
	if err := oprot.WriteFieldEnd(); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
	// Named return err is nil here unless set above (generated-code convention).
	return err
}
func (p *BatchJobUpdateStrategy) String() string { func (p *BatchJobUpdateStrategy) String() string {
if p == nil { if p == nil {
return "<nil>" return "<nil>"
@ -10231,8 +10267,10 @@ func (p *BatchJobUpdateStrategy) String() string {
// //
// Attributes: // Attributes:
// - GroupSizes // - GroupSizes
// - AutopauseAfterBatch
type VariableBatchJobUpdateStrategy struct { type VariableBatchJobUpdateStrategy struct {
GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"` GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"`
AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
} }
func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy { func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
@ -10243,6 +10281,10 @@ func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 { func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 {
return p.GroupSizes return p.GroupSizes
} }
// GetAutopauseAfterBatch reports whether this variable-batch update strategy is
// configured to pause automatically after each batch completes (thrift field 2).
// Appears to be thrift-generated accessor code; keep in sync with the generator.
func (p *VariableBatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
	return p.AutopauseAfterBatch
}
func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error { func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil { if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@ -10266,6 +10308,16 @@ func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
return err return err
} }
} }
case 2:
if fieldTypeId == thrift.BOOL {
if err := p.ReadField2(iprot); err != nil {
return err
}
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
default: default:
if err := iprot.Skip(fieldTypeId); err != nil { if err := iprot.Skip(fieldTypeId); err != nil {
return err return err
@ -10303,11 +10355,21 @@ var _elem25 int32
return nil return nil
} }
// ReadField2 deserializes the autopauseAfterBatch bool (thrift field 2) from
// the wire protocol into p, wrapping any protocol error with field context.
// Appears to be thrift-generated code; keep in sync with the generator.
func (p *VariableBatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
	if v, err := iprot.ReadBool(); err != nil {
		return thrift.PrependError("error reading field 2: ", err)
	} else {
		p.AutopauseAfterBatch = v
	}
	return nil
}
func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error { func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil { if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
if p != nil { if p != nil {
if err := p.writeField1(oprot); err != nil { return err } if err := p.writeField1(oprot); err != nil { return err }
if err := p.writeField2(oprot); err != nil { return err }
} }
if err := oprot.WriteFieldStop(); err != nil { if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err) } return thrift.PrependError("write field stop error: ", err) }
@ -10334,6 +10396,16 @@ func (p *VariableBatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (er
return err return err
} }
// writeField2 serializes the autopauseAfterBatch bool as thrift field 2
// (begin marker, value, end marker), wrapping each protocol error with
// field context. Appears to be thrift-generated code.
func (p *VariableBatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
	if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
	if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
	if err := oprot.WriteFieldEnd(); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
	// Named return err is nil here unless set above (generated-code convention).
	return err
}
func (p *VariableBatchJobUpdateStrategy) String() string { func (p *VariableBatchJobUpdateStrategy) String() string {
if p == nil { if p == nil {
return "<nil>" return "<nil>"

View file

@ -36,15 +36,9 @@ func (m *Monitor) JobUpdate(
timeout int) (bool, error) { timeout int) (bool, error) {
updateQ := aurora.JobUpdateQuery{ updateQ := aurora.JobUpdateQuery{
Key: &updateKey, Key: &updateKey,
Limit: 1, Limit: 1,
UpdateStatuses: []aurora.JobUpdateStatus{ UpdateStatuses: TerminalUpdateStates(),
aurora.JobUpdateStatus_ROLLED_FORWARD,
aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED,
},
} }
updateSummaries, err := m.JobUpdateQuery( updateSummaries, err := m.JobUpdateQuery(
updateQ, updateQ,
@ -75,22 +69,13 @@ func (m *Monitor) JobUpdate(
} }
// JobUpdateStatus polls the scheduler every certain amount of time to see if the update has entered a specified state. // JobUpdateStatus polls the scheduler every certain amount of time to see if the update has entered a specified state.
func (m *Monitor) JobUpdateStatus( func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
updateKey aurora.JobUpdateKey, desiredStatuses []aurora.JobUpdateStatus,
desiredStatuses map[aurora.JobUpdateStatus]bool, interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
interval time.Duration,
timeout time.Duration) (aurora.JobUpdateStatus, error) {
desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
for k := range desiredStatuses {
desiredStatusesSlice = append(desiredStatusesSlice, k)
}
updateQ := aurora.JobUpdateQuery{ updateQ := aurora.JobUpdateQuery{
Key: &updateKey, Key: &updateKey,
Limit: 1, Limit: 1,
UpdateStatuses: desiredStatusesSlice, UpdateStatuses: desiredStatuses,
} }
summary, err := m.JobUpdateQuery(updateQ, interval, timeout) summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
@ -129,11 +114,74 @@ func (m *Monitor) JobUpdateQuery(
} }
} }
// Instances will monitor a Job until all instances enter one of the LIVE_STATES // AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
func (m *Monitor) Instances( // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
key *aurora.JobKey, // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
instances int32, // the update is in using information from the update configuration.
interval, timeout int) (bool, error) { func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
key.Job = &aurora.JobKey{
Role: key.Job.Role,
Environment: key.Job.Environment,
Name: key.Job.Name,
}
query := aurora.JobUpdateQuery{
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
Key: &key,
}
response, err := m.Client.JobUpdateDetails(query)
if err != nil {
return -1, errors.Wrap(err, "unable to get information about update")
}
// TODO (rdelvalle): check for possible nil values when going down the list of structs
updateDetails := response.Result_.GetJobUpdateDetailsResult_.DetailsList
if len(updateDetails) == 0 {
return -1, errors.Errorf("details for update could not be found")
}
updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
var batchSizes []int32
switch {
case updateStrategy.IsSetVarBatchStrategy():
batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
case updateStrategy.IsSetBatchStrategy():
batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
default:
return -1, errors.Errorf("update is not using a batch update strategy")
}
query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
summary, err := m.JobUpdateQuery(query, interval, timeout)
if err != nil {
return -1, err
}
if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}
updatingInstances := make(map[int32]struct{})
for _, e := range updateDetails[0].InstanceEvents {
// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
updatingInstances[e.GetInstanceId()] = struct{}{}
}
}
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
}
// Monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout) return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
} }

View file

@ -829,7 +829,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
m := Monitor{Client: r} m := Monitor{Client: r}
_, err := m.JobUpdateStatus( _, err := m.JobUpdateStatus(
updateKey, updateKey,
map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true}, []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
time.Second*5, time.Second*5,
time.Minute) time.Minute)

View file

@ -983,3 +983,52 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
}) })
} }
} }
// TestRealisClient_BatchAwareAutoPause is an end-to-end test: it starts a
// variable-batch update with autopause enabled against a live Aurora scheduler
// (via the package-level client `r` and `monitor`), then, for each configured
// batch, waits for the update to auto-pause, checks the reported batch index,
// and resumes the update. The job is killed at the end regardless of outcome.
func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
	// Create a single job
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("BatchAwareAutoPauseTest").
		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
		ExecutorData(string(thermosPayload)).
		CPU(.01).
		RAM(4).
		Disk(10).
		InstanceCount(6).
		IsService(true)
	// Instance count (6) matches the sum of the group sizes (1+2+3) so the
	// update pauses exactly once per configured batch.
	updateGroups := []int32{1, 2, 3}
	strategy := realis.NewDefaultUpdateJob(job.TaskConfig()).
		VariableBatchStrategy(aurora.VariableBatchJobUpdateStrategy{
			GroupSizes:          updateGroups,
			AutopauseAfterBatch: true,
		}).
		InstanceCount(6).
		WatchTime(1000)
	resp, err := r.StartJobUpdate(strategy, "")
	// Guard every level of the response chain before dereferencing the key.
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.NotNil(t, resp.GetResult_())
	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
	require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
	for i := range updateGroups {
		// Wait (up to 240s, polling every 5s) for the update to reach
		// ROLL_FORWARD_PAUSED; curStep is the 0-indexed batch just completed.
		curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)
		if mErr != nil {
			// Update may already be in a terminal state so don't check for error
			// NOTE(review): after aborting, the loop still asserts curStep and
			// resumes — presumably those fail fast on a dead update; confirm.
			_, err := r.AbortJobUpdate(key, "Monitor timed out.")
			assert.NoError(t, err)
		}
		assert.Equal(t, i, curStep)
		_, err = r.ResumeJobUpdate(&key, "auto resuming test")
		require.NoError(t, err)
	}
	// Clean up the job created for this test.
	_, err = r.KillJob(job.JobKey())
	assert.NoError(t, err)
}

31
util.go
View file

@ -25,6 +25,18 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active. // ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
// TerminalUpdateStates returns a slice containing all the terminal states an update may end up in.
// This is a function, rather than a package-level slice, so callers cannot
// accidentally mutate the shared set of terminal states.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
	return []aurora.JobUpdateStatus{
		aurora.JobUpdateStatus_ROLLED_FORWARD,
		aurora.JobUpdateStatus_ROLLED_BACK,
		aurora.JobUpdateStatus_ABORTED,
		aurora.JobUpdateStatus_ERROR,
		aurora.JobUpdateStatus_FAILED,
	}
}
// AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse. // AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse.
var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
@ -87,3 +99,22 @@ func validateAuroraURL(location string) (string, error) {
return u.String(), nil return u.String(), nil
} }
// calculateCurrentBatch returns the 0-indexed batch an update is currently in,
// given the number of instances that have started updating and the configured
// batch sizes. Once the configured batches are exhausted, the final batch size
// is treated as repeating indefinitely (overflow batches).
// NOTE: callers must pass a non-empty batchSizes slice with a non-zero last
// element; the overflow math divides by the last configured size.
func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
	remaining := updatingInstances
	for batchIndex, size := range batchSizes {
		remaining -= size
		if remaining <= 0 {
			return batchIndex
		}
	}

	// Overflow: instances beyond the configured groups fall into virtual
	// batches, each the size of the last configured group.
	lastSize := batchSizes[len(batchSizes)-1]
	batch := len(batchSizes) - 1 + int(remaining/lastSize)
	if remaining%lastSize != 0 {
		batch++ // partial overflow batch counts as a full step
	}
	return batch
}

View file

@ -63,3 +63,40 @@ func TestAuroraURLValidator(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
}) })
} }
// TestCurrentBatchCalculator verifies calculateCurrentBatch across even,
// uneven, and overflow distributions of updating instances, table-driven
// with one subtest per scenario.
func TestCurrentBatchCalculator(t *testing.T) {
	testCases := []struct {
		name              string
		updatingInstances int32
		batchSizes        []int32
		wantBatch         int
	}{
		{"singleBatchOverflow", 10, []int32{2}, 4},
		{"noInstancesUpdating", 0, []int32{2}, 0},
		{"evenMatchSingleBatch", 2, []int32{2}, 0},
		{"moreInstancesThanBatches", 5, []int32{1, 2}, 2},
		{"moreInstancesThanBatchesDecreasing", 5, []int32{2, 1}, 3},
		{"unevenFit", 2, []int32{1, 2}, 1},
		{"halfWay", 1, []int32{1, 2}, 0},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.wantBatch, calculateCurrentBatch(tc.updatingInstances, tc.batchSizes))
		})
	}
}