Fixing possible race condition when passing backoff around as a pointer.
This commit is contained in:
parent
c0d2969976
commit
7152f568fe
3 changed files with 13 additions and 8 deletions
12
realis.go
12
realis.go
|
@ -90,7 +90,7 @@ type RealisConfig struct {
|
||||||
timeoutms int
|
timeoutms int
|
||||||
binTransport, jsonTransport bool
|
binTransport, jsonTransport bool
|
||||||
cluster *Cluster
|
cluster *Cluster
|
||||||
backoff *Backoff
|
backoff Backoff
|
||||||
transport thrift.TTransport
|
transport thrift.TTransport
|
||||||
protoFactory thrift.TProtocolFactory
|
protoFactory thrift.TProtocolFactory
|
||||||
logger Logger
|
logger Logger
|
||||||
|
@ -141,7 +141,7 @@ func ZKUrl(url string) ClientOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Retries(backoff *Backoff) ClientOption {
|
func Retries(backoff Backoff) ClientOption {
|
||||||
return func(config *RealisConfig) {
|
return func(config *RealisConfig) {
|
||||||
config.backoff = backoff
|
config.backoff = backoff
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ func ThriftBinary() ClientOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BackOff(b *Backoff) ClientOption {
|
func BackOff(b Backoff) ClientOption {
|
||||||
return func(config *RealisConfig) {
|
return func(config *RealisConfig) {
|
||||||
config.backoff = b
|
config.backoff = b
|
||||||
}
|
}
|
||||||
|
@ -220,7 +220,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
||||||
|
|
||||||
// Default configs
|
// Default configs
|
||||||
config.timeoutms = 10000
|
config.timeoutms = 10000
|
||||||
config.backoff = &defaultBackoff
|
config.backoff = defaultBackoff
|
||||||
config.logger = NoopLogger{}
|
config.logger = NoopLogger{}
|
||||||
config.options = options
|
config.options = options
|
||||||
|
|
||||||
|
@ -570,14 +570,14 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
|
||||||
|
|
||||||
resp, err := r.StartJobUpdate(update, "")
|
resp, err := r.StartJobUpdate(update, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, nil, errors.Wrap(err, "unable to create service")
|
return nil, nil, errors.Wrap(err, "unable to create service")
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp != nil && resp.GetResult_() != nil {
|
if resp != nil && resp.GetResult_() != nil {
|
||||||
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
|
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, nil, errors.New("results object is nil")
|
return nil, nil, errors.New("results object is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||||
|
|
|
@ -61,7 +61,7 @@ func TestMain(m *testing.M) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNonExistentEndpoint(t *testing.T) {
|
func TestNonExistentEndpoint(t *testing.T) {
|
||||||
backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick
|
backoff := realis.Backoff{ // Reduce penalties for this test to make it quick
|
||||||
Steps: 5,
|
Steps: 5,
|
||||||
Duration: 1 * time.Second,
|
Duration: 1 * time.Second,
|
||||||
Factor: 1.0,
|
Factor: 1.0,
|
||||||
|
|
7
retry.go
7
retry.go
|
@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
||||||
adjusted = Jitter(duration, backoff.Jitter)
|
adjusted = Jitter(duration, backoff.Jitter)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted)
|
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep+1)
|
||||||
|
|
||||||
time.Sleep(adjusted)
|
time.Sleep(adjusted)
|
||||||
duration = time.Duration(float64(duration) * backoff.Factor)
|
duration = time.Duration(float64(duration) * backoff.Factor)
|
||||||
|
@ -146,6 +146,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
||||||
func() {
|
func() {
|
||||||
r.lock.Lock()
|
r.lock.Lock()
|
||||||
defer r.lock.Unlock()
|
defer r.lock.Unlock()
|
||||||
|
r.logger.Println("Beginning Aurora Thrift Call")
|
||||||
resp, clientErr = thriftCall()
|
resp, clientErr = thriftCall()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -169,6 +170,9 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
||||||
return nil, errors.New("Response from aurora is nil")
|
return nil, errors.New("Response from aurora is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Log response code for debugging
|
||||||
|
r.logger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String())
|
||||||
|
|
||||||
// Check Response Code from thrift and make a decision to continue retrying or not.
|
// Check Response Code from thrift and make a decision to continue retrying or not.
|
||||||
switch responseCode := resp.GetResponseCode(); responseCode {
|
switch responseCode := resp.GetResponseCode(); responseCode {
|
||||||
|
|
||||||
|
@ -191,6 +195,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
||||||
// The only case that should fall down to here is a WARNING response code.
|
// The only case that should fall down to here is a WARNING response code.
|
||||||
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
|
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
|
||||||
default:
|
default:
|
||||||
|
r.logger.Println("unhandled response code received from Aurora")
|
||||||
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
|
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue