From 6ee8f1454e769c6f2edaab024c0cbb4bc0d712b5 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 30 Jan 2018 16:20:02 -0800 Subject: [PATCH] Added a low cost thrift call (GetTasksWithoutConfigs) to the new client function. If this call fails, along with all the retries, then the configuration given is invalid. We do this to fail earlier than the first thrift call that the user makes. --- realis.go | 83 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/realis.go b/realis.go index 03554aa..bb1ea5a 100644 --- a/realis.go +++ b/realis.go @@ -34,7 +34,7 @@ import ( "github.com/pkg/errors" ) -const VERSION = "1.1.0" +const VERSION = "1.2.1" type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) @@ -280,13 +280,35 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { AddBasicAuth(config, config.username, config.password) } - return &realisClient{ + client := &realisClient{ config: config, client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - logger: config.logger}, nil + logger: config.logger} + // Verify that the connection is set up correctly and that the endpoint exists + // by making a low cost thrift call using the newly created client. If there is an error, return an empty client. + taskQ := &aurora.TaskQuery{ + Role: "test", + Environment: "test", + JobName: "test", + } + + retryErr := ExponentialBackoff(*config.backoff, func() (bool, error) { + resp, err := client.readonlyClient.GetTasksWithoutConfigs(taskQ) + if resp == nil || err != nil { + return false, NewTemporaryError(err) + } + return true, nil + }) + + // Unable to successfully make a call even after retrying, return an error + if retryErr != nil { + return nil, errors.Errorf("Unable to reach scheduler at %v", url) + } + + return client, err } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -436,7 +458,15 @@ func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Re // as we can always retry to connect to the scheduler. retryConnErr := r.ReestablishConn() - return resp, retryConnErr + // If we had a connection error, return that as the temporary error + // otherwise if we were able to recreate our connection objects without issue + // return a temporary error with the client error inside. + if retryConnErr != nil { + return nil, retryConnErr + } else { + return nil, NewTemporaryError(cliErr) + } + } if resp == nil { @@ -549,7 +579,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue return resp, nil } -func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { +func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { var resp *aurora.Response var result *aurora.GetJobsResult_ var clientErr error @@ -600,7 +630,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -611,7 +641,7 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - var clientErr, err error + var clientErr error var resp *aurora.Response retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { @@ -628,7 +658,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -654,7 +684,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") } return resp, nil } @@ -695,7 +725,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") } return resp, nil } @@ -718,15 +748,12 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") } return resp, nil } - - - func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -744,7 +771,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") } return resp, nil @@ -773,7 +800,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil } @@ -801,7 +828,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil @@ -829,7 +856,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -854,7 +881,7 @@ func (r *realisClient) AbortJobUpdate( }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -879,7 +906,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler") } return resp, nil @@ -926,7 +953,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -950,7 +977,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -987,7 +1014,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration") } tasks := response.ScheduleStatusResult(resp).GetTasks() @@ -1022,7 +1049,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details") + return nil, errors.Wrap(retryErr, "Unable to get job update details") } return resp, nil @@ -1045,7 +1072,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update") + return nil, errors.Wrap(retryErr, "Unable to roll back job update") } return resp, nil } @@ -1086,7 +1113,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1125,7 +1152,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1169,7 +1196,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil