From ae295b9ceab51772e5c0eccb94266a4d58a04755 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ren=C3=A1n=20Del=20Valle?=
Date: Fri, 30 Apr 2021 11:08:28 -0700
Subject: [PATCH] Rewrite retry logic

Reduced nesting levels of retry logic.
---
 retry.go | 152 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 84 insertions(+), 68 deletions(-)

diff --git a/retry.go b/retry.go
index 4a2ffbc..1ecf467 100644
--- a/retry.go
+++ b/retry.go
@@ -113,6 +113,8 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
 }
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
+
+// Verification functions should strive to keep the number of thrift calls to one wherever possible.
 type verifyOnTimeout func() (*aurora.Response, bool)
 
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
@@ -162,50 +164,37 @@ func (r *realisClient) thriftCallWithRetries(
 		// Check if our thrift call is returning an error. This is a retryable event as we don't know
 		// if it was caused by network issues.
 		if clientErr != nil {
-
-			// Print out the error to the user
 			r.logger.Printf("Client Error: %v", clientErr)
 
-			// Determine if error is a temporary URL error by going up the stack
-			e, ok := clientErr.(thrift.TTransportException)
-			if ok {
-				r.logger.debugPrint("Encountered a transport exception")
+			temporary, timedout := processClientError(clientErr)
+
+			if !temporary && r.RealisConfig().failOnPermanentErrors {
+				return nil, errors.Wrap(clientErr, "permanent connection error")
+			}
 
-				e, ok := e.Err().(*url.Error)
-				if ok {
+			if timedout {
+				timeouts++
+				r.logger.debugPrintf(
+					"Client closed connection %d times before server responded, "+
+						"consider increasing connection timeout",
+					timeouts)
 
-					// EOF error occurs when the server closes the read buffer of the client. This is common
-					// when the server is overloaded and should be retried. All other errors that are permanent
-					// will not be retried.
-					if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
-						return nil, errors.Wrap(clientErr, "permanent connection error")
+				// Allow caller to provide a function which checks if the original call was successful before
+				// it timed out.
+				if verifyOnTimeout != nil {
+					if verifyResp, ok := verifyOnTimeout(); ok {
+						r.logger.debugPrint("verified that the call went through successfully")
+						// Response here might be different than the original.
+						return verifyResp, nil
 					}
+				}
 
-					// Corner case where thrift payload was received by Aurora but connection timed out before Aurora was
-					// able to reply. In this case we will return whatever response was received and a TimedOut behaving
-					// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
-					if e.Timeout() {
-						timeouts++
-						r.logger.debugPrintf(
-							"Client closed connection (timedout) %d times before server responded, "+
-								"consider increasing connection timeout",
-							timeouts)
-						if returnOnTimeout {
-							return resp, newTimedoutError(errors.New("client connection closed before server answer"))
-						}
-
-						// Allow caller to provide a function which checks if the original call was successful before
-						// it timed out.
-						if verifyOnTimeout != nil {
-							verifyResp, ok := verifyOnTimeout()
-							if ok {
-								r.logger.debugPrint("verified that the call went through successfully")
-								// Response might be different than the original.
-								return verifyResp, nil
-
-							}
-						}
-					}
+				if returnOnTimeout {
+					// There exists a corner case where thrift payload was received by Aurora but
+					// connection timed out before Aurora was able to reply.
+					// Users can take special action on a timeout by using IsTimedout and reacting accordingly
+					// if they have configured the client to return on a timeout.
+					return resp, newTimedoutError(errors.New("client connection closed before server answer"))
 				}
 			}
 
@@ -216,48 +205,48 @@ func (r *realisClient) thriftCallWithRetries(
 			if reestablishErr != nil {
 				r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
 			}
-		} else {
-			// If there was no client error, but the response is nil, something went wrong.
-			// Ideally, we'll never encounter this but we're placing a safeguard here.
-			if resp == nil {
-				return nil, errors.New("response from aurora is nil")
-			}
+			// Retry the thrift payload
+			continue
+		}
 
-			// Check Response Code from thrift and make a decision to continue retrying or not.
-			switch responseCode := resp.GetResponseCode(); responseCode {
+		// If there was no client error, but the response is nil, something went wrong.
+		// Ideally, we'll never encounter this but we're placing a safeguard here.
+		if resp == nil {
+			return nil, errors.New("response from aurora is nil")
+		}
 
-			// If the thrift call succeeded, stop retrying
-			case aurora.ResponseCode_OK:
-				return resp, nil
+		// Check Response Code from thrift and make a decision to continue retrying or not.
+		switch responseCode := resp.GetResponseCode(); responseCode {
 
-			// If the response code is transient, continue retrying
-			case aurora.ResponseCode_ERROR_TRANSIENT:
-				r.logger.Println("Aurora replied with Transient error code, retrying")
-				continue
+		// If the thrift call succeeded, stop retrying
+		case aurora.ResponseCode_OK:
+			return resp, nil
 
-			// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
-			case aurora.ResponseCode_INVALID_REQUEST,
-				aurora.ResponseCode_ERROR,
-				aurora.ResponseCode_AUTH_FAILED,
-				aurora.ResponseCode_JOB_UPDATING_ERROR:
-				r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
-				return resp, errors.New(response.CombineMessage(resp))
+		// If the response code is transient, continue retrying
+		case aurora.ResponseCode_ERROR_TRANSIENT:
+			r.logger.Println("Aurora replied with Transient error code, retrying")
+			continue
 
-			// The only case that should fall down to here is a WARNING response code.
-			// It is currently not used as a response in the scheduler so it is unknown how to handle it.
-			default:
-				r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-				return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
-			}
+		// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+		case aurora.ResponseCode_INVALID_REQUEST,
+			aurora.ResponseCode_ERROR,
+			aurora.ResponseCode_AUTH_FAILED,
+			aurora.ResponseCode_JOB_UPDATING_ERROR:
+			r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
+			return resp, errors.New(response.CombineMessage(resp))
+
+		// The only case that should fall down to here is a WARNING response code.
+		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+		default:
+			r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
 		}
 	}
 
-	r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
-
 	if curStep > 1 {
-		r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+		r.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
 	}
 
 	// Provide more information to the user wherever possible.
@@ -267,3 +256,30 @@ func (r *realisClient) thriftCallWithRetries(
 
 	return nil, newRetryError(errors.New("ran out of retries"), curStep)
 }
+
+// processClientError processes the error received by the client.
+// The return values indicate whether this was determined to be a temporary error
+// and whether it was determined to be a timeout error.
+func processClientError(err error) (bool, bool) {
+
+	// Determine if error is a temporary URL error by going up the stack
+	transportException, ok := err.(thrift.TTransportException)
+	if !ok {
+		return false, false
+	}
+
+	urlError, ok := transportException.Err().(*url.Error)
+	if !ok {
+		return false, false
+	}
+
+	// EOF error occurs when the server closes the read buffer of the client. This is common
+	// when the server is overloaded and we consider it temporary.
+	// All other errors that are not temporary, as reported by the Temporary() method,
+	// are considered permanent.
+	if urlError.Err != io.EOF && !urlError.Temporary() {
+		return false, false
+	}
+
+	return true, urlError.Timeout()
+}
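
Note (not part of the patch): for anyone who wants to poke at the new classification logic in isolation, below is a minimal, self-contained sketch of the temporary/timeout decision that processClientError applies once it has unwrapped the *url.Error. The classify helper and the example URL are made up for illustration; only the io.EOF / Temporary() / Timeout() checks mirror the patch.

package main

import (
	"fmt"
	"io"
	"net/url"
)

// classify mirrors processClientError's decision, but takes a *url.Error
// directly so the sketch does not need the thrift dependency (hypothetical
// helper, illustration only).
func classify(urlErr *url.Error) (temporary, timedOut bool) {
	// io.EOF means the server closed the connection, which the patch treats
	// as a temporary, retryable condition.
	if urlErr.Err != io.EOF && !urlErr.Temporary() {
		return false, false
	}
	return true, urlErr.Timeout()
}

func main() {
	// An EOF wrapped in a url.Error is temporary but not a timeout, so the
	// retry loop would reconnect and retry without running the
	// verifyOnTimeout callback.
	eofErr := &url.Error{Op: "Post", URL: "http://aurora.example/api", Err: io.EOF}
	fmt.Println(classify(eofErr)) // true false
}

As in the rewritten block above, a non-temporary result is only fatal when failOnPermanentErrors is set; otherwise the client re-establishes the connection and retries.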