diff --git a/auroraAPI.thrift b/auroraAPI.thrift
index 4e9e5a9..3e43f6f 100644
--- a/auroraAPI.thrift
+++ b/auroraAPI.thrift
@@ -726,6 +726,8 @@ struct QueueJobUpdateStrategy {
  */
 struct BatchJobUpdateStrategy {
   1: i32 groupSize
+  /* Update will pause automatically after each batch completes */
+  2: bool autopauseAfterBatch
 }
 
 /** Same as Batch strategy but each time an active group completes, the size of the next active
@@ -733,6 +735,8 @@ struct BatchJobUpdateStrategy {
  */
 struct VariableBatchJobUpdateStrategy {
   1: list<i32> groupSizes
+  /* Update will pause automatically after each batch completes */
+  2: bool autopauseAfterBatch
 }
 
 union JobUpdateStrategy {
diff --git a/gen-go/apache/aurora/auroraAPI.go b/gen-go/apache/aurora/auroraAPI.go
index 9bb9396..849c381 100644
--- a/gen-go/apache/aurora/auroraAPI.go
+++ b/gen-go/apache/aurora/auroraAPI.go
@@ -10137,8 +10137,10 @@ func (p *QueueJobUpdateStrategy) String() string {
 // 
 // Attributes:
 //  - GroupSize
+//  - AutopauseAfterBatch
 type BatchJobUpdateStrategy struct {
   GroupSize int32 `thrift:"groupSize,1" db:"groupSize" json:"groupSize"`
+  AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
 }
 
 func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
@@ -10149,6 +10151,10 @@ func NewBatchJobUpdateStrategy() *BatchJobUpdateStrategy {
 func (p *BatchJobUpdateStrategy) GetGroupSize() int32 {
   return p.GroupSize
 }
+
+func (p *BatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
+  return p.AutopauseAfterBatch
+}
 func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
   if _, err := iprot.ReadStructBegin(); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -10172,6 +10178,16 @@ func (p *BatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
           return err
         }
       }
+    case 2:
+      if fieldTypeId == thrift.BOOL {
+        if err := p.ReadField2(iprot); err != nil {
+          return err
+        }
+      } else {
+        if err := iprot.Skip(fieldTypeId); err != nil {
+          return err
+        }
+      }
     default:
       if err := iprot.Skip(fieldTypeId); err != nil {
         return err
@@ -10196,11 +10212,21 @@ func (p *BatchJobUpdateStrategy) ReadField1(iprot thrift.TProtocol) error {
   return nil
 }
 
+func (p *BatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
+  if v, err := iprot.ReadBool(); err != nil {
+  return thrift.PrependError("error reading field 2: ", err)
+} else {
+  p.AutopauseAfterBatch = v
+}
+  return nil
+}
+
 func (p *BatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
   if err := oprot.WriteStructBegin("BatchJobUpdateStrategy"); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
   if p != nil {
     if err := p.writeField1(oprot); err != nil { return err }
+    if err := p.writeField2(oprot); err != nil { return err }
   }
   if err := oprot.WriteFieldStop(); err != nil {
     return thrift.PrependError("write field stop error: ", err) }
@@ -10219,6 +10245,16 @@ func (p *BatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (err error)
   return err
 }
 
+func (p *BatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
+  if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
+  if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
+  if err := oprot.WriteFieldEnd(); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
+  return err
+}
+
 func (p *BatchJobUpdateStrategy) String() string {
   if p == nil {
     return "<nil>"
@@ -10231,8 +10267,10 @@ func (p *BatchJobUpdateStrategy) String() string {
 // 
 // Attributes:
 //  - GroupSizes
+//  - AutopauseAfterBatch
 type VariableBatchJobUpdateStrategy struct {
   GroupSizes []int32 `thrift:"groupSizes,1" db:"groupSizes" json:"groupSizes"`
+  AutopauseAfterBatch bool `thrift:"autopauseAfterBatch,2" db:"autopauseAfterBatch" json:"autopauseAfterBatch"`
 }
 
 func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
@@ -10243,6 +10281,10 @@ func NewVariableBatchJobUpdateStrategy() *VariableBatchJobUpdateStrategy {
 func (p *VariableBatchJobUpdateStrategy) GetGroupSizes() []int32 {
   return p.GroupSizes
 }
+
+func (p *VariableBatchJobUpdateStrategy) GetAutopauseAfterBatch() bool {
+  return p.AutopauseAfterBatch
+}
 func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
   if _, err := iprot.ReadStructBegin(); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -10266,6 +10308,16 @@ func (p *VariableBatchJobUpdateStrategy) Read(iprot thrift.TProtocol) error {
           return err
         }
       }
+    case 2:
+      if fieldTypeId == thrift.BOOL {
+        if err := p.ReadField2(iprot); err != nil {
+          return err
+        }
+      } else {
+        if err := iprot.Skip(fieldTypeId); err != nil {
+          return err
+        }
+      }
     default:
       if err := iprot.Skip(fieldTypeId); err != nil {
         return err
@@ -10303,11 +10355,21 @@ var _elem25 int32
   return nil
 }
 
+func (p *VariableBatchJobUpdateStrategy) ReadField2(iprot thrift.TProtocol) error {
+  if v, err := iprot.ReadBool(); err != nil {
+  return thrift.PrependError("error reading field 2: ", err)
+} else {
+  p.AutopauseAfterBatch = v
+}
+  return nil
+}
+
 func (p *VariableBatchJobUpdateStrategy) Write(oprot thrift.TProtocol) error {
   if err := oprot.WriteStructBegin("VariableBatchJobUpdateStrategy"); err != nil {
     return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
   if p != nil {
     if err := p.writeField1(oprot); err != nil { return err }
+    if err := p.writeField2(oprot); err != nil { return err }
   }
   if err := oprot.WriteFieldStop(); err != nil {
     return thrift.PrependError("write field stop error: ", err) }
@@ -10334,6 +10396,16 @@ func (p *VariableBatchJobUpdateStrategy) writeField1(oprot thrift.TProtocol) (er
   return err
 }
 
+func (p *VariableBatchJobUpdateStrategy) writeField2(oprot thrift.TProtocol) (err error) {
+  if err := oprot.WriteFieldBegin("autopauseAfterBatch", thrift.BOOL, 2); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:autopauseAfterBatch: ", p), err) }
+  if err := oprot.WriteBool(bool(p.AutopauseAfterBatch)); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T.autopauseAfterBatch (2) field write error: ", p), err) }
+  if err := oprot.WriteFieldEnd(); err != nil {
+    return thrift.PrependError(fmt.Sprintf("%T write field end error 2:autopauseAfterBatch: ", p), err) }
+  return err
+}
+
 func (p *VariableBatchJobUpdateStrategy) String() string {
   if p == nil {
     return "<nil>"
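Note (illustrative, not part of the patch): the regenerated field is optional and defaults to false, and readers generated without it fall through to the default Skip branch for field 2, so existing callers and older bindings are unaffected. A minimal sketch of using the regenerated struct, assuming the repository's usual import path for the generated bindings:

```go
package main

import (
	"fmt"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Build the new strategy; the zero value of AutopauseAfterBatch (false)
	// keeps the pre-existing, non-pausing behavior.
	s := aurora.NewVariableBatchJobUpdateStrategy()
	s.GroupSizes = []int32{1, 2, 3}
	s.AutopauseAfterBatch = true

	fmt.Println(s.GetAutopauseAfterBatch()) // true
}
```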
diff --git a/monitors.go b/monitors.go
index 69167df..8c911bc 100644
--- a/monitors.go
+++ b/monitors.go
@@ -81,6 +81,15 @@ func (m *Monitor) JobUpdateStatus(
 	interval time.Duration,
 	timeout time.Duration) (aurora.JobUpdateStatus, error) {
 
+	// The smallest supported unit is one second; values below that are assumed to be bare second counts and scaled up
+	if interval < 1*time.Second {
+		interval *= time.Second
+	}
+
+	if timeout < 1*time.Second {
+		timeout *= time.Second
+	}
+
 	desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
 
 	for k := range desiredStatuses {
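Note (illustrative, not part of the patch): with the guard above, sub-second values passed to Monitor.JobUpdateStatus are treated as bare second counts, so a caller passing plain integers (which Go interprets as nanoseconds) still gets sensible polling intervals, while values already expressed in seconds pass through unchanged. A standalone sketch of the same rule, using a hypothetical normalize helper:

```go
package main

import (
	"fmt"
	"time"
)

// normalize mirrors the guard added to Monitor.JobUpdateStatus: any duration
// below one second is assumed to be a bare count of seconds and is scaled up.
func normalize(d time.Duration) time.Duration {
	if d < 1*time.Second {
		d *= time.Second
	}
	return d
}

func main() {
	fmt.Println(normalize(5))               // 5s   (a bare 5 would otherwise mean 5ns)
	fmt.Println(normalize(240))             // 4m0s
	fmt.Println(normalize(5 * time.Second)) // 5s   (already >= 1s, left untouched)
}
```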
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 187c262..bdfca72 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -983,3 +983,67 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
 		})
 	}
 }
+
+func TestRealisClient_Update_Autopause_After_Batch(t *testing.T) {
+
+	// Create a single service job with six instances to act as the update target
+	job := realis.NewJob().
+		Environment("prod").
+		Role("vagrant").
+		Name("autopause_test").
+		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
+		ExecutorData(string(thermosPayload)).
+		CPU(.01).
+		RAM(4).
+		Disk(10).
+		InstanceCount(6).
+		IsService(true)
+
+	UpdateJob := realis.NewDefaultUpdateJob(job.TaskConfig()).
+		VariableBatchStrategy(
+			aurora.VariableBatchJobUpdateStrategy{GroupSizes: []int32{1, 2, 3}, AutopauseAfterBatch: true}).
+		InstanceCount(6).
+		WatchTime(1000)
+
+	resp, err := r.StartJobUpdate(UpdateJob, "")
+
+	assert.NoError(t, err)
+	assert.NotNil(t, resp)
+	assert.NotNil(t, resp.GetResult_())
+	assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
+	assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
+
+	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
+
+updateLoop:
+	for {
+		status, mErr := monitor.JobUpdateStatus(
+			key,
+			map[aurora.JobUpdateStatus]bool{
+				aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED: true,
+				aurora.JobUpdateStatus_ROLLED_FORWARD:      true},
+			time.Second*5,
+			time.Second*240)
+
+		if mErr != nil {
+			// The update did not reach a desired state before the monitor timed out; abort it and stop retrying
+			_, err := r.AbortJobUpdate(key, "Monitor timed out.")
+			assert.NoError(t, err)
+			break updateLoop
+		}
+
+		switch status {
+		case aurora.JobUpdateStatus_ROLLED_FORWARD:
+			break updateLoop
+		case aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED:
+			// The update paused itself after completing a batch; resume it so the next batch can roll out
+			_, err := r.ResumeJobUpdate(&key, "Resuming after batch completion.")
+			assert.NoError(t, err)
+		default:
+			// Any other outcome should never occur here; the monitor returns an error instead
+		}
+	}
+
+	_, err = r.KillJob(job.JobKey())
+	assert.NoError(t, err)
+}
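Note (illustrative, not part of the patch): outside the test suite, the same pause/verify/resume loop is how a deployment tool would consume this feature. The sketch below assumes the gorealis v1 client interface (realis.Realis), the Monitor type from monitors.go, and the usual gorealis import paths; the package, function name, group sizes, and messages are placeholders.

```go
package deploy

import (
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// rollOutInBatches starts an update that pauses itself after every batch and
// resumes it each time the scheduler reports ROLL_FORWARD_PAUSED, until the
// update reaches ROLLED_FORWARD.
func rollOutInBatches(r realis.Realis, monitor *realis.Monitor, task *aurora.TaskConfig) error {
	update := realis.NewDefaultUpdateJob(task).
		VariableBatchStrategy(aurora.VariableBatchJobUpdateStrategy{
			GroupSizes:          []int32{1, 2, 3},
			AutopauseAfterBatch: true,
		}).
		InstanceCount(6)

	resp, err := r.StartJobUpdate(update, "")
	if err != nil {
		return err
	}
	key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()

	for {
		status, err := monitor.JobUpdateStatus(
			key,
			map[aurora.JobUpdateStatus]bool{
				aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED: true,
				aurora.JobUpdateStatus_ROLLED_FORWARD:      true,
			},
			5*time.Second,
			10*time.Minute)
		if err != nil {
			// The update never reached a desired state in time; abort it and give up.
			_, _ = r.AbortJobUpdate(key, "deploy monitor timed out")
			return err
		}

		if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
			return nil // every batch has rolled out
		}

		// ROLL_FORWARD_PAUSED: the current batch is done. Run any out-of-band
		// verification (health checks, canary metrics, ...) before resuming.
		if _, err := r.ResumeJobUpdate(&key, "batch verified, continuing"); err != nil {
			return err
		}
	}
}
```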