diff --git a/realis.go b/realis.go
index 9972a2a..97e588f 100644
--- a/realis.go
+++ b/realis.go
@@ -251,6 +251,10 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
 	return trans, err
 }

+// This client implementation of the realis interface uses a retry mechanism for all Thrift calls.
+// It will retry all calls that result in a temporary failure as well as calls that fail due to an EOF
+// being returned by the HTTP client. Most permanent failures are now caught by the thriftCallWithRetries
+// function and are not retried, but there may be corner cases that are not yet handled.
 func NewRealisClient(options ...ClientOption) (Realis, error) {
 	config := &RealisConfig{}

@@ -443,7 +447,7 @@ func newTJSONConfig(url string, timeoutms int, config *RealisConfig) (*RealisCon
 	httpTrans := (trans).(*thrift.THttpClient)
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)

 	return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil
 }
@@ -460,7 +464,7 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)

 	return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil

@@ -476,6 +480,9 @@ func (r *realisClient) ReestablishConn() error {
 	r.logger.Println("Re-establishing Connection to Aurora")
 	r.Close()

+	r.lock.Lock()
+	defer r.lock.Unlock()
+
 	// Recreate connection from scratch using original options
 	newRealis, err := NewRealisClient(r.config.options...)
 	if err != nil {
@@ -498,6 +505,10 @@ func (r *realisClient) ReestablishConn() error {

 // Releases resources associated with the realis client.
 func (r *realisClient) Close() {
+
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
 	r.client.Transport.Close()
 	r.readonlyClient.Transport.Close()
 	r.adminClient.Transport.Close()
@@ -555,14 +566,14 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 		return r.readonlyClient.GetJobs(role)
 	})

-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetJobsResult_
-	}
-
 	if retryErr != nil {
 		return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
 	}

+	if resp.GetResult_() != nil {
+		result = resp.GetResult_().GetJobsResult_
+	}
+
 	return resp, result, nil
 }
@@ -635,7 +646,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
 		return resp, nil, errors.Wrap(err, "unable to create service")
 	}

-	if resp != nil && resp.GetResult_() != nil {
+	if resp.GetResult_() != nil {
 		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
 	}

@@ -879,7 +890,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons

 	var result map[*aurora.PendingReason]bool

-	if resp != nil && resp.GetResult_() != nil {
+	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
 	for reason := range result {
@@ -999,14 +1010,14 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		return r.adminClient.DrainHosts(drainList)
 	})

-	if resp != nil && resp.GetResult_() != nil {
-		result = resp.GetResult_().GetDrainHostsResult_()
-	}
-
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

+	if resp.GetResult_() != nil {
+		result = resp.GetResult_().GetDrainHostsResult_()
+	}
+
 	return resp, result, nil
 }
@@ -1030,14 +1041,14 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
 		return r.adminClient.StartMaintenance(hostList)
 	})

-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetStartMaintenanceResult_()
-	}
-
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

+	if resp.GetResult_() != nil {
+		result = resp.GetResult_().GetStartMaintenanceResult_()
+	}
+
 	return resp, result, nil
 }
@@ -1061,14 +1072,14 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 		return r.adminClient.EndMaintenance(hostList)
 	})

-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetEndMaintenanceResult_()
-	}
-
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

+	if resp.GetResult_() != nil {
+		result = resp.GetResult_().GetEndMaintenanceResult_()
+	}
+
 	return resp, result, nil
 }
diff --git a/retry.go b/retry.go
index 90cf55e..17b5c1a 100644
--- a/retry.go
+++ b/retry.go
@@ -15,9 +15,12 @@
 package realis

 import (
+	"io"
 	"math/rand"
+	"net/url"
 	"time"

+	"git.apache.org/thrift.git/lib/go/thrift"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/paypal/gorealis/response"
 	"github.com/pkg/errors"
@@ -88,7 +91,6 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 		}

 		if err != nil {
-
 			// If the error is temporary, continue retrying.
 			if !IsTemporary(err) {
 				return err
@@ -96,9 +98,7 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 				// Print out the temporary error we experienced.
 				logger.Println(err)
 			}
-
 		}
-
 	}

 	if curStep > 1 {
@@ -158,6 +158,22 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
 			// Print out the error to the user
 			r.logger.Printf("Client Error: %v\n", clientErr)

+			// Determine if the error is a temporary URL error by going up the stack
+			e, ok := clientErr.(thrift.TTransportException)
+			if ok {
+				r.logger.DebugPrint("Encountered a transport exception")
+
+				e, ok := e.Err().(*url.Error)
+				if ok {
+					// An EOF error occurs when the server closes the read buffer of the client. This is common
+					// when the server is overloaded and should be retried. All other permanent errors will not
+					// be retried.
+					if e.Err != io.EOF && !e.Temporary() {
+						return nil, errors.Wrap(clientErr, "Permanent connection error")
+					}
+				}
+			}
+
 			// In the future, reestablish connection should be able to check if it is actually possible
 			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
 			r.ReestablishConn()
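
For reference, the classification added to thriftCallWithRetries can be summarized by the standalone sketch below. It is a minimal illustration of the same decision, not code from this patch: a thrift transport exception wrapping a *url.Error is treated as permanent unless the underlying error is io.EOF or reports itself as temporary; everything else keeps going through the retry path. The helper name isRetryableTransportError, the main wrapper, and the example URL are hypothetical, chosen only for the demo.

package main

import (
	"fmt"
	"io"
	"net/url"

	"git.apache.org/thrift.git/lib/go/thrift"
)

// isRetryableTransportError mirrors the new check: only a *url.Error wrapped in a
// thrift transport exception can mark a call as permanently failed. io.EOF (the
// server closed the connection, typically under load) and temporary errors are
// still retried; anything else falls through to the existing retry behavior.
func isRetryableTransportError(clientErr error) bool {
	te, ok := clientErr.(thrift.TTransportException)
	if !ok {
		return true // not a transport exception; keep retrying as before
	}

	ue, ok := te.Err().(*url.Error)
	if !ok {
		return true // transport exception without a url.Error; keep retrying
	}

	return ue.Err == io.EOF || ue.Temporary()
}

func main() {
	// An EOF surfaced by the HTTP client is considered retryable.
	eof := thrift.NewTTransportExceptionFromError(
		&url.Error{Op: "Post", URL: "http://aurora.example.com/api", Err: io.EOF})
	fmt.Println(isRetryableTransportError(eof)) // true
}

This is the inverse of the in-patch condition (e.Err != io.EOF && !e.Temporary()), expressed as "should we retry" rather than "is this permanent".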