Simplifying retry mechanism for Thrift Calls (#56)
* Deleted the permanent error type, since it no longer makes sense: returning a plain error is now treated as permanent.
* Removed the double-closure pattern, as it was unmaintainable and error prone. Backoff logic is now split into a generic exponential backoff and a Thrift-call-specific retry helper.
* The ZK leader finder now returns a temporary error instead of always reporting "no leader found" and quitting. The leader info may still be propagating, so it is worth trying again.
* Added more logging to the retry path.
* Wrapped Lock and Unlock in an anonymous function so that Unlock can be deferred and still runs in the case of a panic.
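The last bullet refers to a standard Go idiom worth spelling out: taking the lock inside an anonymous function and deferring the unlock means the mutex is released even if the wrapped call panics. A minimal, self-contained sketch; the names here are illustrative and not taken from the gorealis code:

package main

import (
	"fmt"
	"sync"
)

// callLocked takes the lock inside an anonymous function and defers the unlock,
// so the mutex is released even if the wrapped call panics. The recover here is
// only to keep the demo running; names are illustrative, not from gorealis.
func callLocked(mu *sync.Mutex, call func() error) (err error) {
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("call panicked: %v", p)
		}
	}()

	func() {
		mu.Lock()
		defer mu.Unlock()
		err = call()
	}()

	return err
}

func main() {
	var mu sync.Mutex
	fmt.Println(callLocked(&mu, func() error { panic("boom") }))

	// The deferred Unlock ran during the panic, so the mutex can be taken again.
	mu.Lock()
	fmt.Println("lock re-acquired after panic")
	mu.Unlock()
}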
This commit is contained in:
parent 64948c3712
commit a43dc81ea8

4 changed files with 166 additions and 380 deletions
@@ -16,6 +16,8 @@ package realis

 // Using a pattern described by Dave Cheney to differentiate errors
 // https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully

+// Timeout errors are returned when a function has unsuccessfully retried.
 type timeout interface {
 	Timeout() bool
 }

@@ -38,6 +40,7 @@ func NewTimeoutError(err error) *TimeoutErr {
 	return &TimeoutErr{error: err, timeout: true}
 }

+// Temporary errors indicate that the action may and should be retried.
 type temporary interface {
 	Temporary() bool
 }

@@ -60,8 +63,3 @@ func (t *TemporaryErr) Temporary() bool {
 func NewTemporaryError(err error) *TemporaryErr {
 	return &TemporaryErr{error: err, temporary: true}
 }
-
-// Nothing can be done about this error
-func NewPermamentError(err error) TemporaryErr {
-	return TemporaryErr{error: err, temporary: false}
-}
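The retry helpers treat errors implementing this temporary interface as retryable and everything else as permanent. A minimal sketch of how such a check can be written with a type assertion; the helper below is illustrative only and is not part of this commit:

// isTemporary reports whether err advertises itself as retryable through the
// Temporary() method, following the error-handling pattern linked above.
// Illustrative helper only; it is not part of this commit.
func isTemporary(err error) bool {
	type temporaryErr interface {
		Temporary() bool
	}
	if te, ok := err.(temporaryErr); ok {
		return te.Temporary()
	}
	// A plain error has no Temporary() method and is treated as permanent.
	return false
}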
realis.go (439 changes)

@@ -98,13 +98,6 @@ type RealisConfig struct {
 	options []ClientOption
 }

-type Backoff struct {
-	Duration time.Duration // the base duration
-	Factor   float64       // Duration is multipled by factor each iteration
-	Jitter   float64       // The amount of jitter applied each iteration
-	Steps    int           // Exit with error after this many steps
-}
-
 var defaultBackoff = Backoff{
 	Steps:    3,
 	Duration: 10 * time.Second,
@@ -422,49 +415,6 @@ func basicAuth(username, password string) string {
 	return base64.StdEncoding.EncodeToString([]byte(auth))
 }

-type auroraThriftCall func() (resp *aurora.Response, err error)
-
-// Takes a Thrift API function call and returns response and error.
-// If Error from the API call is retryable, the functions re-establishes the connection with Aurora by
-// using the same configuration used by the original client. Locks usage of and changes to client connection in order
-// to make realis sessions thread safe.
-func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Response, error) {
-	// Only allow one go-routine make use or modify the thrift client connection
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	resp, cliErr := auroraCall()
-
-	if cliErr != nil {
-		// Re-establish conn returns a temporary error or nil
-		// as we can always retry to connect to the scheduler.
-		retryConnErr := r.ReestablishConn()
-
-		// If we had a connection error, return that as the temporary error
-		// otherwise if we were able to recreate our connection objects without issue
-		// return a temporary error with the client error inside.
-		if retryConnErr != nil {
-			return nil, retryConnErr
-		} else {
-			return nil, NewTemporaryError(cliErr)
-		}
-	}
-
-	if resp == nil {
-		return nil, errors.New("Response is nil")
-	}
-
-	if resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
-		return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
-	}
-
-	if resp.GetResponseCode() != aurora.ResponseCode_OK {
-		return nil, errors.New(response.CombineMessage(resp))
-	}
-
-	return resp, nil
-}
-
 func (r *realisClient) ReestablishConn() error {
 	// Close existing connection
 	r.logger.Println("Re-establishing Connection to Aurora")
@@ -506,25 +456,13 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 		Statuses: states,
 	}

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.GetTasksWithoutConfigs(taskQ)
-		})
-
-		// Pass error directly to backoff which makes exceptions for temporary errors
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.GetTasksWithoutConfigs(taskQ)
 	})

 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
 	if retryErr != nil {
-		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
+		return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for active IDs")
 	}

 	// Construct instance id map to stay in line with thrift's representation of sets
@@ -538,51 +476,31 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 }

 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
+		return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler")
 	}

 	return resp, nil
 }

 func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {

-	var resp *aurora.Response
 	var result *aurora.GetJobsResult_
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.readonlyClient.GetJobs(role)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.readonlyClient.GetJobs(role)
 	})

-	if resp != nil && resp.GetResult_() != nil {
+	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetJobsResult_
 	}

 	if retryErr != nil {
-		return nil, result, errors.Wrap(clientErr, retryErr.Error()+": Error getting Jobs from Aurora Scheduler")
+		return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
 	}

 	return resp, result, nil
@@ -592,23 +510,13 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {

 	instanceIds := make(map[int32]bool)
-	var resp *aurora.Response
-	var clientErr error

 	for _, instId := range instances {
 		instanceIds[instId] = true
 	}

-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.KillTasks(key, instanceIds, "")
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.KillTasks(key, instanceIds, "")
 	})

 	if retryErr != nil {
@@ -623,20 +531,10 @@ func (r *realisClient) RealisConfig() *RealisConfig {

 // Sends a kill message to the scheduler for all active tasks under a job.
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
-	var clientErr error
-	var resp *aurora.Response
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-			return r.client.KillTasks(key, nil, "")
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
+		return r.client.KillTasks(key, nil, "")
 	})

 	if retryErr != nil {
@@ -650,19 +548,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 // as that API uses the update thrift call which has a few extra features available.
 // Use this API to create ad-hoc jobs.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.CreateJob(auroraJob.JobConfig())
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.CreateJob(auroraJob.JobConfig())
 	})

 	if retryErr != nil {
@@ -691,19 +579,9 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
 }

 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.ScheduleCronJob(auroraJob.JobConfig())
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.ScheduleCronJob(auroraJob.JobConfig())
 	})

 	if retryErr != nil {
@@ -714,19 +592,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)

 func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.DescheduleCronJob(key)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.DescheduleCronJob(key)
 	})

 	if retryErr != nil {
@@ -738,19 +605,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 }

 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.StartCronJob(key)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.StartCronJob(key)
 	})

 	if retryErr != nil {
@@ -767,19 +624,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 	for _, instId := range instances {
 		instanceIds[instId] = true
 	}
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.RestartShards(key, instanceIds)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.RestartShards(key, instanceIds)
 	})

 	if retryErr != nil {
@@ -795,19 +642,10 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 	if err1 != nil {
 		return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
 	}
-	var resp *aurora.Response
-	var clientErr error

 	if len(instanceIds) > 0 {
-		retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-			resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-				return r.client.RestartShards(key, instanceIds)
-			})
-
-			if clientErr != nil {
-				return false, clientErr
-			}
-
-			return true, nil
+		resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+			return r.client.RestartShards(key, instanceIds)
 		})

 		if retryErr != nil {
@@ -823,19 +661,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.StartJobUpdate(updateJob.req, message)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.StartJobUpdate(updateJob.req, message)
 	})

 	if retryErr != nil {
@@ -845,22 +672,10 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 }

 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
-func (r *realisClient) AbortJobUpdate(
-	updateKey aurora.JobUpdateKey,
-	message string) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.AbortJobUpdate(&updateKey, message)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.AbortJobUpdate(&updateKey, message)
 	})

 	if retryErr != nil {
@@ -871,21 +686,13 @@ func (r *realisClient) AbortJobUpdate(

 //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.PauseJobUpdate(updateKey, message)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.PauseJobUpdate(updateKey, message)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending PauseJobUpdate command to Aurora Scheduler")
+		return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
 	}

 	return resp, nil
@@ -893,19 +700,11 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st

 //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.ResumeJobUpdate(updateKey, message)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.ResumeJobUpdate(updateKey, message)
 	})

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
 	}
@@ -916,19 +715,10 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.PulseJobUpdate(updateKey)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.PulseJobUpdate(updateKey)
 	})

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
 	}
@@ -940,19 +730,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 // instance to scale up.
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.AddInstances(&instKey, count)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.AddInstances(&instKey, count)
 	})

 	if retryErr != nil {
@@ -987,19 +766,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 // Get information about task including a fully hydrated task configuration object
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.GetTasksStatus(query)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.GetTasksStatus(query)
 	})

 	if retryErr != nil {
@@ -1011,19 +779,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S

 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.GetTasksWithoutConfigs(query)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.GetTasksWithoutConfigs(query)
 	})

 	if retryErr != nil {
@@ -1048,19 +806,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 		Statuses: aurora.ACTIVE_STATES,
 	}

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.GetTasksStatus(taskQ)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.GetTasksStatus(taskQ)
 	})

 	if retryErr != nil {
@@ -1083,19 +830,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task

 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {

-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.GetJobUpdateDetails(&updateQuery)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.GetJobUpdateDetails(&updateQuery)
 	})

 	if retryErr != nil {
@@ -1106,19 +842,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 }

 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
-	var resp *aurora.Response
-	var clientErr error
-
-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.client.RollbackJobUpdate(&key, message)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.client.RollbackJobUpdate(&key, message)
 	})

 	if retryErr != nil {
@@ -1132,9 +858,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 // to return to running unless there is enough capacity in the cluster to run them.
 func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {

-	var resp *aurora.Response
 	var result *aurora.DrainHostsResult_
-	var clientErr error

 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to drain")

@@ -1146,16 +870,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		drainList.HostNames[host] = true
 	}

-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.adminClient.DrainHosts(drainList)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.adminClient.DrainHosts(drainList)
 	})

 	if resp != nil && resp.GetResult_() != nil {
@@ -1171,9 +887,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr

 func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {

-	var resp *aurora.Response
 	var result *aurora.EndMaintenanceResult_
-	var clientErr error

 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to end maintenance on")

@@ -1185,19 +899,11 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 		hostList.HostNames[host] = true
 	}

-	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.adminClient.EndMaintenance(hostList)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		return true, nil
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.adminClient.EndMaintenance(hostList)
 	})

-	if resp != nil && resp.GetResult_() != nil {
+	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetEndMaintenanceResult_()
 	}
@@ -1210,9 +916,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror

 func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {

-	var resp *aurora.Response
 	var result *aurora.MaintenanceStatusResult_
-	var clientErr error

 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to get maintenance status from")

@@ -1224,24 +928,13 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 		hostList.HostNames[host] = true
 	}

-	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-		// Make thrift call. If we encounter an error sending the call, attempt to reconnect
-		// and continue trying to resend command until we run out of retries.
-		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
-			return r.adminClient.MaintenanceStatus(hostList)
-		})
-
-		if clientErr != nil {
-			return false, clientErr
-		}
-
-		// Successful call
-		return true, nil
-
+	// Make thrift call. If we encounter an error sending the call, attempt to reconnect
+	// and continue trying to resend command until we run out of retries.
+	resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.adminClient.MaintenanceStatus(hostList)
 	})

-	if resp != nil && resp.GetResult_() != nil {
+	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetMaintenanceStatusResult_()
 	}
retry.go (94 changes)

@@ -21,9 +21,18 @@ import (

 	"math/rand"

+	"github.com/paypal/gorealis/gen-go/apache/aurora"
+	"github.com/paypal/gorealis/response"
 	"github.com/pkg/errors"
 )

+type Backoff struct {
+	Duration time.Duration // the base duration
+	Factor   float64       // Duration is multipled by factor each iteration
+	Jitter   float64       // The amount of jitter applied each iteration
+	Steps    int           // Exit with error after this many steps
+}
+
 // Jitter returns a time.Duration between duration and duration + maxFactor *
 // duration.
 //

@@ -89,3 +98,88 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 		return NewTimeoutError(errors.New("Timed out while retrying"))
 	}
 }
+
+type auroraThriftCall func() (resp *aurora.Response, err error)
+
+// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
+func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
+	var resp *aurora.Response
+	var clientErr error
+
+	backoff := r.config.backoff
+	duration := backoff.Duration
+
+	for i := 0; i < backoff.Steps; i++ {
+
+		// If this isn't our first try, backoff before the next try.
+		if i != 0 {
+			adjusted := duration
+			if backoff.Jitter > 0.0 {
+				adjusted = Jitter(duration, backoff.Jitter)
+			}
+
+			r.logger.Printf("An error occurred during thrift call, backing off for %v before retrying\n", adjusted)
+
+			time.Sleep(adjusted)
+			duration = time.Duration(float64(duration) * backoff.Factor)
+		}
+
+		// Only allow one go-routine make use or modify the thrift client connection.
+		// Placing this in an anonymous function in order to create a new, short-lived stack allowing unlock
+		// to be run in case of a panic inside of thriftCall.
+		func() {
+			r.lock.Lock()
+			defer r.lock.Unlock()
+			resp, clientErr = thriftCall()
+		}()

+		// Check if our thrift call is returning an error. This is a retriable event as we don't know
+		// if it was caused by network issues.
+		if clientErr != nil {
+			r.ReestablishConn()
+
+			// In the future, reestablish connection should be able to check if it is actually possible
+			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
+			continue
+		}
+
+		// If there was no client error, but the response is nil, something went wrong.
+		// Ideally, we'll never encounter this but we're placing a safeguard here.
+		if resp == nil {
+			return nil, errors.New("Response from aurora is nil")
+		}
+
+		// Check Response Code from thrift and make a decision to continue retrying or not.
+		switch responseCode := resp.GetResponseCode(); responseCode {
+
+		// If the thrift call succeeded, stop retrying
+		case aurora.ResponseCode_OK:
+			return resp, nil
+
+		// If the response code is transient, continue retrying
+		case aurora.ResponseCode_ERROR_TRANSIENT:
+			r.logger.Println("Aurora replied with Transient error code, retrying")
+			continue
+
+		// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+		case aurora.ResponseCode_INVALID_REQUEST:
+		case aurora.ResponseCode_ERROR:
+		case aurora.ResponseCode_AUTH_FAILED:
+		case aurora.ResponseCode_JOB_UPDATING_ERROR:
+			return nil, errors.New(response.CombineMessage(resp))
+
+		// The only case that should fall down to here is a WARNING response code.
+		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+		default:
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+		}
+
+	}
+
+	// Provide more information to the user wherever possible.
+	if clientErr != nil {
+		return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
+	} else {
+		return nil, NewTimeoutError(errors.New("Timed out while retrying"))
+	}
+}
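The Jitter helper referenced above is documented as returning a time.Duration between duration and duration + maxFactor * duration. A minimal sketch consistent with that contract; the actual implementation in retry.go is not shown in full here and may differ:

// jitterSketch returns a duration in the range [duration, duration+maxFactor*duration),
// matching the documented contract of Jitter. Sketch only; the real implementation
// in retry.go may differ. Assumes the file's existing "math/rand" and "time" imports.
func jitterSketch(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}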
zk.go (5 changes)

@@ -93,11 +93,12 @@ func LeaderFromZK(cluster Cluster) (string, error) {
 			}
 		}

-		return false, errors.New("No leader found")
+		// Leader data might not be available yet, try to fetch again.
+		return false, NewTemporaryError(errors.New("No leader found"))
 	})

 	if retryErr != nil {
-		return "", errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
+		return "", NewTimeoutError(errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps))
 	}

 	return zkurl, nil
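Because LeaderFromZK now wraps exhausted retries in NewTimeoutError, callers can distinguish that case from other failures through the Timeout() method. An illustrative caller, not part of this commit; it assumes it lives in the same package so that Cluster, LeaderFromZK, and the pkg/errors import are available:

// waitForLeader is an illustrative caller, not part of this commit. It shows how the
// timeout error returned by LeaderFromZK can be told apart from other failures.
func waitForLeader(cluster Cluster) (string, error) {
	leaderURL, err := LeaderFromZK(cluster)
	if err != nil {
		if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
			// Retries were exhausted before ZooKeeper exposed any leader data.
			return "", errors.Wrap(err, "gave up waiting for an Aurora leader")
		}
		return "", err
	}
	return leaderURL, nil
}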