From 1c732146b1b466d974740580255d3bf6eba2340f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20Del=20Valle?= Date: Sat, 1 May 2021 13:08:33 -0700 Subject: [PATCH] Breaking Changes Change in StartJobUpdate API API now returns a (*StartJobUpdateResult_, error) tuple instead of (*aurora.Response, error) Change in GetJobUpdateSummaries API API now returns a ([]*aurora.JobUpdateSummary, error) tuple instead of (*aurora.Response, error) --- examples/client.go | 8 ++--- monitors.go | 5 +-- realis.go | 81 ++++++++++++++++++++++++++++++++------------ realis_e2e_test.go | 37 +++++++++----------- response/response.go | 4 +++ retry.go | 1 + util.go | 2 +- 7 files changed, 85 insertions(+), 53 deletions(-) diff --git a/examples/client.go b/examples/client.go index c57a9fe..bd3fb44 100644 --- a/examples/client.go +++ b/examples/client.go @@ -184,10 +184,9 @@ func main() { fmt.Println("Creating service") settings := realis.NewUpdateSettings() job.InstanceCount(3) - resp, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) if err != nil { log.Println("error: ", err) - log.Fatal("response: ", resp.String()) } fmt.Println(result.String()) @@ -366,13 +365,12 @@ func main() { updateJob := realis.NewDefaultUpdateJob(taskConfig) updateJob.InstanceCount(5).RAM(128) - resp, err := r.StartJobUpdate(updateJob, "") + result, err := r.StartJobUpdate(updateJob, "") if err != nil { log.Fatal(err) } - jobUpdateKey := response.JobUpdateKey(resp) - monitor.JobUpdate(*jobUpdateKey, 5, 500) + _, _ = monitor.JobUpdate(*result.GetKey(), 5, 500) case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ diff --git a/monitors.go b/monitors.go index 0d9423a..194c2fb 100644 --- a/monitors.go +++ b/monitors.go @@ -96,17 +96,14 @@ func (m *Monitor) JobUpdateQuery( timer := time.NewTimer(timeout) defer timer.Stop() - var cliErr error - var respDetail *aurora.Response for { select { case <-ticker.C: - respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery) 
+ updateSummaries, cliErr := m.Client.GetJobUpdateSummaries(&updateQuery) if cliErr != nil { return nil, cliErr } - updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries if len(updateSummaries) >= 1 { return updateSummaries, nil } diff --git a/realis.go b/realis.go index e9ea136..4c18b18 100644 --- a/realis.go +++ b/realis.go @@ -47,11 +47,11 @@ type Realis interface { CreateJob(auroraJob Job) error CreateService( auroraJob Job, - settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) + settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) - GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) + GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) ([]*aurora.JobUpdateSummary, error) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) @@ -64,7 +64,7 @@ type Realis interface { RestartJob(key *aurora.JobKey) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) - StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) + StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) @@ -575,7 +575,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu } -func (r *realisClient) 
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { +func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) ([]*aurora.JobUpdateSummary, error) { r.logger.debugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) @@ -591,7 +591,12 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } - return resp, nil + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil, errors.New("unexpected response from scheduler") + + } + + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), nil } func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { @@ -673,10 +678,15 @@ func (r *realisClient) CreateJob(auroraJob Job) error { func() (*aurora.Response, error) { return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) }, + // Verify by checking the number of tasks in an active state + // match the number of instances in the original job func() (*aurora.Response, bool) { getTaskResp, err := r.client.GetTasksStatus( context.TODO(), - &aurora.TaskQuery{JobKeys: []*aurora.JobKey{auroraJob.JobKey()}}, + &aurora.TaskQuery{ + JobKeys: []*aurora.JobKey{auroraJob.JobKey()}, + Statuses: aurora.ACTIVE_STATES, + }, ) if err != nil { @@ -688,7 +698,7 @@ func (r *realisClient) CreateJob(auroraJob Job) error { return nil, false } - return nil, true + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true }, ) @@ -701,25 +711,20 @@ func (r *realisClient) CreateJob(auroraJob Job) error { // CreateService uses the scheduler's updating mechanism to create a job. 
func (r *realisClient) CreateService( auroraJob Job, - settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { + settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) { // Create a new job update object and ship it to the StartJobUpdate api update := NewUpdateJob(auroraJob.TaskConfig(), settings) update.InstanceCount(auroraJob.GetInstanceCount()) - resp, err := r.StartJobUpdate(update, "") + jobUpdateResult, err := r.StartJobUpdate(update, "") if err != nil { if IsTimeout(err) { - return resp, nil, err + return nil, err } - - return resp, nil, errors.Wrap(err, "unable to create service") + return nil, errors.Wrap(err, "unable to create service") } - if resp.GetResult_() != nil { - return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil - } - - return resp, nil, errors.New("results object is nil") + return jobUpdateResult, nil } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { @@ -826,7 +831,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) } // StartJobUpdate updates all instances under a job configuration. 
-func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { +func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) { r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) @@ -835,18 +840,50 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au func() (*aurora.Response, error) { return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) }, - nil, + func() (*aurora.Response, bool) { + summariesResp, err := r.readonlyClient.GetJobUpdateSummaries( + context.TODO(), + &aurora.JobUpdateQuery{ + JobKey: updateJob.JobKey(), + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1, + }) + + if err != nil { + return nil, false + } + + summaries := response.JobUpdateSummaries(summariesResp) + if len(summaries) == 0 { + return nil, false + } + + return &aurora.Response{ + ResponseCode: aurora.ResponseCode_OK, + Result_: &aurora.Result_{ + StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ + UpdateSummary: summaries[0], + Key: summaries[0].Key, + }, + }, + }, true + }, ) if retryErr != nil { // A timeout took place when attempting this call, attempt to recover if IsTimeout(retryErr) { - return resp, retryErr + return nil, retryErr } - return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - return resp, nil + + if resp.GetResult_() == nil { + return nil, errors.New("no result in response") + } + + return resp.GetResult_().GetStartJobUpdateResult_(), nil } // AbortJobUpdate terminates a job update in the scheduler. 
diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 35a9051..d3c27e4 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -421,7 +421,7 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { settings.WaitForBatchCompletion = true job.InstanceCount(2) - _, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) require.NoError(t, err) updateQ := aurora.JobUpdateQuery{ @@ -497,7 +497,7 @@ func TestRealisClient_CreateService(t *testing.T) { settings.MinWaitInInstanceRunningMs = 5000 job.InstanceCount(3) - _, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) require.NoError(t, err) assert.NotNil(t, result) @@ -547,7 +547,7 @@ func TestRealisClient_CreateService(t *testing.T) { job.Name("createService_timeout") // Make sure a timedout error was returned - _, _, err = timeoutClient.CreateService(job, settings) + _, err = timeoutClient.CreateService(job, settings) require.Error(t, err) assert.True(t, realis.IsTimeout(err)) @@ -575,7 +575,7 @@ func TestRealisClient_CreateService(t *testing.T) { job.Name("createService_timeout_bad_payload") // Make sure a timedout error was returned - _, _, err = timeoutClient.CreateService(job, settings) + _, err = timeoutClient.CreateService(job, settings) require.Error(t, err) assert.True(t, realis.IsTimeout(err)) @@ -587,7 +587,7 @@ func TestRealisClient_CreateService(t *testing.T) { assert.NoError(t, err) // Payload should have been rejected, no update should exist - require.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0) + require.Len(t, summary, 0) }) } @@ -609,10 +609,9 @@ func TestRealisClient_CreateService_ExecutorDoesNotExist(t *testing.T) { settings := realis.NewUpdateSettings() job.InstanceCount(3) - resp, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) require.Error(t, err) assert.Nil(t, result) - require.Equal(t, 
aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode()) } func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { @@ -860,7 +859,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { settings := realis.NewUpdateSettings() settings.UpdateGroupSize = 2 - _, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) assert.NoError(t, err) var ok bool @@ -923,7 +922,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) { settings.UpdateGroupSize = 2 settings.MinWaitInInstanceRunningMs = 5 * 1000 - _, result, err := r.CreateService(job, settings) + result, err := r.CreateService(job, settings) require.NoError(t, err) assert.NotNil(t, result) @@ -993,17 +992,15 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { for _, strategy := range strategies { t.Run("TestRealisClient_UpdateStrategies_"+strategy.Name, func(t *testing.T) { job.Name("update_strategies_" + strategy.Name) - resp, err := r.StartJobUpdate(strategy.UpdateJob, "") + result, err := r.StartJobUpdate(strategy.UpdateJob, "") assert.NoError(t, err) - assert.NotNil(t, resp) - assert.NotNil(t, resp.GetResult_()) - assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_()) - assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey()) + assert.NotNil(t, result) + assert.NotNil(t, result.GetKey()) var ok bool var mErr error - key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey() + key := *result.GetKey() if ok, mErr = monitor.JobUpdate(key, 5, 240); !ok || mErr != nil { // Update may already be in a terminal state so don't check for error @@ -1038,14 +1035,12 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { InstanceCount(6). 
WatchTime(1000) - resp, err := r.StartJobUpdate(strategy, "") + result, err := r.StartJobUpdate(strategy, "") require.NoError(t, err) - require.NotNil(t, resp) - require.NotNil(t, resp.GetResult_()) - require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_()) - require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey()) + require.NotNil(t, result) + require.NotNil(t, result.GetKey()) - key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey() + key := *result.GetKey() for i := range updateGroups { curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240) diff --git a/response/response.go b/response/response.go index b77348d..4a67ca0 100644 --- a/response/response.go +++ b/response/response.go @@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil + } + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index c7b3ef6..4108015 100644 --- a/retry.go +++ b/retry.go @@ -182,6 +182,7 @@ func (r *realisClient) thriftCallWithRetries( // Allow caller to provide a function which checks if the original call was successful before // it timed out. if verifyOnTimeout != nil { + if verifyResp, ok := verifyOnTimeout(); ok { r.logger.debugPrint("verified that the call went through successfully") // Response here might be different than the original. diff --git a/util.go b/util.go index 19930e2..4307d1c 100644 --- a/util.go +++ b/util.go @@ -29,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool) // ActiveJobUpdateStates - States a Job Update may be in where it is considered active. 
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) -// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. +// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{