From 77bb78927ec040eb2d2f8a74cabc46bffb9a512a Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Wed, 9 May 2018 15:33:18 -0700 Subject: [PATCH] API update to support staggered updates prototype. --- auroraAPI.thrift | 42 ++++++--- examples/client.go | 43 +++++++-- gen-go/apache/aurora/ttypes.go | 166 ++++++++++++++++++++++----------- updatejob.go | 13 ++- 4 files changed, 191 insertions(+), 73 deletions(-) diff --git a/auroraAPI.thrift b/auroraAPI.thrift index 3d3d5bc..a5fcb0e 100644 --- a/auroraAPI.thrift +++ b/auroraAPI.thrift @@ -680,9 +680,27 @@ struct JobUpdateKey { 2: string id } -/** Job update thresholds and limits. */ +/** Declaration of update strategy types available. **/ +enum JobUpdateStrategyType { + /** An update strategy that will maintain a limited amount of updates running. */ + QUEUE = 0, + /** An update strategy that will only add more work when the current active group is empty. */ + BATCH = 1, + /** + * An update strategy that will only add more work when the current active group is empty. + * Unlike BatchUpdate, once an active group is empty, the size of the next active group + * is allowed to change using this strategy. + */ + VARIABLE_BATCH = 2 +} + +/** Job update thresholds and limits. **/ struct JobUpdateSettings { - /** Max number of instances being updated at any given moment. */ + /** + * TODO(rdelvalle): determine if it's better to use updateGroupSizes for everything and capping + * updateGroupSizes at length=1 for BATCH and QUEUE. + * Max number of instances being updated at any given moment. + */ 1: i32 updateGroupSize /** Max number of instance failures to tolerate before marking instance as FAILED. */ @@ -700,13 +718,13 @@ struct JobUpdateSettings { /** Instance IDs to act on. All instances will be affected if this is not set. */ 7: set updateOnlyTheseInstances - /** + /** TODO(rdelvalle): Deprecated, please set updateStrategyType to BATCH instead * If true, use updateGroupSize as strict batching boundaries, and avoid proceeding to another * batch until the preceding batch finishes updating. */ 8: bool waitForBatchCompletion - /** + /** * If set, requires external calls to pulseJobUpdate RPC within the specified rate for the * update to make progress. If no pulses received within specified interval the update will * block. A blocked update is unable to continue but retains its current status. It may only get @@ -715,16 +733,16 @@ struct JobUpdateSettings { 9: optional i32 blockIfNoPulsesAfterMs /** - * This list contains the number of instances that each batch will complete before moving on to - * the next. This field can only be used with waitForBatchCompletion set as true. - **/ - 10: optional list variableUpdateGroupSize + * Explicitly state which Update strategy type to use. + */ + 10: optional JobUpdateStrategyType updateStrategyType /** - * Pauses the deployment of further tasks after each batch completes - * until the user sends an resume call. - **/ - 11: bool autoPause} + * Limit for each update group during an update. + * This field should always be length of 1 for QUEUE and BATCH. + */ + 11: optional list groupsSize +} /** Event marking a state transition in job update lifecycle. */ struct JobUpdateEvent { diff --git a/examples/client.go b/examples/client.go index 41a9145..8e6e7ea 100644 --- a/examples/client.go +++ b/examples/client.go @@ -18,12 +18,9 @@ import ( "flag" "fmt" "io/ioutil" - "log" "os" - - "time" - "strings" + "time" "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" @@ -90,7 +87,6 @@ func main() { Factor: 2.0, Jitter: 0.1, }), - realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), realis.Debug(), } @@ -191,7 +187,6 @@ func main() { // Create a service with three instances using the update API instead of the createJob API fmt.Println("Creating service") settings := realis.NewUpdateSettings() - settings.VariableUpdateGroupSize = []int32{1, 2, 3} job.InstanceCount(6).RAM(16).CPU(.1) resp, result, err := r.CreateService(job, settings) if err != nil { @@ -430,6 +425,40 @@ func main() { monitor.JobUpdate(*jobUpdateKey, 5, 500) break + case "staggeredUpdate": + fmt.Println("Updating a job with with less RAM and to 5 instances staggered") + live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + var instId int32 + for k := range live { + instId = k + break + } + taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{ + JobKey: job.JobKey(), + InstanceId: instId, + }) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + updateJob := realis.NewDefaultUpdateJob(taskConfig). + UpdateStrategy(aurora.JobUpdateStrategyType_VARIABLE_BATCH). + GroupsSize([]int32{1, 2}) + updateJob.InstanceCount(3).RAM(8).CPU(.1) + + resp, err := r.StartJobUpdate(updateJob, "") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + jobUpdateKey := response.JobUpdateKey(resp) + monitor.JobUpdate(*jobUpdateKey, 5, 500) + break case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), @@ -492,7 +521,7 @@ func main() { fmt.Println(resp.String()) break case "rollbackUpdate": - fmt.Println("Abort update") + fmt.Println("Rollback update") resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, diff --git a/gen-go/apache/aurora/ttypes.go b/gen-go/apache/aurora/ttypes.go index deaaa37..396e583 100644 --- a/gen-go/apache/aurora/ttypes.go +++ b/gen-go/apache/aurora/ttypes.go @@ -523,6 +523,54 @@ func (p *JobUpdatePulseStatus) UnmarshalText(text []byte) error { return nil } +//Declaration of update strategy types available. * +type JobUpdateStrategyType int64 + +const ( + JobUpdateStrategyType_QUEUE JobUpdateStrategyType = 0 + JobUpdateStrategyType_BATCH JobUpdateStrategyType = 1 + JobUpdateStrategyType_VARIABLE_BATCH JobUpdateStrategyType = 2 +) + +func (p JobUpdateStrategyType) String() string { + switch p { + case JobUpdateStrategyType_QUEUE: + return "QUEUE" + case JobUpdateStrategyType_BATCH: + return "BATCH" + case JobUpdateStrategyType_VARIABLE_BATCH: + return "VARIABLE_BATCH" + } + return "" +} + +func JobUpdateStrategyTypeFromString(s string) (JobUpdateStrategyType, error) { + switch s { + case "QUEUE": + return JobUpdateStrategyType_QUEUE, nil + case "BATCH": + return JobUpdateStrategyType_BATCH, nil + case "VARIABLE_BATCH": + return JobUpdateStrategyType_VARIABLE_BATCH, nil + } + return JobUpdateStrategyType(0), fmt.Errorf("not a valid JobUpdateStrategyType string") +} + +func JobUpdateStrategyTypePtr(v JobUpdateStrategyType) *JobUpdateStrategyType { return &v } + +func (p JobUpdateStrategyType) MarshalText() ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *JobUpdateStrategyType) UnmarshalText(text []byte) error { + q, err := JobUpdateStrategyTypeFromString(string(text)) + if err != nil { + return err + } + *p = q + return nil +} + // Attributes: // - User type Identity struct { @@ -9183,39 +9231,39 @@ func (p *JobUpdateKey) String() string { return fmt.Sprintf("JobUpdateKey(%+v)", *p) } -// Job update thresholds and limits. +// Job update thresholds and limits. * // // Attributes: -// - UpdateGroupSize: Max number of instances being updated at any given moment. +// - UpdateGroupSize: TODO(rdelvalle): determine if it's better to use updateGroupSizes for everything and capping +// updateGroupSizes at length=1 for BATCH and QUEUE. +// Max number of instances being updated at any given moment. // - MaxPerInstanceFailures: Max number of instance failures to tolerate before marking instance as FAILED. // - MaxFailedInstances: Max number of FAILED instances to tolerate before terminating the update. // - MinWaitInInstanceRunningMs: Min time to watch a RUNNING instance. // - RollbackOnFailure: If true, enables failed update rollback. // - UpdateOnlyTheseInstances: Instance IDs to act on. All instances will be affected if this is not set. -// - WaitForBatchCompletion: If true, use updateGroupSize as strict batching boundaries, and avoid proceeding to another +// - WaitForBatchCompletion: TODO(rdelvalle): Deprecated, please set updateStrategyType to BATCH instead +// If true, use updateGroupSize as strict batching boundaries, and avoid proceeding to another // batch until the preceding batch finishes updating. // - BlockIfNoPulsesAfterMs: If set, requires external calls to pulseJobUpdate RPC within the specified rate for the // update to make progress. If no pulses received within specified interval the update will // block. A blocked update is unable to continue but retains its current status. It may only get // unblocked by a fresh pulseJobUpdate call. -// - VariableUpdateGroupSize: * This list contains the number of instances that each batch will complete before moving on to -// * the next. This field can only be used with waitForBatchCompletion set as true. -// * -// - AutoPause: Pauses the deployment of further tasks after each batch completes -// until the user sends an resume call. -// +// - UpdateStrategyType: Explicitly state which Update strategy type to use. +// - GroupsSize: Limit for each update group during an update. +// This field should always be length of 1 for QUEUE and BATCH. type JobUpdateSettings struct { UpdateGroupSize int32 `thrift:"updateGroupSize,1" json:"updateGroupSize"` MaxPerInstanceFailures int32 `thrift:"maxPerInstanceFailures,2" json:"maxPerInstanceFailures"` MaxFailedInstances int32 `thrift:"maxFailedInstances,3" json:"maxFailedInstances"` // unused field # 4 - MinWaitInInstanceRunningMs int32 `thrift:"minWaitInInstanceRunningMs,5" json:"minWaitInInstanceRunningMs"` - RollbackOnFailure bool `thrift:"rollbackOnFailure,6" json:"rollbackOnFailure"` - UpdateOnlyTheseInstances map[*Range]bool `thrift:"updateOnlyTheseInstances,7" json:"updateOnlyTheseInstances"` - WaitForBatchCompletion bool `thrift:"waitForBatchCompletion,8" json:"waitForBatchCompletion"` - BlockIfNoPulsesAfterMs *int32 `thrift:"blockIfNoPulsesAfterMs,9" json:"blockIfNoPulsesAfterMs,omitempty"` - VariableUpdateGroupSize []int32 `thrift:"variableUpdateGroupSize,10" json:"variableUpdateGroupSize,omitempty"` - AutoPause bool `thrift:"autoPause,11" json:"autoPause"` + MinWaitInInstanceRunningMs int32 `thrift:"minWaitInInstanceRunningMs,5" json:"minWaitInInstanceRunningMs"` + RollbackOnFailure bool `thrift:"rollbackOnFailure,6" json:"rollbackOnFailure"` + UpdateOnlyTheseInstances map[*Range]bool `thrift:"updateOnlyTheseInstances,7" json:"updateOnlyTheseInstances"` + WaitForBatchCompletion bool `thrift:"waitForBatchCompletion,8" json:"waitForBatchCompletion"` + BlockIfNoPulsesAfterMs *int32 `thrift:"blockIfNoPulsesAfterMs,9" json:"blockIfNoPulsesAfterMs,omitempty"` + UpdateStrategyType *JobUpdateStrategyType `thrift:"updateStrategyType,10" json:"updateStrategyType,omitempty"` + GroupsSize []int32 `thrift:"groupsSize,11" json:"groupsSize,omitempty"` } func NewJobUpdateSettings() *JobUpdateSettings { @@ -9259,21 +9307,30 @@ func (p *JobUpdateSettings) GetBlockIfNoPulsesAfterMs() int32 { return *p.BlockIfNoPulsesAfterMs } -var JobUpdateSettings_VariableUpdateGroupSize_DEFAULT []int32 +var JobUpdateSettings_UpdateStrategyType_DEFAULT JobUpdateStrategyType -func (p *JobUpdateSettings) GetVariableUpdateGroupSize() []int32 { - return p.VariableUpdateGroupSize +func (p *JobUpdateSettings) GetUpdateStrategyType() JobUpdateStrategyType { + if !p.IsSetUpdateStrategyType() { + return JobUpdateSettings_UpdateStrategyType_DEFAULT + } + return *p.UpdateStrategyType } -func (p *JobUpdateSettings) GetAutoPause() bool { - return p.AutoPause +var JobUpdateSettings_GroupsSize_DEFAULT []int32 + +func (p *JobUpdateSettings) GetGroupsSize() []int32 { + return p.GroupsSize } func (p *JobUpdateSettings) IsSetBlockIfNoPulsesAfterMs() bool { return p.BlockIfNoPulsesAfterMs != nil } -func (p *JobUpdateSettings) IsSetVariableUpdateGroupSize() bool { - return p.VariableUpdateGroupSize != nil +func (p *JobUpdateSettings) IsSetUpdateStrategyType() bool { + return p.UpdateStrategyType != nil +} + +func (p *JobUpdateSettings) IsSetGroupsSize() bool { + return p.GroupsSize != nil } func (p *JobUpdateSettings) Read(iprot thrift.TProtocol) error { @@ -9429,12 +9486,22 @@ func (p *JobUpdateSettings) readField9(iprot thrift.TProtocol) error { } func (p *JobUpdateSettings) readField10(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI32(); err != nil { + return thrift.PrependError("error reading field 10: ", err) + } else { + temp := JobUpdateStrategyType(v) + p.UpdateStrategyType = &temp + } + return nil +} + +func (p *JobUpdateSettings) readField11(iprot thrift.TProtocol) error { _, size, err := iprot.ReadListBegin() if err != nil { return thrift.PrependError("error reading list begin: ", err) } tSlice := make([]int32, 0, size) - p.VariableUpdateGroupSize = tSlice + p.GroupsSize = tSlice for i := 0; i < size; i++ { var _elem27 int32 if v, err := iprot.ReadI32(); err != nil { @@ -9442,7 +9509,7 @@ func (p *JobUpdateSettings) readField10(iprot thrift.TProtocol) error { } else { _elem27 = v } - p.VariableUpdateGroupSize = append(p.VariableUpdateGroupSize, _elem27) + p.GroupsSize = append(p.GroupsSize, _elem27) } if err := iprot.ReadListEnd(); err != nil { return thrift.PrependError("error reading list end: ", err) @@ -9450,15 +9517,6 @@ func (p *JobUpdateSettings) readField10(iprot thrift.TProtocol) error { return nil } -func (p *JobUpdateSettings) readField11(iprot thrift.TProtocol) error { - if v, err := iprot.ReadBool(); err != nil { - return thrift.PrependError("error reading field 11: ", err) - } else { - p.AutoPause = v - } - return nil -} - func (p *JobUpdateSettings) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("JobUpdateSettings"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -9617,14 +9675,29 @@ func (p *JobUpdateSettings) writeField9(oprot thrift.TProtocol) (err error) { } func (p *JobUpdateSettings) writeField10(oprot thrift.TProtocol) (err error) { - if p.IsSetVariableUpdateGroupSize() { - if err := oprot.WriteFieldBegin("variableUpdateGroupSize", thrift.LIST, 10); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field begin error 10:variableUpdateGroupSize: ", p), err) + if p.IsSetUpdateStrategyType() { + if err := oprot.WriteFieldBegin("updateStrategyType", thrift.I32, 10); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 10:updateStrategyType: ", p), err) } - if err := oprot.WriteListBegin(thrift.I32, len(p.VariableUpdateGroupSize)); err != nil { + if err := oprot.WriteI32(int32(*p.UpdateStrategyType)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.updateStrategyType (10) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 10:updateStrategyType: ", p), err) + } + } + return err +} + +func (p *JobUpdateSettings) writeField11(oprot thrift.TProtocol) (err error) { + if p.IsSetGroupsSize() { + if err := oprot.WriteFieldBegin("groupsSize", thrift.LIST, 11); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 11:groupsSize: ", p), err) + } + if err := oprot.WriteListBegin(thrift.I32, len(p.GroupsSize)); err != nil { return thrift.PrependError("error writing list begin: ", err) } - for _, v := range p.VariableUpdateGroupSize { + for _, v := range p.GroupsSize { if err := oprot.WriteI32(int32(v)); err != nil { return thrift.PrependError(fmt.Sprintf("%T. (0) field write error: ", p), err) } @@ -9633,25 +9706,12 @@ func (p *JobUpdateSettings) writeField10(oprot thrift.TProtocol) (err error) { return thrift.PrependError("error writing list end: ", err) } if err := oprot.WriteFieldEnd(); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field end error 10:variableUpdateGroupSize: ", p), err) + return thrift.PrependError(fmt.Sprintf("%T write field end error 11:groupsSize: ", p), err) } } return err } -func (p *JobUpdateSettings) writeField11(oprot thrift.TProtocol) (err error) { - if err := oprot.WriteFieldBegin("autoPause", thrift.BOOL, 11); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field begin error 11:autoPause: ", p), err) - } - if err := oprot.WriteBool(bool(p.AutoPause)); err != nil { - return thrift.PrependError(fmt.Sprintf("%T.autoPause (11) field write error: ", p), err) - } - if err := oprot.WriteFieldEnd(); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field end error 11:autoPause: ", p), err) - } - return err -} - func (p *JobUpdateSettings) String() string { if p == nil { return "" diff --git a/updatejob.go b/updatejob.go index c6f1377..20da84c 100644 --- a/updatejob.go +++ b/updatejob.go @@ -60,7 +60,6 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req.Settings.MaxPerInstanceFailures = 0 req.Settings.MaxFailedInstances = 0 req.Settings.RollbackOnFailure = true - req.Settings.VariableUpdateGroupSize = []int32{1, 2, 3} //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior return &UpdateJob{Job: job, req: req} @@ -139,6 +138,18 @@ func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob { return u } +func (u *UpdateJob) UpdateStrategy(strategy aurora.JobUpdateStrategyType) *UpdateJob { + u.req.Settings.UpdateStrategyType = &strategy + return u +} + +func (u *UpdateJob) GroupsSize(groupSizes []int32) *UpdateJob { + u.req.Settings.GroupsSize = make([]int32, len(groupSizes)) + + copy(u.req.Settings.GroupsSize, groupSizes) + return u +} + func NewUpdateSettings() *aurora.JobUpdateSettings { us := new(aurora.JobUpdateSettings)