From 64948c3712795a8ae32f26eea510b8c52e2bb57f Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Feb 2018 12:44:27 -0800 Subject: [PATCH] Backoff mechanism fix (#54) * Fixing logic that can lead to nil error being returned and retry stopping early. * Fixing possible code path that may lead to an incorrect nil error. --- realis.go | 61 +++++++++++++++++++++++++--------------------- realis_e2e_test.go | 26 ++++++++++++++++++++ retry.go | 15 +++++++++--- 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/realis.go b/realis.go index 5a3b202..b285d43 100644 --- a/realis.go +++ b/realis.go @@ -34,7 +34,7 @@ import ( "github.com/pkg/errors" ) -const VERSION = "1.1.0" +const VERSION = "1.2.1" type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) @@ -290,7 +290,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), logger: config.logger}, nil - } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -440,7 +439,15 @@ func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Re // as we can always retry to connect to the scheduler. retryConnErr := r.ReestablishConn() - return resp, retryConnErr + // If we had a connection error, return that as the temporary error + // otherwise if we were able to recreate our connection objects without issue + // return a temporary error with the client error inside. + if retryConnErr != nil { + return nil, retryConnErr + } else { + return nil, NewTemporaryError(cliErr) + } + } if resp == nil { @@ -553,8 +560,8 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue return resp, nil } - -func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { +func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { + var resp *aurora.Response var result *aurora.GetJobsResult_ var clientErr error @@ -605,7 +612,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -616,7 +623,7 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. 
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - var clientErr, err error + var clientErr error var resp *aurora.Response retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { @@ -633,7 +640,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -659,7 +666,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") } return resp, nil } @@ -700,7 +707,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") } return resp, nil } @@ -723,15 +730,13 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") + } return resp, nil } - - - func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -749,7 +754,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") } return resp, nil @@ -778,7 +783,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil } @@ -806,7 +811,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") } return resp, nil @@ -834,7 +839,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -859,7 +864,7 @@ func (r *realisClient) AbortJobUpdate( }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -951,7 +956,7 @@ func (r *realisClient) AddInstances(instKey 
aurora.InstanceKey, count int32) (*a }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler") } return resp, nil @@ -998,7 +1003,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -1022,7 +1027,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -1059,7 +1064,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration") + return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration") } tasks := response.ScheduleStatusResult(resp).GetTasks() @@ -1094,7 +1099,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details") + return nil, errors.Wrap(retryErr, "Unable to get job update details") } return resp, nil @@ -1117,7 +1122,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update") + return nil, errors.Wrap(retryErr, "Unable to roll back job update") } return resp, nil } @@ -1158,7 +1163,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1197,7 +1202,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil @@ -1241,7 +1246,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au } if retryErr != nil { - return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } return resp, result, nil diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 03720a2..0467a8b 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -59,6 +59,32 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestBadEndpoint(t *testing.T) { + + // Attempt to connect to a bad endpoint + r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/scheduler/"), + realis.TimeoutMS(200), + realis.BackOff(&realis.Backoff{ // Reduce penalties for this test to make 
it quick + Steps: 5, + Duration: 1 * time.Second, + Factor: 1.0, + Jitter: 0.1}), + ) + defer r.Close() + + taskQ := &aurora.TaskQuery{ + Role: "no", + Environment: "task", + JobName: "here", + } + + _, err = r.GetTasksWithoutConfigs(taskQ) + + // Check that we do error out of retrying + assert.Error(t, err) + +} + func TestLeaderFromZK(t *testing.T) { cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.7:2181") url, err := realis.LeaderFromZK(*cluster) diff --git a/retry.go b/retry.go index dd7b113..073242b 100644 --- a/retry.go +++ b/retry.go @@ -17,10 +17,11 @@ limitations under the License. package realis import ( - "errors" "time" "math/rand" + + "github.com/pkg/errors" ) // Jitter returns a time.Duration between duration and duration + maxFactor * @@ -52,6 +53,8 @@ type ConditionFunc func() (done bool, err error) // If the condition never returns true, ErrWaitTimeout is returned. All other // errors terminate immediately. func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { + var err error + var ok bool duration := backoff.Duration for i := 0; i < backoff.Steps; i++ { if i != 0 { @@ -63,7 +66,7 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { duration = time.Duration(float64(duration) * backoff.Factor) } - ok, err := condition() + ok, err = condition() // If the function executed says it succeeded, stop retrying if ok { @@ -78,5 +81,11 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { } } - return NewTimeoutError(errors.New("Timed out while retrying")) + + // Provide more information to the user wherever possible + if err != nil { + return NewTimeoutError(errors.Wrap(err, "Timed out while retrying")) + } else { + return NewTimeoutError(errors.New("Timed out while retrying")) + } }
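
For reference, the sketch below illustrates the retry contract this patch restores. It is a minimal, self-contained approximation, not the gorealis implementation: temporaryError, isTemporary, and exponentialBackoff are hypothetical stand-ins for the library's NewTemporaryError, its temporary-error check, and ExponentialBackoff. The point it demonstrates is that a failed attempt must surface a non-nil error (a temporary one when the call may be retried); returning (false, nil) leaves the backoff loop with nothing to report and, in the pre-patch code path, could end retrying early.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// temporaryError marks failures that are safe to retry (hypothetical helper,
// standing in for the library's NewTemporaryError).
type temporaryError struct{ error }

func isTemporary(err error) bool {
	var t *temporaryError
	return errors.As(err, &t)
}

// conditionFunc mirrors the shape of realis.ConditionFunc: done, error.
type conditionFunc func() (bool, error)

// exponentialBackoff retries condition until it succeeds, a permanent error
// occurs, or the step budget is exhausted. On timeout it wraps the last
// observed error so the caller still sees the underlying cause.
func exponentialBackoff(steps int, duration time.Duration, factor, jitter float64, condition conditionFunc) error {
	var lastErr error
	for i := 0; i < steps; i++ {
		if i != 0 {
			// Sleep for the current interval plus some jitter, then grow the interval.
			sleep := time.Duration(float64(duration) * (1 + jitter*rand.Float64()))
			time.Sleep(sleep)
			duration = time.Duration(float64(duration) * factor)
		}

		ok, err := condition()
		if ok {
			return nil
		}
		if err != nil {
			// A permanent (non-temporary) error ends the retry loop immediately.
			if !isTemporary(err) {
				return err
			}
			lastErr = err
		}
	}
	if lastErr != nil {
		return fmt.Errorf("timed out while retrying: %w", lastErr)
	}
	return errors.New("timed out while retrying")
}

func main() {
	attempts := 0
	err := exponentialBackoff(5, 100*time.Millisecond, 2.0, 0.1, func() (bool, error) {
		attempts++
		// Simulate a scheduler that stays unreachable: always report a temporary
		// failure so the loop keeps retrying until the step budget runs out.
		// Returning (false, nil) here instead would hide the cause — the
		// situation the patch guards against.
		return false, &temporaryError{fmt.Errorf("connect attempt %d failed", attempts)}
	})
	fmt.Println(err) // timed out while retrying: connect attempt 5 failed
}

The sketch wraps with the standard library's %w verb; the patch itself reaches the same effect with github.com/pkg/errors.Wrap in retry.go, so a caller that exhausts the retry budget sees both the timeout and the error from the final attempt.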