diff --git a/realis.go b/realis.go index 03554aa..bb1ea5a 100644 --- a/realis.go +++ b/realis.go @@ -34,7 +34,7 @@ import ( "github.com/pkg/errors" ) -const VERSION = "1.1.0" +const VERSION = "1.2.1" type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) @@ -280,13 +280,35 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { AddBasicAuth(config, config.username, config.password) } - return &realisClient{ + client := &realisClient{ config: config, client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - logger: config.logger}, nil + logger: config.logger} + // Verify that the connection is set up correctly and that the endpoint exists + // by making a low cost thrift call using the newly created client. If there is an error, return an empty client. + taskQ := &aurora.TaskQuery{ + Role: "test", + Environment: "test", + JobName: "test", + } + + retryErr := ExponentialBackoff(*config.backoff, func() (bool, error) { + resp, err := client.readonlyClient.GetTasksWithoutConfigs(taskQ) + if resp == nil || err != nil { + return false, NewTemporaryError(err) + } + return true, nil + }) + + // Unable to successfully make a call even after retrying, return an error + if retryErr != nil { + return nil, errors.Errorf("Unable to reach scheduler at %v", url) + } + + return client, err } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -436,7 +458,15 @@ func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Re // as we can always retry to connect to the scheduler. retryConnErr := r.ReestablishConn() - return resp, retryConnErr + // If we had a connection error, return that as the temporary error + // otherwise if we were able to recreate our connection objects without issue + // return a temporary error with the client error inside. + if retryConnErr != nil { + return nil, retryConnErr + } else { + return nil, NewTemporaryError(cliErr) + } + } if resp == nil { @@ -549,7 +579,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue return resp, nil } -func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { +func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { var resp *aurora.Response var result *aurora.GetJobsResult_ var clientErr error @@ -600,7 +630,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -611,7 +641,7 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - var clientErr, err error + var clientErr error var resp *aurora.Response retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { @@ -628,7 +658,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -654,7 +684,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") } return resp, nil } @@ -695,7 +725,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") } return resp, nil } @@ -718,15 +748,12 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") } return resp, nil } - - - func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -744,7 +771,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") } return resp, nil @@ -773,7 +800,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil } @@ -801,7 +828,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil @@ -829,7 +856,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -854,7 +881,7 @@ func (r *realisClient) AbortJobUpdate( }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -879,7 +906,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler") } return resp, nil @@ -926,7 +953,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -950,7 +977,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -987,7 +1014,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration") } tasks := response.ScheduleStatusResult(resp).GetTasks() @@ -1022,7 +1049,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details") + return nil, errors.Wrap(retryErr, "Unable to get job update details") } return resp, nil @@ -1045,7 +1072,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update") + return nil, errors.Wrap(retryErr, "Unable to roll back job update") } return resp, nil } @@ -1086,7 +1113,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1125,7 +1152,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1169,7 +1196,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil