diff --git a/errors.go b/errors.go
index 08600ee..be85828 100644
--- a/errors.go
+++ b/errors.go
@@ -16,6 +16,8 @@ package realis
 
 // Using a pattern described by Dave Cheney to differentiate errors
 // https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully
+
+// Timeout errors are returned when a function has exhausted all of its retries without success.
 type timeout interface {
     Timeout() bool
 }
@@ -38,6 +40,7 @@ func NewTimeoutError(err error) *TimeoutErr {
     return &TimeoutErr{error: err, timeout: true}
 }
 
+// Temporary errors indicate that the failed action can and should be retried.
 type temporary interface {
     Temporary() bool
 }
@@ -60,8 +63,3 @@ func (t *TemporaryErr) Temporary() bool {
 func NewTemporaryError(err error) *TemporaryErr {
     return &TemporaryErr{error: err, temporary: true}
 }
-
-// Nothing can be done about this error
-func NewPermamentError(err error) TemporaryErr {
-    return TemporaryErr{error: err, temporary: false}
-}
diff --git a/realis.go b/realis.go
index b285d43..2ee4b0d 100644
--- a/realis.go
+++ b/realis.go
@@ -98,13 +98,6 @@ type RealisConfig struct {
     options []ClientOption
 }
 
-type Backoff struct {
-    Duration time.Duration // the base duration
-    Factor   float64       // Duration is multipled by factor each iteration
-    Jitter   float64       // The amount of jitter applied each iteration
-    Steps    int           // Exit with error after this many steps
-}
-
 var defaultBackoff = Backoff{
     Steps:    3,
     Duration: 10 * time.Second,
@@ -422,49 +415,6 @@ func basicAuth(username, password string) string {
     return base64.StdEncoding.EncodeToString([]byte(auth))
 }
 
-type auroraThriftCall func() (resp *aurora.Response, err error)
-
-// Takes a Thrift API function call and returns response and error.
-// If Error from the API call is retryable, the functions re-establishes the connection with Aurora by
-// using the same configuration used by the original client. Locks usage of and changes to client connection in order
-// to make realis sessions thread safe.
-func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Response, error) {
-    // Only allow one go-routine make use or modify the thrift client connection
-    r.lock.Lock()
-    defer r.lock.Unlock()
-    resp, cliErr := auroraCall()
-
-    if cliErr != nil {
-        // Re-establish conn returns a temporary error or nil
-        // as we can always retry to connect to the scheduler.
-        retryConnErr := r.ReestablishConn()
-
-        // If we had a connection error, return that as the temporary error
-        // otherwise if we were able to recreate our connection objects without issue
-        // return a temporary error with the client error inside.
-        if retryConnErr != nil {
-            return nil, retryConnErr
-        } else {
-            return nil, NewTemporaryError(cliErr)
-        }
-
-    }
-
-    if resp == nil {
-        return nil, errors.New("Response is nil")
-    }
-
-    if resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
-        return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
-    }
-
-    if resp.GetResponseCode() != aurora.ResponseCode_OK {
-        return nil, errors.New(response.CombineMessage(resp))
-    }
-
-    return resp, nil
-}
-
 func (r *realisClient) ReestablishConn() error {
     // Close existing connection
     r.logger.Println("Re-establishing Connection to Aurora")
@@ -506,25 +456,13 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
         Statuses: states,
     }
 
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.GetTasksWithoutConfigs(taskQ)
-        })
-
-        // Pass error directly to backoff which makes exceptions for temporary errors
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.GetTasksWithoutConfigs(taskQ)
     })
 
     // If we encountered an error we couldn't recover from by retrying, return an error to the user
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
+        return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for active IDs")
     }
 
     // Construct instance id map to stay in line with thrift's representation of sets
@@ -538,51 +476,31 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 }
 
 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler")
    }
 
     return resp, nil
 }
 
 func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
-
-    var resp *aurora.Response
+
     var result *aurora.GetJobsResult_
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.readonlyClient.GetJobs(role)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.readonlyClient.GetJobs(role)
     })
 
     if resp != nil && resp.GetResult_() != nil {
         result = resp.GetResult_().GetJobsResult_
     }
 
     if retryErr != nil {
-        return nil, result, errors.Wrap(clientErr, retryErr.Error()+": Error getting Jobs from Aurora Scheduler")
+        return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
     }
 
     return resp, result, nil
@@ -592,23 +510,13 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 
 func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
     instanceIds := make(map[int32]bool)
-    var resp *aurora.Response
-    var clientErr error
 
     for _, instId := range instances {
         instanceIds[instId] = true
     }
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.KillTasks(key, instanceIds, "")
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.KillTasks(key, instanceIds, "")
     })
 
     if retryErr != nil {
@@ -623,20 +531,10 @@ func (r *realisClient) RealisConfig() *RealisConfig {
 
 // Sends a kill message to the scheduler for all active tasks under a job.
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
-    var clientErr error
-    var resp *aurora.Response
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-            return r.client.KillTasks(key, nil, "")
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
+        return r.client.KillTasks(key, nil, "")
     })
 
     if retryErr != nil {
@@ -650,19 +548,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 // as that API uses the update thrift call which has a few extra features available.
 // Use this API to create ad-hoc jobs.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.CreateJob(auroraJob.JobConfig())
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.CreateJob(auroraJob.JobConfig())
     })
 
     if retryErr != nil {
@@ -691,19 +579,9 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
 }
 
 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.ScheduleCronJob(auroraJob.JobConfig())
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.ScheduleCronJob(auroraJob.JobConfig())
     })
 
     if retryErr != nil {
@@ -714,19 +592,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 
 func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.DescheduleCronJob(key)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.DescheduleCronJob(key)
     })
 
     if retryErr != nil {
@@ -738,19 +605,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 }
 
 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.StartCronJob(key)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.StartCronJob(key)
     })
 
     if retryErr != nil {
@@ -767,19 +624,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
     for _, instId := range instances {
         instanceIds[instId] = true
     }
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.RestartShards(key, instanceIds)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.RestartShards(key, instanceIds)
     })
 
     if retryErr != nil {
@@ -795,19 +642,10 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
     if err1 != nil {
         return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
     }
-    var resp *aurora.Response
-    var clientErr error
+
     if len(instanceIds) > 0 {
-        retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-            resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-                return r.client.RestartShards(key, instanceIds)
-            })
-
-            if clientErr != nil {
-                return false, clientErr
-            }
-
-            return true, nil
+        resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+            return r.client.RestartShards(key, instanceIds)
         })
 
         if retryErr != nil {
@@ -823,19 +661,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.StartJobUpdate(updateJob.req, message)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.StartJobUpdate(updateJob.req, message)
     })
 
     if retryErr != nil {
@@ -845,22 +672,10 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 }
 
 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
-func (r *realisClient) AbortJobUpdate(
-    updateKey aurora.JobUpdateKey,
-    message string) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
+func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.AbortJobUpdate(&updateKey, message)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.AbortJobUpdate(&updateKey, message)
     })
 
     if retryErr != nil {
@@ -871,21 +686,13 @@ func (r *realisClient) AbortJobUpdate(
 
 //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.PauseJobUpdate(updateKey, message)
-        })
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.PauseJobUpdate(updateKey, message)
     })
+
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending PauseJobUpdate command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
     }
 
     return resp, nil
@@ -893,19 +700,11 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 
 //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
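+//
+// For example (hypothetical update key and client variable; not part of this change):
+//
+//     resp, err := client.ResumeJobUpdate(&updateKey, "resuming after manual pause")
+//     if err != nil {
+//         log.Fatal(err)
+//     }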
 func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.ResumeJobUpdate(updateKey, message)
-        })
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.ResumeJobUpdate(updateKey, message)
     })
+
     if retryErr != nil {
         return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
     }
@@ -916,19 +715,10 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 
 //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.PulseJobUpdate(updateKey)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.PulseJobUpdate(updateKey)
     })
+
     if retryErr != nil {
         return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
     }
@@ -940,19 +730,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 // instance to scale up.
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.AddInstances(&instKey, count)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.AddInstances(&instKey, count)
     })
 
     if retryErr != nil {
@@ -987,19 +766,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 
 // Get information about task including a fully hydrated task configuration object
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.GetTasksStatus(query)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.GetTasksStatus(query)
     })
 
     if retryErr != nil {
@@ -1011,19 +779,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 
 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.GetTasksWithoutConfigs(query)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.GetTasksWithoutConfigs(query)
     })
 
     if retryErr != nil {
@@ -1048,19 +806,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
         Statuses: aurora.ACTIVE_STATES,
     }
 
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.GetTasksStatus(taskQ)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.GetTasksStatus(taskQ)
     })
 
     if retryErr != nil {
@@ -1083,19 +830,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 }
 
 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
-
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.GetJobUpdateDetails(&updateQuery)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.GetJobUpdateDetails(&updateQuery)
     })
 
     if retryErr != nil {
@@ -1106,19 +842,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 }
 
 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-    var resp *aurora.Response
-    var clientErr error
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.client.RollbackJobUpdate(&key, message)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.client.RollbackJobUpdate(&key, message)
     })
 
     if retryErr != nil {
@@ -1132,9 +858,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 // to return to running unless there is enough capacity in the cluster to run them.
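+//
+// A usage sketch (hostnames and client variable are placeholders):
+//
+//     resp, result, err := client.DrainHosts("agent1.example.com", "agent2.example.com")
+//     if err != nil {
+//         log.Fatal(err)
+//     }
+//     fmt.Printf("%+v\n", result)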
 func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
 
-    var resp *aurora.Response
     var result *aurora.DrainHostsResult_
-    var clientErr error
 
     if len(hosts) == 0 {
         return nil, nil, errors.New("no hosts provided to drain")
@@ -1146,16 +870,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
         drainList.HostNames[host] = true
     }
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.adminClient.DrainHosts(drainList)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.adminClient.DrainHosts(drainList)
     })
 
     if resp != nil && resp.GetResult_() != nil {
@@ -1171,9 +887,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 
 func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
 
-    var resp *aurora.Response
     var result *aurora.EndMaintenanceResult_
-    var clientErr error
 
     if len(hosts) == 0 {
         return nil, nil, errors.New("no hosts provided to end maintenance on")
@@ -1185,19 +899,11 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
         hostList.HostNames[host] = true
     }
 
-    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.adminClient.EndMaintenance(hostList)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        return true, nil
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.adminClient.EndMaintenance(hostList)
     })
 
     if resp != nil && resp.GetResult_() != nil {
         result = resp.GetResult_().GetEndMaintenanceResult_()
     }
 
@@ -1210,9 +916,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 
 func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
 
-    var resp *aurora.Response
     var result *aurora.MaintenanceStatusResult_
-    var clientErr error
 
     if len(hosts) == 0 {
         return nil, nil, errors.New("no hosts provided to get maintenance status from")
@@ -1224,24 +928,13 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
         hostList.HostNames[host] = true
     }
 
-    retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-
-        // Make thrift call. If we encounter an error sending the call, attempt to reconnect
-        // and continue trying to resend command until we run out of retries.
-        resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-            return r.adminClient.MaintenanceStatus(hostList)
-        })
-
-        if clientErr != nil {
-            return false, clientErr
-        }
-
-        // Successful call
-        return true, nil
-
+    // Make thrift call. If we encounter an error sending the call, attempt to reconnect
+    // and continue trying to resend command until we run out of retries.
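+    // Transient response codes and connection failures are handled inside ThriftCallWithRetries,
+    // so no explicit backoff loop is needed at this call site.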
+    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        return r.adminClient.MaintenanceStatus(hostList)
     })
 
     if resp != nil && resp.GetResult_() != nil {
         result = resp.GetResult_().GetMaintenanceStatusResult_()
     }
 
diff --git a/retry.go b/retry.go
index 073242b..f2299c3 100644
--- a/retry.go
+++ b/retry.go
@@ -21,9 +21,18 @@ import (
 
     "math/rand"
 
+    "github.com/paypal/gorealis/gen-go/apache/aurora"
+    "github.com/paypal/gorealis/response"
     "github.com/pkg/errors"
 )
 
+type Backoff struct {
+    Duration time.Duration // the base duration
+    Factor   float64       // Duration is multiplied by Factor each iteration
+    Jitter   float64       // The amount of jitter applied each iteration
+    Steps    int           // Exit with error after this many steps
+}
+
 // Jitter returns a time.Duration between duration and duration + maxFactor *
 // duration.
 //
@@ -89,3 +98,88 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
         return NewTimeoutError(errors.New("Timed out while retrying"))
     }
 }
+
+type auroraThriftCall func() (resp *aurora.Response, err error)
+
+// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards Thrift calls.
+func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
+    var resp *aurora.Response
+    var clientErr error
+
+    backoff := r.config.backoff
+    duration := backoff.Duration
+
+    for i := 0; i < backoff.Steps; i++ {
+
+        // If this isn't our first try, back off before the next try.
+        if i != 0 {
+            adjusted := duration
+            if backoff.Jitter > 0.0 {
+                adjusted = Jitter(duration, backoff.Jitter)
+            }
+
+            r.logger.Printf("An error occurred during thrift call, backing off for %v before retrying\n", adjusted)
+
+            time.Sleep(adjusted)
+            duration = time.Duration(float64(duration) * backoff.Factor)
+        }
+
+        // Only allow one goroutine at a time to use or modify the thrift client connection.
+        // The call is wrapped in an anonymous function so that the deferred Unlock
+        // runs even if thriftCall panics.
+        func() {
+            r.lock.Lock()
+            defer r.lock.Unlock()
+            resp, clientErr = thriftCall()
+        }()
+
+        // Check if our thrift call returned an error. This is a retryable event, as we don't know
+        // whether it was caused by transient network issues.
+        if clientErr != nil {
+            r.ReestablishConn()
+
+            // In the future, reestablish connection should be able to check if it is actually possible
+            // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
+            continue
+        }
+
+        // If there was no client error, but the response is nil, something went wrong.
+        // Ideally, we'll never encounter this but we're placing a safeguard here.
+        if resp == nil {
+            return nil, errors.New("Response from Aurora is nil")
+        }
+
+        // Check the response code from Aurora and decide whether to continue retrying.
+        switch responseCode := resp.GetResponseCode(); responseCode {
+
+        // If the thrift call succeeded, stop retrying
+        case aurora.ResponseCode_OK:
+            return resp, nil
+
+        // If the response code is transient, continue retrying
+        case aurora.ResponseCode_ERROR_TRANSIENT:
+            r.logger.Println("Aurora replied with Transient error code, retrying")
+            continue
+
+        // Failure scenarios: these indicate a bad payload or bad configuration. Stop retrying.
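+        // Note: Go switch cases do not fall through, so the failure codes below are
+        // grouped into a single case clause; listing them as separate empty cases
+        // would silently retry instead of stopping.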
+        case aurora.ResponseCode_INVALID_REQUEST,
+            aurora.ResponseCode_ERROR,
+            aurora.ResponseCode_AUTH_FAILED,
+            aurora.ResponseCode_JOB_UPDATING_ERROR:
+            return nil, errors.New(response.CombineMessage(resp))
+
+        // The only response code that should reach the default case is WARNING.
+        // It is currently not used as a response by the scheduler, so it is unknown how to handle it.
+        default:
+            return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+        }
+
+    }
+
+    // Provide more information to the user wherever possible.
+    if clientErr != nil {
+        return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
+    } else {
+        return nil, NewTimeoutError(errors.New("Timed out while retrying"))
+    }
+}
diff --git a/zk.go b/zk.go
index 195cb3a..f5f55ff 100644
--- a/zk.go
+++ b/zk.go
@@ -93,11 +93,12 @@ func LeaderFromZK(cluster Cluster) (string, error) {
             }
         }
 
-        return false, errors.New("No leader found")
+        // Leader data might not be available yet; retry the fetch.
+        return false, NewTemporaryError(errors.New("No leader found"))
     })
 
     if retryErr != nil {
-        return "", errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
+        return "", NewTimeoutError(errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps))
     }
 
     return zkurl, nil