Making the API backwards compatible.

This commit is contained in:
Renán Del Valle 2021-05-07 15:12:03 -07:00
parent c0f09ffd06
commit 1f779d0b2f
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
4 changed files with 85 additions and 58 deletions

View file

@ -166,10 +166,11 @@ func main() {
switch cmd { switch cmd {
case "create": case "create":
fmt.Println("Creating job") fmt.Println("Creating job")
err := r.CreateJob(job) resp, err := r.CreateJob(job)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
fmt.Println(resp.String())
if ok, mErr := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50); !ok || mErr != nil { if ok, mErr := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50); !ok || mErr != nil {
_, err := r.KillJob(job.JobKey()) _, err := r.KillJob(job.JobKey())
@ -184,9 +185,10 @@ func main() {
fmt.Println("Creating service") fmt.Println("Creating service")
settings := realis.NewUpdateSettings() settings := realis.NewUpdateSettings()
job.InstanceCount(3) job.InstanceCount(3)
result, err := r.CreateService(job, settings) resp, result, err := r.CreateService(job, settings)
if err != nil { if err != nil {
log.Println("error: ", err) log.Println("error: ", err)
log.Fatal("response: ", resp.String())
} }
fmt.Println(result.String()) fmt.Println(result.String())
@ -203,10 +205,11 @@ func main() {
fmt.Println("Creating a docker based job") fmt.Println("Creating a docker based job")
container := realis.NewDockerContainer().Image("python:2.7").AddParameter("network", "host") container := realis.NewDockerContainer().Image("python:2.7").AddParameter("network", "host")
job.Container(container) job.Container(container)
err := r.CreateJob(job) resp, err := r.CreateJob(job)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println(resp.String())
if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil { if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil {
_, err := r.KillJob(job.JobKey()) _, err := r.KillJob(job.JobKey())
@ -219,10 +222,11 @@ func main() {
fmt.Println("Creating a docker based job") fmt.Println("Creating a docker based job")
container := realis.NewMesosContainer().DockerImage("python", "2.7") container := realis.NewMesosContainer().DockerImage("python", "2.7")
job.Container(container) job.Container(container)
err := r.CreateJob(job) resp, err := r.CreateJob(job)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println(resp.String())
if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil { if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil {
_, err := r.KillJob(job.JobKey()) _, err := r.KillJob(job.JobKey())
@ -365,12 +369,13 @@ func main() {
updateJob := realis.NewDefaultUpdateJob(taskConfig) updateJob := realis.NewDefaultUpdateJob(taskConfig)
updateJob.InstanceCount(5).RAM(128) updateJob.InstanceCount(5).RAM(128)
result, err := r.StartJobUpdate(updateJob, "") resp, err := r.StartJobUpdate(updateJob, "")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
_, _ = monitor.JobUpdate(*result.GetKey(), 5, 500) jobUpdateKey := response.JobUpdateKey(resp)
monitor.JobUpdate(*jobUpdateKey, 5, 500)
case "pauseJobUpdate": case "pauseJobUpdate":
resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{

24
helpers.go Normal file
View file

@ -0,0 +1,24 @@
package realis
import (
"context"
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
func (r *realisClient) jobExists(key aurora.JobKey) bool {
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
if err != nil {
return false
}
if resp == nil ||
resp.GetResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
resp.GetResponseCode() != aurora.ResponseCode_OK {
return false
}
return true
}

View file

@ -44,10 +44,10 @@ const version = "1.23.1"
type Realis interface { type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) error CreateJob(auroraJob Job) (*aurora.Response, error)
CreateService( CreateService(
auroraJob Job, auroraJob Job,
settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
@ -64,8 +64,7 @@ type Realis interface {
RestartJob(key *aurora.JobKey) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)
@ -668,12 +667,12 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
// Although this API is able to create service jobs, it is better to use CreateService instead // Although this API is able to create service jobs, it is better to use CreateService instead
// as that API uses the update thrift call which has a few extra features available. // as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs. // Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) error { func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
// Response is checked by the thrift retry code // Response is checked by the thrift retry code
_, retryErr := r.thriftCallWithRetries( resp, retryErr := r.thriftCallWithRetries(
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
@ -681,46 +680,38 @@ func (r *realisClient) CreateJob(auroraJob Job) error {
// On a client timeout, attempt to verify that payload made to the Scheduler by // On a client timeout, attempt to verify that payload made to the Scheduler by
// trying to get the config summary for the job key // trying to get the config summary for the job key
func() (*aurora.Response, bool) { func() (*aurora.Response, bool) {
configResp, err := r.client.GetConfigSummary(context.TODO(), auroraJob.JobKey()) if r.jobExists(*auroraJob.JobKey()) {
if err != nil { return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
return nil, false
} }
if configResp == nil || return nil, false
configResp.GetResult_() == nil ||
configResp.GetResult_().GetConfigSummaryResult_() == nil ||
configResp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
configResp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, false
}
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
}, },
) )
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
} }
return nil
return resp, nil
} }
// CreateService uses the scheduler's updating mechanism to create a job. // CreateService uses the scheduler's updating mechanism to create a job.
func (r *realisClient) CreateService( func (r *realisClient) CreateService(
auroraJob Job, auroraJob Job,
settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) { settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
// Create a new job update object and ship it to the StartJobUpdate api // Create a new job update object and ship it to the StartJobUpdate api
update := NewUpdateJob(auroraJob.TaskConfig(), settings) update := NewUpdateJob(auroraJob.TaskConfig(), settings)
update.InstanceCount(auroraJob.GetInstanceCount()) update.InstanceCount(auroraJob.GetInstanceCount())
jobUpdateResult, err := r.StartJobUpdate(update, "") resp, err := r.StartJobUpdate(update, "")
if err != nil { if err != nil {
if IsTimeout(err) { if IsTimeout(err) {
return nil, err return nil, nil, err
} }
return nil, errors.Wrap(err, "unable to create service") return resp, nil, errors.Wrap(err, "unable to create service")
} }
return jobUpdateResult, nil return resp, resp.GetResult_().StartJobUpdateResult_, nil
} }
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
@ -827,7 +818,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
} }
// StartJobUpdate updates all instances under a job configuration. // StartJobUpdate updates all instances under a job configuration.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) { func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
@ -872,14 +863,14 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
return nil, retryErr return nil, retryErr
} }
return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
} }
if resp.GetResult_() == nil { if resp.GetResult_() == nil {
return nil, errors.New("no result in response") return resp, errors.New("no result in response")
} }
return resp.GetResult_().GetStartJobUpdateResult_(), nil return resp, nil
} }
// AbortJobUpdate terminates a job update in the scheduler. // AbortJobUpdate terminates a job update in the scheduler.

View file

@ -211,7 +211,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
InstanceCount(2). InstanceCount(2).
AddPorts(1) AddPorts(1)
err := r.CreateJob(job) _, err := r.CreateJob(job)
require.NoError(t, err) require.NoError(t, err)
// Test Instances Monitor // Test Instances Monitor
@ -296,7 +296,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
AddLabel("hostname", "chips"). AddLabel("hostname", "chips").
AddLabel("chips", "chips") AddLabel("chips", "chips")
err := r.CreateJob(job) _, err := r.CreateJob(job)
require.NoError(t, err) require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50) success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
@ -313,7 +313,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
AddValueConstraint("zone", false, "east"). AddValueConstraint("zone", false, "east").
AddValueConstraint("zone", true, "west") AddValueConstraint("zone", true, "west")
err := r.CreateJob(job) _, err := r.CreateJob(job)
require.NoError(t, err) require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50) success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
@ -330,7 +330,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
AddValueConstraint("zone", true, "west", "east"). AddValueConstraint("zone", true, "west", "east").
AddLimitConstraint("zone", 2) AddLimitConstraint("zone", 2)
err := r.CreateJob(job) _, err := r.CreateJob(job)
require.NoError(t, err) require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50) success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
@ -356,8 +356,9 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) {
Disk(10). Disk(10).
InstanceCount(1) InstanceCount(1)
err := r.CreateJob(job) resp, err := r.CreateJob(job)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
} }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@ -379,8 +380,9 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
Disk(100). Disk(100).
InstanceCount(1) InstanceCount(1)
err := r.CreateJob(job) resp, err := r.CreateJob(job)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
taskQ := &aurora.TaskQuery{ taskQ := &aurora.TaskQuery{
Role: &role, Role: &role,
@ -421,7 +423,7 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
settings.WaitForBatchCompletion = true settings.WaitForBatchCompletion = true
job.InstanceCount(2) job.InstanceCount(2)
result, err := r.CreateService(job, settings) _, result, err := r.CreateService(job, settings)
require.NoError(t, err) require.NoError(t, err)
updateQ := aurora.JobUpdateQuery{ updateQ := aurora.JobUpdateQuery{
@ -497,7 +499,7 @@ func TestRealisClient_CreateService(t *testing.T) {
settings.MinWaitInInstanceRunningMs = 5000 settings.MinWaitInInstanceRunningMs = 5000
job.InstanceCount(3) job.InstanceCount(3)
result, err := r.CreateService(job, settings) _, result, err := r.CreateService(job, settings)
require.NoError(t, err) require.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, result)
@ -547,7 +549,7 @@ func TestRealisClient_CreateService(t *testing.T) {
job.Name("createService_timeout") job.Name("createService_timeout")
// Make sure a timedout error was returned // Make sure a timedout error was returned
_, err = timeoutClient.CreateService(job, settings) _, _, err = timeoutClient.CreateService(job, settings)
require.Error(t, err) require.Error(t, err)
assert.True(t, realis.IsTimeout(err)) assert.True(t, realis.IsTimeout(err))
@ -575,7 +577,7 @@ func TestRealisClient_CreateService(t *testing.T) {
job.Name("createService_timeout_bad_payload") job.Name("createService_timeout_bad_payload")
// Make sure a timedout error was returned // Make sure a timedout error was returned
_, err = timeoutClient.CreateService(job, settings) _, _, err = timeoutClient.CreateService(job, settings)
require.Error(t, err) require.Error(t, err)
assert.True(t, realis.IsTimeout(err)) assert.True(t, realis.IsTimeout(err))
@ -587,7 +589,7 @@ func TestRealisClient_CreateService(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// Payload should have been rejected, no update should exist // Payload should have been rejected, no update should exist
require.Len(t, summary, 0) require.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0)
}) })
} }
@ -609,9 +611,10 @@ func TestRealisClient_CreateService_ExecutorDoesNotExist(t *testing.T) {
settings := realis.NewUpdateSettings() settings := realis.NewUpdateSettings()
job.InstanceCount(3) job.InstanceCount(3)
result, err := r.CreateService(job, settings) resp, result, err := r.CreateService(job, settings)
require.Error(t, err) require.Error(t, err)
assert.Nil(t, result) assert.Nil(t, result)
require.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
} }
func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
@ -764,7 +767,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
Disk(10). Disk(10).
InstanceCount(1000) // Impossible amount to go live in any sane machine InstanceCount(1000) // Impossible amount to go live in any sane machine
err := r.CreateJob(job) _, err := r.CreateJob(job)
assert.NoError(t, err) assert.NoError(t, err)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -859,7 +862,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
settings := realis.NewUpdateSettings() settings := realis.NewUpdateSettings()
settings.UpdateGroupSize = 2 settings.UpdateGroupSize = 2
result, err := r.CreateService(job, settings) _, result, err := r.CreateService(job, settings)
assert.NoError(t, err) assert.NoError(t, err)
var ok bool var ok bool
@ -922,7 +925,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) {
settings.UpdateGroupSize = 2 settings.UpdateGroupSize = 2
settings.MinWaitInInstanceRunningMs = 5 * 1000 settings.MinWaitInInstanceRunningMs = 5 * 1000
result, err := r.CreateService(job, settings) _, result, err := r.CreateService(job, settings)
require.NoError(t, err) require.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, result)
@ -992,15 +995,17 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
for _, strategy := range strategies { for _, strategy := range strategies {
t.Run("TestRealisClient_UpdateStrategies_"+strategy.Name, func(t *testing.T) { t.Run("TestRealisClient_UpdateStrategies_"+strategy.Name, func(t *testing.T) {
job.Name("update_strategies_" + strategy.Name) job.Name("update_strategies_" + strategy.Name)
result, err := r.StartJobUpdate(strategy.UpdateJob, "") resp, err := r.StartJobUpdate(strategy.UpdateJob, "")
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, resp)
assert.NotNil(t, result.GetKey()) assert.NotNil(t, resp.GetResult_())
assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
assert.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
var ok bool var ok bool
var mErr error var mErr error
key := *result.GetKey() key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
if ok, mErr = monitor.JobUpdate(key, 5, 240); !ok || mErr != nil { if ok, mErr = monitor.JobUpdate(key, 5, 240); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error // Update may already be in a terminal state so don't check for error
@ -1035,12 +1040,14 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
InstanceCount(6). InstanceCount(6).
WatchTime(1000) WatchTime(1000)
result, err := r.StartJobUpdate(strategy, "") resp, err := r.StartJobUpdate(strategy, "")
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, result) require.NotNil(t, resp)
require.NotNil(t, result.GetKey()) require.NotNil(t, resp.GetResult_())
require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_())
require.NotNil(t, resp.GetResult_().GetStartJobUpdateResult_().GetKey())
key := *result.GetKey() key := *resp.GetResult_().GetStartJobUpdateResult_().GetKey()
for i := range updateGroups { for i := range updateGroups {
curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240) curStep, mErr := monitor.AutoPausedUpdateMonitor(key, time.Second*5, time.Second*240)