Adding auto pause after batch support (#102)
* Adding support for auto pause after batch and modifying the update status monitor to automatically multiply values below a second by a second.
This commit is contained in:
parent
d0866b98bc
commit
682cfd931b
4 changed files with 148 additions and 0 deletions
|
@ -726,6 +726,8 @@ struct QueueJobUpdateStrategy {
|
||||||
*/
|
*/
|
||||||
struct BatchJobUpdateStrategy {
|
struct BatchJobUpdateStrategy {
|
||||||
1: i32 groupSize
|
1: i32 groupSize
|
||||||
|
/* Update will pause automatically after each batch completes */
|
||||||
|
2: bool autopauseAfterBatch
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Same as Batch strategy but each time an active group completes, the size of the next active
|
/** Same as Batch strategy but each time an active group completes, the size of the next active
|
||||||
|
@ -733,6 +735,8 @@ struct BatchJobUpdateStrategy {
|
||||||
*/
|
*/
|
||||||
struct VariableBatchJobUpdateStrategy {
|
struct VariableBatchJobUpdateStrategy {
|
||||||
1: list<i32> groupSizes
|
1: list<i32> groupSizes
|
||||||
|
/* Update will pause automatically after each batch completes */
|
||||||
|
2: bool autopauseAfterBatch
|
||||||
}
|
}
|
||||||
|
|
||||||
union JobUpdateStrategy {
|
union JobUpdateStrategy {
|
||||||
|
|
|
@ -10137,8 +10137,10 @@ func (p *QueueJobUpdateStrategy) String() string {
|
||||||
//
|
//
|
||||||
// Attributes:
|
// Attributes:
|
||||||
// - GroupSize
|
// - GroupSize
|
||||||
|
// - AutopauseAfterBatch
|
||||||
type BatchJobUpdateStrategy struct {
|
type BatchJobUpdateStrategy struct {
|
||||||
GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"`
|
GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"`
|
||||||
|
AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
|
func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
|
||||||
|
@ -10149,6 +10151,10 @@ func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
|
||||||
func (p *BatchJobUpdateStrategy) GetGroupSize() int32 {
|
func (p *BatchJobUpdateStrategy) GetGroupSize() int32 {
|
||||||
return p.GroupSize
|
return p.GroupSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *BatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
|
||||||
|
return p.AutopauseAfterBatch
|
||||||
|
}
|
||||||
func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
||||||
if _, err := iprot.ReadStructBegin(); err != nil {
|
if _, err := iprot.ReadStructBegin(); err != nil {
|
||||||
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
|
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
|
||||||
|
@ -10172,6 +10178,16 @@ func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case 2:
|
||||||
|
if fieldTypeId == thrift.BOOL {
|
||||||
|
if err := p.ReadField2(iprot); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := iprot.Skip(fieldTypeId); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if err := iprot.Skip(fieldTypeId); err != nil {
|
if err := iprot.Skip(fieldTypeId); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -10196,11 +10212,21 @@ func (p *BatchJobUpdateStrategy) ReadField1(iprot thrift.TProtocol) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *BatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
|
||||||
|
if v, err := iprot.ReadBool(); err != nil {
|
||||||
|
return thrift.PrependError("error reading field 2: ", err)
|
||||||
|
} else {
|
||||||
|
p.AutopauseAfterBatch = v
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
|
func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
|
||||||
if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil {
|
if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil {
|
||||||
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
|
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
|
||||||
if p != nil {
|
if p != nil {
|
||||||
if err := p.writeField1(oprot); err != nil { return err }
|
if err := p.writeField1(oprot); err != nil { return err }
|
||||||
|
if err := p.writeField2(oprot); err != nil { return err }
|
||||||
}
|
}
|
||||||
if err := oprot.WriteFieldStop(); err != nil {
|
if err := oprot.WriteFieldStop(); err != nil {
|
||||||
return thrift.PrependError("write field stop error: ", err) }
|
return thrift.PrependError("write field stop error: ", err) }
|
||||||
|
@ -10219,6 +10245,16 @@ func (p *BatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (err error)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *BatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
|
||||||
|
if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
|
||||||
|
if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
|
||||||
|
if err := oprot.WriteFieldEnd(); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (p *BatchJobUpdateStrategy) String() string {
|
func (p *BatchJobUpdateStrategy) String() string {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return "<nil>"
|
return "<nil>"
|
||||||
|
@ -10231,8 +10267,10 @@ func (p *BatchJobUpdateStrategy) String() string {
|
||||||
//
|
//
|
||||||
// Attributes:
|
// Attributes:
|
||||||
// - GroupSizes
|
// - GroupSizes
|
||||||
|
// - AutopauseAfterBatch
|
||||||
type VariableBatchJobUpdateStrategy struct {
|
type VariableBatchJobUpdateStrategy struct {
|
||||||
GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"`
|
GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"`
|
||||||
|
AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
|
func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
|
||||||
|
@ -10243,6 +10281,10 @@ func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
|
||||||
func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 {
|
func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 {
|
||||||
return p.GroupSizes
|
return p.GroupSizes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *VariableBatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
|
||||||
|
return p.AutopauseAfterBatch
|
||||||
|
}
|
||||||
func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
||||||
if _, err := iprot.ReadStructBegin(); err != nil {
|
if _, err := iprot.ReadStructBegin(); err != nil {
|
||||||
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
|
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
|
||||||
|
@ -10266,6 +10308,16 @@ func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case 2:
|
||||||
|
if fieldTypeId == thrift.BOOL {
|
||||||
|
if err := p.ReadField2(iprot); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := iprot.Skip(fieldTypeId); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if err := iprot.Skip(fieldTypeId); err != nil {
|
if err := iprot.Skip(fieldTypeId); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -10303,11 +10355,21 @@ var _elem25 int32
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *VariableBatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
|
||||||
|
if v, err := iprot.ReadBool(); err != nil {
|
||||||
|
return thrift.PrependError("error reading field 2: ", err)
|
||||||
|
} else {
|
||||||
|
p.AutopauseAfterBatch = v
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
|
func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
|
||||||
if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil {
|
if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil {
|
||||||
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
|
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
|
||||||
if p != nil {
|
if p != nil {
|
||||||
if err := p.writeField1(oprot); err != nil { return err }
|
if err := p.writeField1(oprot); err != nil { return err }
|
||||||
|
if err := p.writeField2(oprot); err != nil { return err }
|
||||||
}
|
}
|
||||||
if err := oprot.WriteFieldStop(); err != nil {
|
if err := oprot.WriteFieldStop(); err != nil {
|
||||||
return thrift.PrependError("write field stop error: ", err) }
|
return thrift.PrependError("write field stop error: ", err) }
|
||||||
|
@ -10334,6 +10396,16 @@ func (p *VariableBatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (er
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *VariableBatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
|
||||||
|
if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
|
||||||
|
if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
|
||||||
|
if err := oprot.WriteFieldEnd(); err != nil {
|
||||||
|
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (p *VariableBatchJobUpdateStrategy) String() string {
|
func (p *VariableBatchJobUpdateStrategy) String() string {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return "<nil>"
|
return "<nil>"
|
||||||
|
|
|
@ -81,6 +81,15 @@ func (m *Monitor) JobUpdateStatus(
|
||||||
interval time.Duration,
|
interval time.Duration,
|
||||||
timeout time.Duration) (aurora.JobUpdateStatus, error) {
|
timeout time.Duration) (aurora.JobUpdateStatus, error) {
|
||||||
|
|
||||||
|
// Minimal unit is the second, if it's below a second, multiply input by seconds
|
||||||
|
if interval < 1*time.Second {
|
||||||
|
interval *= time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout < 1*time.Second {
|
||||||
|
timeout *= time.Second
|
||||||
|
}
|
||||||
|
|
||||||
desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
|
desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
|
||||||
|
|
||||||
for k := range desiredStatuses {
|
for k := range desiredStatuses {
|
||||||
|
|
|
@ -983,3 +983,66 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRealisClient_Update_Autopause_After_Batch(t *testing.T) {
|
||||||
|
|
||||||
|
// Create a single job
|
||||||
|
job := realis.NewJob().
|
||||||
|
Environment("prod").
|
||||||
|
Role("vagrant").
|
||||||
|
Name("autopause_test").
|
||||||
|
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
|
||||||
|
ExecutorData(string(thermosPayload)).
|
||||||
|
CPU(.01).
|
||||||
|
RAM(4).
|
||||||
|
Disk(10).
|
||||||
|
InstanceCount(6).
|
||||||
|
IsService(true)
|
||||||
|
|
||||||
|
UpdateJob := realis.NewDefaultUpdateJob(job.TaskConfig()).
|
||||||
|
VariableBatchStrategy(
|
||||||
|
aurora.VariableBatchJobUpdateStrategy{GroupSizes: []int32{1, 2, 3}, AutopauseAfterBatch: true}).
|
||||||
|
InstanceCount(6).
|
||||||
|
WatchTime(1000)
|
||||||
|
|
||||||
|
resp, err := r.StartJobUpdate(UpdateJob, "")
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, resp)
|
||||||
|
assert.NotNil(t, resp.GetResult_())
|
||||||
|
assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
|
||||||
|
assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
|
||||||
|
|
||||||
|
key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
|
||||||
|
|
||||||
|
updateLoop:
|
||||||
|
for {
|
||||||
|
status, mErr := monitor.JobUpdateStatus(
|
||||||
|
key,
|
||||||
|
map[aurora.JobUpdateStatus]bool{
|
||||||
|
aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED: true,
|
||||||
|
aurora.JobUpdateStatus_ROLLED_FORWARD: true},
|
||||||
|
time.Second*5,
|
||||||
|
time.Second*240)
|
||||||
|
|
||||||
|
if mErr != nil {
|
||||||
|
// Update did not enter the state we needed it to before the monitor timed out
|
||||||
|
_, err := r.AbortJobUpdate(key, "Monitor timed out.")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch status {
|
||||||
|
case aurora.JobUpdateStatus_ROLLED_FORWARD:
|
||||||
|
break updateLoop
|
||||||
|
case aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED:
|
||||||
|
// Update may already be in a terminal state so don't check for error
|
||||||
|
_, err := r.ResumeJobUpdate(&key, "Monitor timed out.")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
default:
|
||||||
|
// This should never occur as an error will be returned instead
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r.KillJob(job.JobKey())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue