Removing double closure as it's unmaintainable and can be error prone. Separated backoffs into a generic one and a Thrift-call-specific one.

Renan DelValle 2018-02-12 15:30:50 -08:00
parent 49342ec140
commit 3a8442522f
2 changed files with 148 additions and 373 deletions
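
To illustrate the split described in the commit message, here is a rough, standalone sketch of the generic half (not part of this commit; it assumes the package is imported as realis and that ConditionFunc is the usual func() (done bool, err error)). Thrift traffic instead goes through the new ThriftCallWithRetries wrapper shown in the diff below.

package main

import (
	"fmt"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Generic retry: any condition, nothing Thrift-specific.
	attempts := 0
	err := realis.ExponentialBackoff(
		realis.Backoff{Duration: time.Second, Factor: 2.0, Jitter: 0.1, Steps: 5},
		func() (bool, error) {
			attempts++
			return attempts >= 3, nil // pretend the condition passes on the third try
		})
	fmt.Println("attempts:", attempts, "err:", err)
}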

@@ -21,9 +21,18 @@ import (
	"math/rand"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
	"github.com/paypal/gorealis/response"
	"github.com/pkg/errors"
)
type Backoff struct {
	Duration time.Duration // the base duration
	Factor   float64       // Duration is multiplied by Factor each iteration
	Jitter   float64       // The amount of jitter applied each iteration
	Steps    int           // Exit with error after this many steps
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
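
As a rough, hedged illustration of how the Backoff fields and Jitter combine (a standalone sketch, not part of this commit; it assumes the package is imported as realis and that Jitter takes a base duration and a max factor, as its use in ThriftCallWithRetries below suggests):

package main

import (
	"fmt"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	b := realis.Backoff{Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1, Steps: 4}

	// Waits between attempts grow as 10s, 20s, 40s, each stretched by up to
	// Jitter*duration of extra randomness.
	wait := b.Duration
	for i := 1; i < b.Steps; i++ {
		fmt.Println(realis.Jitter(wait, b.Jitter)) // somewhere between wait and 1.1*wait
		wait = time.Duration(float64(wait) * b.Factor)
	}
}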
@@ -89,3 +98,76 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
return NewTimeoutError(errors.New("Timed out while retrying"))
}
}
type auroraThriftCall func() (resp *aurora.Response, err error)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards Thrift calls.
func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
	var resp *aurora.Response
	var clientErr error

	backoff := r.config.backoff
	duration := backoff.Duration

	for i := 0; i < backoff.Steps; i++ {

		// If this isn't our first try, backoff before the next try
		if i != 0 {
			adjusted := duration
			if backoff.Jitter > 0.0 {
				adjusted = Jitter(duration, backoff.Jitter)
			}

			time.Sleep(adjusted)
			duration = time.Duration(float64(duration) * backoff.Factor)
		}

		// Only allow one goroutine at a time to use or modify the thrift client connection
		r.lock.Lock()
		resp, clientErr = thriftCall()
		r.lock.Unlock()

		if clientErr != nil {
			r.ReestablishConn()
			// In the future, ReestablishConn should be able to check whether it is actually possible
			// to make a Thrift call to Aurora. For now, a reconnect should always lead to a retry.
			continue
		}

		if resp == nil {
			return nil, errors.New("Response from Aurora is nil")
		}

		// Check the response code from Thrift and decide whether to keep retrying or not
		switch responseCode := resp.GetResponseCode(); responseCode {

		// If the thrift call succeeded, stop retrying
		case aurora.ResponseCode_OK:
			return resp, nil

		// If the response code is transient, continue retrying
		case aurora.ResponseCode_ERROR_TRANSIENT:
			continue

		// Failure scenarios: these indicate a bad payload or config. Stop retrying.
		case aurora.ResponseCode_INVALID_REQUEST,
			aurora.ResponseCode_ERROR,
			aurora.ResponseCode_AUTH_FAILED,
			aurora.ResponseCode_JOB_UPDATING_ERROR:
			return nil, errors.New(response.CombineMessage(resp))

		// The only case that should fall through to here is WARNING, which is currently not used
		// as a response in the scheduler.
		default:
			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
		}
	}

	// Provide more information to the user wherever possible.
	if clientErr != nil {
		return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
	} else {
		return nil, NewTimeoutError(errors.New("Timed out while retrying"))
	}
}
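
For contrast with the generic helper, a hypothetical example of how a client method might now route its Thrift call through this wrapper. The GetJobSummary method and the readonlyClient field are assumptions used for illustration, not taken from this diff.

// Hypothetical client method: the closure only performs the raw Thrift call;
// locking, backoff, and retry decisions all live in ThriftCallWithRetries.
func (r *realisClient) GetJobSummary(role string) (*aurora.Response, error) {
	resp, err := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
		return r.readonlyClient.GetJobSummary(role)
	})
	if err != nil {
		return nil, errors.Wrap(err, "error getting job summaries from Aurora Scheduler")
	}
	return resp, nil
}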