From 1f779d0b2f50f5fb3be6bd6845c153723f79d6df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20Del=20Valle?= Date: Fri, 7 May 2021 15:12:03 -0700 Subject: [PATCH] Making the API backwards compatible. --- examples/client.go | 17 +++++++++------ helpers.go | 24 +++++++++++++++++++++ realis.go | 49 +++++++++++++++++------------------------- realis_e2e_test.go | 53 ++++++++++++++++++++++++++-------------------- 4 files changed, 85 insertions(+), 58 deletions(-) create mode 100644 helpers.go diff --git a/examples/client.go b/examples/client.go index bd3fb44..69e3751 100644 --- a/examples/client.go +++ b/examples/client.go @@ -166,10 +166,11 @@ func main() { switch cmd { case "create": fmt.Println("Creating job") - err := r.CreateJob(job) + resp, err := r.CreateJob(job) if err != nil { log.Fatalln(err) } + fmt.Println(resp.String()) if ok, mErr := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50); !ok || mErr != nil { _, err := r.KillJob(job.JobKey()) @@ -184,9 +185,10 @@ func main() { fmt.Println("Creating service") settings := realis.NewUpdateSettings() job.InstanceCount(3) - result, err := r.CreateService(job, settings) + resp, result, err := r.CreateService(job, settings) if err != nil { log.Println("error: ", err) + log.Fatal("response: ", resp.String()) } fmt.Println(result.String()) @@ -203,10 +205,11 @@ func main() { fmt.Println("Creating a docker based job") container := realis.NewDockerContainer().Image("python:2.7").AddParameter("network", "host") job.Container(container) - err := r.CreateJob(job) + resp, err := r.CreateJob(job) if err != nil { log.Fatal(err) } + fmt.Println(resp.String()) if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil { _, err := r.KillJob(job.JobKey()) @@ -219,10 +222,11 @@ func main() { fmt.Println("Creating a docker based job") container := realis.NewMesosContainer().DockerImage("python", "2.7") job.Container(container) - err := r.CreateJob(job) + resp, err := r.CreateJob(job) if err != nil { log.Fatal(err) } + fmt.Println(resp.String()) if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil { _, err := r.KillJob(job.JobKey()) @@ -365,12 +369,13 @@ func main() { updateJob := realis.NewDefaultUpdateJob(taskConfig) updateJob.InstanceCount(5).RAM(128) - result, err := r.StartJobUpdate(updateJob, "") + resp, err := r.StartJobUpdate(updateJob, "") if err != nil { log.Fatal(err) } - _, _ = monitor.JobUpdate(*result.GetKey(), 5, 500) + jobUpdateKey := response.JobUpdateKey(resp) + monitor.JobUpdate(*jobUpdateKey, 5, 500) case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..526e6e3 --- /dev/null +++ b/helpers.go @@ -0,0 +1,24 @@ +package realis + +import ( + "context" + + "github.com/paypal/gorealis/gen-go/apache/aurora" +) + +func (r *realisClient) jobExists(key aurora.JobKey) bool { + resp, err := r.client.GetConfigSummary(context.TODO(), &key) + if err != nil { + return false + } + + if resp == nil || + resp.GetResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || + resp.GetResponseCode() != aurora.ResponseCode_OK { + return false + } + + return true +} diff --git a/realis.go b/realis.go index 55eecf2..27eac29 100644 --- a/realis.go +++ b/realis.go @@ -44,10 +44,10 @@ const version = "1.23.1" type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) - CreateJob(auroraJob Job) error + CreateJob(auroraJob Job) (*aurora.Response, error) CreateService( auroraJob Job, - settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) + settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) @@ -64,8 +64,7 @@ type Realis interface { RestartJob(key *aurora.JobKey) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) - StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) - + StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) @@ -668,12 +667,12 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Although this API is able to create service jobs, it is better to use CreateService instead // as that API uses the update thrift call which has a few extra features available. // Use this API to create ad-hoc jobs. -func (r *realisClient) CreateJob(auroraJob Job) error { +func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) // Response is checked by the thrift retry code - _, retryErr := r.thriftCallWithRetries( + resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) @@ -681,46 +680,38 @@ func (r *realisClient) CreateJob(auroraJob Job) error { // On a client timeout, attempt to verify that payload made to the Scheduler by // trying to get the config summary for the job key func() (*aurora.Response, bool) { - configResp, err := r.client.GetConfigSummary(context.TODO(), auroraJob.JobKey()) - if err != nil { - return nil, false + if r.jobExists(*auroraJob.JobKey()) { + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true } - if configResp == nil || - configResp.GetResult_() == nil || - configResp.GetResult_().GetConfigSummaryResult_() == nil || - configResp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || - configResp.GetResponseCode() != aurora.ResponseCode_OK { - return nil, false - } - - return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true + return nil, false }, ) if retryErr != nil { - return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") + return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") } - return nil + + return resp, nil } // CreateService uses the scheduler's updating mechanism to create a job. func (r *realisClient) CreateService( auroraJob Job, - settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) { + settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { // Create a new job update object and ship it to the StartJobUpdate api update := NewUpdateJob(auroraJob.TaskConfig(), settings) update.InstanceCount(auroraJob.GetInstanceCount()) - jobUpdateResult, err := r.StartJobUpdate(update, "") + resp, err := r.StartJobUpdate(update, "") if err != nil { if IsTimeout(err) { - return nil, err + return nil, nil, err } - return nil, errors.Wrap(err, "unable to create service") + return resp, nil, errors.Wrap(err, "unable to create service") } - return jobUpdateResult, nil + return resp, resp.GetResult_().StartJobUpdateResult_, nil } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { @@ -827,7 +818,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) } // StartJobUpdate updates all instances under a job configuration. -func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) { +func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) @@ -872,14 +863,14 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au return nil, retryErr } - return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") + return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } if resp.GetResult_() == nil { - return nil, errors.New("no result in response") + return resp, errors.New("no result in response") } - return resp.GetResult_().GetStartJobUpdateResult_(), nil + return resp, nil } // AbortJobUpdate terminates a job update in the scheduler. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index d3c27e4..27e8b03 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -211,7 +211,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { InstanceCount(2). AddPorts(1) - err := r.CreateJob(job) + _, err := r.CreateJob(job) require.NoError(t, err) // Test Instances Monitor @@ -296,7 +296,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { AddLabel("hostname", "chips"). AddLabel("chips", "chips") - err := r.CreateJob(job) + _, err := r.CreateJob(job) require.NoError(t, err) success, err := monitor.Instances(job.JobKey(), 2, 1, 50) @@ -313,7 +313,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { AddValueConstraint("zone", false, "east"). AddValueConstraint("zone", true, "west") - err := r.CreateJob(job) + _, err := r.CreateJob(job) require.NoError(t, err) success, err := monitor.Instances(job.JobKey(), 2, 1, 50) @@ -330,7 +330,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { AddValueConstraint("zone", true, "west", "east"). AddLimitConstraint("zone", 2) - err := r.CreateJob(job) + _, err := r.CreateJob(job) require.NoError(t, err) success, err := monitor.Instances(job.JobKey(), 2, 1, 50) @@ -356,8 +356,9 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { Disk(10). InstanceCount(1) - err := r.CreateJob(job) + resp, err := r.CreateJob(job) assert.Error(t, err) + assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode()) } // Test configuring an executor that doesn't exist for CreateJob API @@ -379,8 +380,9 @@ func TestRealisClient_GetPendingReason(t *testing.T) { Disk(100). InstanceCount(1) - err := r.CreateJob(job) + resp, err := r.CreateJob(job) require.NoError(t, err) + assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) taskQ := &aurora.TaskQuery{ Role: &role, @@ -421,7 +423,7 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { settings.WaitForBatchCompletion = true job.InstanceCount(2) - result, err := r.CreateService(job, settings) + _, result, err := r.CreateService(job, settings) require.NoError(t, err) updateQ := aurora.JobUpdateQuery{ @@ -497,7 +499,7 @@ func TestRealisClient_CreateService(t *testing.T) { settings.MinWaitInInstanceRunningMs = 5000 job.InstanceCount(3) - result, err := r.CreateService(job, settings) + _, result, err := r.CreateService(job, settings) require.NoError(t, err) assert.NotNil(t, result) @@ -547,7 +549,7 @@ func TestRealisClient_CreateService(t *testing.T) { job.Name("createService_timeout") // Make sure a timedout error was returned - _, err = timeoutClient.CreateService(job, settings) + _, _, err = timeoutClient.CreateService(job, settings) require.Error(t, err) assert.True(t, realis.IsTimeout(err)) @@ -575,7 +577,7 @@ func TestRealisClient_CreateService(t *testing.T) { job.Name("createService_timeout_bad_payload") // Make sure a timedout error was returned - _, err = timeoutClient.CreateService(job, settings) + _, _, err = timeoutClient.CreateService(job, settings) require.Error(t, err) assert.True(t, realis.IsTimeout(err)) @@ -587,7 +589,7 @@ func TestRealisClient_CreateService(t *testing.T) { assert.NoError(t, err) // Payload should have been rejected, no update should exist - require.Len(t, summary, 0) + require.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0) }) } @@ -609,9 +611,10 @@ func TestRealisClient_CreateService_ExecutorDoesNotExist(t *testing.T) { settings := realis.NewUpdateSettings() job.InstanceCount(3) - result, err := r.CreateService(job, settings) + resp, result, err := r.CreateService(job, settings) require.Error(t, err) assert.Nil(t, result) + require.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode()) } func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { @@ -764,7 +767,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) { Disk(10). InstanceCount(1000) // Impossible amount to go live in any sane machine - err := r.CreateJob(job) + _, err := r.CreateJob(job) assert.NoError(t, err) wg := sync.WaitGroup{} @@ -859,7 +862,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { settings := realis.NewUpdateSettings() settings.UpdateGroupSize = 2 - result, err := r.CreateService(job, settings) + _, result, err := r.CreateService(job, settings) assert.NoError(t, err) var ok bool @@ -922,7 +925,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) { settings.UpdateGroupSize = 2 settings.MinWaitInInstanceRunningMs = 5 * 1000 - result, err := r.CreateService(job, settings) + _, result, err := r.CreateService(job, settings) require.NoError(t, err) assert.NotNil(t, result) @@ -992,15 +995,17 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { for _, strategy := range strategies { t.Run("TestRealisClient_UpdateStrategies_"+strategy.Name, func(t *testing.T) { job.Name("update_strategies_" + strategy.Name) - result, err := r.StartJobUpdate(strategy.UpdateJob, "") + resp, err := r.StartJobUpdate(strategy.UpdateJob, "") assert.NoError(t, err) - assert.NotNil(t, result) - assert.NotNil(t, result.GetKey()) + assert.NotNil(t, resp) + assert.NotNil(t, resp.GetResult_()) + assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_()) + assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey()) var ok bool var mErr error - key := *result.GetKey() + key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey() if ok, mErr = monitor.JobUpdate(key, 5, 240); !ok || mErr != nil { // Update may already be in a terminal state so don't check for error @@ -1035,12 +1040,14 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { InstanceCount(6). WatchTime(1000) - result, err := r.StartJobUpdate(strategy, "") + resp, err := r.StartJobUpdate(strategy, "") require.NoError(t, err) - require.NotNil(t, result) - require.NotNil(t, result.GetKey()) + require.NotNil(t, resp) + require.NotNil(t, resp.GetResult_()) + require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_()) + require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey()) - key := *result.GetKey() + key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey() for i := range updateGroups { curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)