Rewrite retry logic
Reduced nesting levels of retry logic by extracting client-error classification into a processClientError helper and replacing the else branch with an early continue.
This commit is contained in:
parent
34a41e86a8
commit
ae295b9cea
1 changed file with 84 additions and 68 deletions
retry.go (152 changes: +84 −68)
@@ -113,6 +113,8 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
 	}
 }
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
+
+// Verification functions should strive to keep the number of thrift calls to one wherever possible.
 type verifyOnTimeout func() (*aurora.Response, bool)
 
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
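The guideline above is easier to see with a caller-side sketch. This is illustrative only, not part of the diff: it assumes thriftCallWithRetries takes (returnOnTimeout, a thrift call, a verifier), and jobExists is a hypothetical single-call existence check.

	// Sketch: pairing a thrift call with a verifyOnTimeout function.
	resp, err := r.thriftCallWithRetries(
		false, // don't return immediately on timeout; verify first
		func() (*aurora.Response, error) {
			return r.client.CreateJob(nil, job.JobConfig())
		},
		func() (*aurora.Response, bool) {
			// Exactly one verification thrift call, per the guideline above.
			exists, err := r.jobExists(job.JobKey()) // hypothetical helper
			return nil, err == nil && exists
		})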
@@ -162,50 +164,37 @@ func (r *realisClient) thriftCallWithRetries(
 		// Check if our thrift call is returning an error. This is a retryable event as we don't know
 		// if it was caused by network issues.
 		if clientErr != nil {
 
 			// Print out the error to the user
 			r.logger.Printf("Client Error: %v", clientErr)
 
-			// Determine if error is a temporary URL error by going up the stack
-			e, ok := clientErr.(thrift.TTransportException)
-			if ok {
-				r.logger.debugPrint("Encountered a transport exception")
+			temporary, timedout := processClientError(clientErr)
 
-				e, ok := e.Err().(*url.Error)
-				if ok {
-					// EOF error occurs when the server closes the read buffer of the client. This is common
-					// when the server is overloaded and should be retried. All other errors that are permanent
-					// will not be retried.
-					if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
-						return nil, errors.Wrap(clientErr, "permanent connection error")
-					}
+			if !temporary && r.RealisConfig().failOnPermanentErrors {
+				return nil, errors.Wrap(clientErr, "permanent connection error")
+			}
 
-					// Corner case where thrift payload was received by Aurora but connection timed out before Aurora was
-					// able to reply. In this case we will return whatever response was received and a TimedOut behaving
-					// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
-					if e.Timeout() {
-						timeouts++
-						r.logger.debugPrintf(
-							"Client closed connection (timedout) %d times before server responded, "+
-								"consider increasing connection timeout",
-							timeouts)
-						if returnOnTimeout {
-							return resp, newTimedoutError(errors.New("client connection closed before server answer"))
-						}
+			if timedout {
+				timeouts++
+				r.logger.debugPrintf(
+					"Client closed connection %d times before server responded, "+
+						"consider increasing connection timeout",
+					timeouts)
 
-						// Allow caller to provide a function which checks if the original call was successful before
-						// it timed out.
-						if verifyOnTimeout != nil {
-							verifyResp, ok := verifyOnTimeout()
-							if ok {
-								r.logger.debugPrint("verified that the call went through successfully")
-								// Response might be different than the original.
-								return verifyResp, nil
-							}
-						}
-					}
-				}
-			}
+				// Allow caller to provide a function which checks if the original call was successful before
+				// it timed out.
+				if verifyOnTimeout != nil {
+					if verifyResp, ok := verifyOnTimeout(); ok {
+						r.logger.debugPrint("verified that the call went through successfully")
 
+						// Response here might be different than the original.
+						return verifyResp, nil
+					}
+				}
+
+				if returnOnTimeout {
+					// There exists a corner case where thrift payload was received by Aurora but
+					// connection timed out before Aurora was able to reply.
+					// Users can take special action on a timeout by using IsTimedout and reacting accordingly
+					// if they have configured the client to return on a timeout.
+					return resp, newTimedoutError(errors.New("client connection closed before server answer"))
+				}
+			}
 
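The timeout comments above reference IsTimedout; a hedged caller-side sketch of that reaction follows (CreateJob and the logging shown are illustrative, not part of this diff):

	// Sketch: callers can detect the timedout error type produced above.
	resp, err := client.CreateJob(job)
	if err != nil && realis.IsTimedout(err) {
		// The payload may still have reached Aurora before the connection
		// timed out, so the job's state is unknown at this point.
		log.Printf("create job timed out, state unknown: %v", err)
	}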
@@ -216,48 +205,48 @@ func (r *realisClient) thriftCallWithRetries(
 			if reestablishErr != nil {
 				r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
 			}
-		} else {
 
-			// If there was no client error, but the response is nil, something went wrong.
-			// Ideally, we'll never encounter this but we're placing a safeguard here.
-			if resp == nil {
-				return nil, errors.New("response from aurora is nil")
-			}
+			// Retry the thrift payload
+			continue
+		}
 
-			// Check Response Code from thrift and make a decision to continue retrying or not.
-			switch responseCode := resp.GetResponseCode(); responseCode {
+		// If there was no client error, but the response is nil, something went wrong.
+		// Ideally, we'll never encounter this but we're placing a safeguard here.
+		if resp == nil {
+			return nil, errors.New("response from aurora is nil")
+		}
 
-			// If the thrift call succeeded, stop retrying
-			case aurora.ResponseCode_OK:
-				return resp, nil
+		// Check Response Code from thrift and make a decision to continue retrying or not.
+		switch responseCode := resp.GetResponseCode(); responseCode {
 
-			// If the response code is transient, continue retrying
-			case aurora.ResponseCode_ERROR_TRANSIENT:
-				r.logger.Println("Aurora replied with Transient error code, retrying")
-				continue
+		// If the thrift call succeeded, stop retrying
+		case aurora.ResponseCode_OK:
+			return resp, nil
 
-			// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
-			case aurora.ResponseCode_INVALID_REQUEST,
-				aurora.ResponseCode_ERROR,
-				aurora.ResponseCode_AUTH_FAILED,
-				aurora.ResponseCode_JOB_UPDATING_ERROR:
-				r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
-				return resp, errors.New(response.CombineMessage(resp))
+		// If the response code is transient, continue retrying
+		case aurora.ResponseCode_ERROR_TRANSIENT:
+			r.logger.Println("Aurora replied with Transient error code, retrying")
+			continue
 
-			// The only case that should fall down to here is a WARNING response code.
-			// It is currently not used as a response in the scheduler so it is unknown how to handle it.
-			default:
-				r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-				return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
-			}
-		}
-	}
+		// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+		case aurora.ResponseCode_INVALID_REQUEST,
+			aurora.ResponseCode_ERROR,
+			aurora.ResponseCode_AUTH_FAILED,
+			aurora.ResponseCode_JOB_UPDATING_ERROR:
+			r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
+			return resp, errors.New(response.CombineMessage(resp))
 
-	r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
+		// The only case that should fall down to here is a WARNING response code.
+		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+		default:
+			r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+		}
+	}
 
 	if curStep > 1 {
-		r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+		r.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
 	}
 
 	// Provide more information to the user wherever possible.
@@ -267,3 +256,30 @@ func (r *realisClient) thriftCallWithRetries(
 
 	return nil, newRetryError(errors.New("ran out of retries"), curStep)
 }
+
+// processClientError processes the error received by the client.
+// The return values indicate whether this was determined to be a temporary error
+// and whether it was determined to be a timeout error.
+func processClientError(err error) (bool, bool) {
+
+	// Determine if error is a temporary URL error by going up the stack
+	transportException, ok := err.(thrift.TTransportException)
+	if !ok {
+		return false, false
+	}
+
+	urlError, ok := transportException.Err().(*url.Error)
+	if !ok {
+		return false, false
+	}
+
+	// EOF error occurs when the server closes the read buffer of the client. This is common
+	// when the server is overloaded and we consider it temporary.
+	// All other errors that are not temporary, as reported by the member function Temporary(),
+	// are considered permanent.
+	if urlError.Err != io.EOF && !urlError.Temporary() {
+		return false, false
+	}
+
+	return true, urlError.Timeout()
+}
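To make the new classification rules concrete, here is a self-contained test sketch. The file name retry_test.go is hypothetical, and it assumes the Apache Thrift Go library's NewTTransportExceptionFromError constructor wraps the given error so that Err() returns it:

	// retry_test.go (hypothetical sketch, not part of this commit)
	package realis

	import (
		"io"
		"net/url"
		"testing"

		"github.com/apache/thrift/lib/go/thrift"
	)

	// An EOF wrapped in a *url.Error inside a transport exception is
	// temporary (retryable) but not a timeout; anything that is not a
	// thrift.TTransportException is permanent.
	func TestProcessClientErrorClassification(t *testing.T) {
		eofErr := thrift.NewTTransportExceptionFromError(
			&url.Error{Op: "Post", URL: "http://aurora.example.com/api", Err: io.EOF})

		temporary, timedout := processClientError(eofErr)
		if !temporary || timedout {
			t.Errorf("want (true, false), got (%v, %v)", temporary, timedout)
		}

		temporary, timedout = processClientError(io.ErrUnexpectedEOF)
		if temporary || timedout {
			t.Errorf("want (false, false), got (%v, %v)", temporary, timedout)
		}
	}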