Breaking Changes
Change in StartUpdate API API now retunrs a (*StartJobUpdateResult_, error) tuple instead of (*aurora.Response, error) Change in GetJobSummary API API now returns a []*aurora.JobUpdateSummary slice instead of (*aurora.Response, error)
This commit is contained in:
parent
d74833ac2b
commit
1c732146b1
7 changed files with 85 additions and 53 deletions
81
realis.go
81
realis.go
|
@ -47,11 +47,11 @@ type Realis interface {
|
|||
CreateJob(auroraJob Job) error
|
||||
CreateService(
|
||||
auroraJob Job,
|
||||
settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
|
||||
settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error)
|
||||
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||
GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
|
||||
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
|
||||
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) ([]*aurora.JobUpdateSummary, error)
|
||||
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
|
||||
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
|
||||
GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error)
|
||||
|
@ -64,7 +64,7 @@ type Realis interface {
|
|||
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
|
||||
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
|
||||
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
|
||||
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error)
|
||||
|
||||
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
|
||||
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
|
||||
|
@ -575,7 +575,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
|
|||
|
||||
}
|
||||
|
||||
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
|
||||
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) ([]*aurora.JobUpdateSummary, error) {
|
||||
|
||||
r.logger.debugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
|
||||
|
@ -591,7 +591,12 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
|
|||
return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
|
||||
return nil, errors.New("unexpected response from scheduler")
|
||||
|
||||
}
|
||||
|
||||
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), nil
|
||||
}
|
||||
|
||||
func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
|
||||
|
@ -673,10 +678,15 @@ func (r *realisClient) CreateJob(auroraJob Job) error {
|
|||
func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
|
||||
},
|
||||
// Verify by checking the number of tasks in an active state
|
||||
// match the number of instances in the original job
|
||||
func() (*aurora.Response, bool) {
|
||||
getTaskResp, err := r.client.GetTasksStatus(
|
||||
context.TODO(),
|
||||
&aurora.TaskQuery{JobKeys: []*aurora.JobKey{auroraJob.JobKey()}},
|
||||
&aurora.TaskQuery{
|
||||
JobKeys: []*aurora.JobKey{auroraJob.JobKey()},
|
||||
Statuses: aurora.ACTIVE_STATES,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -688,7 +698,7 @@ func (r *realisClient) CreateJob(auroraJob Job) error {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
return nil, true
|
||||
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -701,25 +711,20 @@ func (r *realisClient) CreateJob(auroraJob Job) error {
|
|||
// CreateService uses the scheduler's updating mechanism to create a job.
|
||||
func (r *realisClient) CreateService(
|
||||
auroraJob Job,
|
||||
settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
|
||||
settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) {
|
||||
// Create a new job update object and ship it to the StartJobUpdate api
|
||||
update := NewUpdateJob(auroraJob.TaskConfig(), settings)
|
||||
update.InstanceCount(auroraJob.GetInstanceCount())
|
||||
|
||||
resp, err := r.StartJobUpdate(update, "")
|
||||
jobUpdateResult, err := r.StartJobUpdate(update, "")
|
||||
if err != nil {
|
||||
if IsTimeout(err) {
|
||||
return resp, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil, errors.Wrap(err, "unable to create service")
|
||||
return nil, errors.Wrap(err, "unable to create service")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
|
||||
}
|
||||
|
||||
return resp, nil, errors.New("results object is nil")
|
||||
return jobUpdateResult, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||
|
@ -826,7 +831,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
}
|
||||
|
||||
// StartJobUpdate updates all instances under a job configuration.
|
||||
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
|
||||
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) {
|
||||
|
||||
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
|
||||
|
@ -835,18 +840,50 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
|
||||
},
|
||||
nil,
|
||||
func() (*aurora.Response, bool) {
|
||||
summariesResp, err := r.readonlyClient.GetJobUpdateSummaries(
|
||||
context.TODO(),
|
||||
&aurora.JobUpdateQuery{
|
||||
JobKey: updateJob.JobKey(),
|
||||
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
|
||||
Limit: 1,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
summaries := response.JobUpdateSummaries(summariesResp)
|
||||
if len(summaries) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return &aurora.Response{
|
||||
ResponseCode: aurora.ResponseCode_OK,
|
||||
Result_: &aurora.Result_{
|
||||
StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
|
||||
UpdateSummary: summaries[0],
|
||||
Key: summaries[0].Key,
|
||||
},
|
||||
},
|
||||
}, true
|
||||
},
|
||||
)
|
||||
|
||||
if retryErr != nil {
|
||||
// A timeout took place when attempting this call, attempt to recover
|
||||
if IsTimeout(retryErr) {
|
||||
return resp, retryErr
|
||||
return nil, retryErr
|
||||
}
|
||||
|
||||
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
if resp.GetResult_() == nil {
|
||||
return nil, errors.New("no result in response")
|
||||
}
|
||||
|
||||
return resp.GetResult_().GetStartJobUpdateResult_(), nil
|
||||
}
|
||||
|
||||
// AbortJobUpdate terminates a job update in the scheduler.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue