* Overhauled error handling. Backoff now recognizes temporary errors and continues to retry if it finds one.

* Changed thrift function call wrapper to be more explicitly named and to perform more safety checks.

* Moved Jitter function from realis to retry.

* API code is now more uniform and follows a certain template.

* Lock added whenever a thrift call is made or when a modification is done to the connection. Note that calling ReestablishConn externally may result in some race conditions. We will move to make this function private in the near future.
This commit is contained in:
Renan DelValle 2018-01-17 15:40:10 -08:00
parent 145de5898e
commit c814ba26e9
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
4 changed files with 192 additions and 228 deletions

376
realis.go
View file

@ -21,12 +21,13 @@ import (
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/cookiejar"
"path/filepath"
"time"
"sync"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
@ -56,6 +57,7 @@ type Realis interface {
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
// TODO: Remove this method and make it private to avoid race conditions
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
@ -72,6 +74,7 @@ type realisClient struct {
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
logger Logger
lock sync.Mutex
}
type RealisConfig struct {
@ -285,19 +288,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{
Name: "defaultCluster",
@ -428,15 +418,51 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}
type auroraThriftCall func() (resp *aurora.Response, err error)
// Takes a Thrift API function call and returns response and error.
// If Error from the API call is retryable, the functions re-establishes the connection with Aurora by
// using the same configuration used by the original client. Locks usage of and changes to client connection in order
// to make realis sessions thread safe.
func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Response, error) {
// Only allow one go-routine make use or modify the thrift client connection
r.lock.Lock()
resp, cliErr := auroraCall()
if cliErr != nil {
// Re-establish conn returns a temporary error or nil
// as we can always retry to connect to the scheduler.
retryConnErr := r.ReestablishConn()
return resp, retryConnErr
}
r.lock.Unlock()
if resp == nil {
return resp, errors.New("Response is nil")
}
if resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
}
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return resp, errors.New(response.CombineMessage(resp))
}
return resp, nil
}
func (r *realisClient) ReestablishConn() error {
// Close existing connection
r.logger.Println("ReestablishConn begin ....")
r.logger.Println("Re-establishing Connection to Aurora")
r.Close()
// Recreate connection from scratch using original options
newRealis, err := NewRealisClient(r.config.options...)
if err != nil {
return err
// This could be a temporary network hiccup
return NewTemporaryError(err)
}
// If we are able to successfully re-connect, make receiver
@ -471,25 +497,25 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(taskQ)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
// Pass error directly to backoff which makes exceptions for temporary errors
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
// If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
}
resp, clientErr = response.ResponseCodeCheck(resp)
if clientErr != nil {
return nil, clientErr
}
// Construct instance id map to stay in line with thrift's representation of sets
tasks := response.ScheduleStatusResult(resp).GetTasks()
jobInstanceIds := make(map[int32]bool)
for _, task := range tasks {
@ -504,23 +530,22 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Kill specific instances of a job.
@ -535,22 +560,21 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
func (r *realisClient) RealisConfig() *RealisConfig {
@ -570,21 +594,21 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
if len(instanceIds) > 0 {
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(err, retryErr.Error()+"Error sending Kill command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
return nil, errors.New("No tasks in the Active state")
}
@ -598,23 +622,21 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.CreateJob(auroraJob.JobConfig())
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Create command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// This API uses an update thrift call to create the services giving a few more robust features.
@ -641,22 +663,21 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(auroraJob.JobConfig())
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
@ -665,22 +686,21 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(key)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@ -689,22 +709,21 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.StartCronJob(key)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Start Cron Job message to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@ -719,22 +738,21 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Restarts all active tasks under a job configuration.
@ -748,22 +766,22 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
var clientErr error
if len(instanceIds) > 0 {
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
}
return resp, nil
} else {
return nil, errors.New("No tasks in the Active state")
}
@ -776,22 +794,21 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(updateJob.req, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending StartJobUpdate command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
@ -802,22 +819,21 @@ func (r *realisClient) AbortJobUpdate(
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(&updateKey, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AbortJobUpdate command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
@ -828,22 +844,21 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.AddInstances(&instKey, count)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AddInstances command to Aurora Scheduler")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@ -876,24 +891,19 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(query)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
@ -905,24 +915,19 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(query)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status without configs")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
@ -947,24 +952,19 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(taskQ)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task configuration")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration")
}
tasks := response.ScheduleStatusResult(resp).GetTasks()
@ -987,22 +987,21 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(&updateQuery)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to get job update details")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@ -1011,22 +1010,21 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(&key, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to roll back job update")
if retryErr != nil {
return resp, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
@ -1049,26 +1047,25 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(drainList)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
}
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetDrainHostsResult_()
}
if retryErr != nil {
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}
@ -1089,26 +1086,25 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(hostList)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
}
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetEndMaintenanceResult_()
}
if retryErr != nil {
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}
@ -1116,7 +1112,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
var resp *aurora.Response
var result *aurora.MaintenanceStatusResult_
var returnErr, clientErr, payloadErr error
var clientErr error
if len(hosts) == 0 {
return nil, nil, errors.New("no hosts provided to get maintenance status from")
@ -1130,23 +1126,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
// Send thrift call, if we have a thrift send error, attempt to reconnect
// and continue trying to resend command
if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil {
// Experienced an connection error
err1 := r.ReestablishConn()
if err1 != nil {
r.logger.Println("error in re-establishing connection: ", err1)
}
return false, nil
}
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(hostList)
})
// If error is NOT due to connection
if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
// TODO(rdelvalle): an leader election may cause the response to have
// failed when it should have succeeded. Retry everything for now until
// we figure out a more concrete fix.
return false, nil
if clientErr != nil {
return false, clientErr
}
// Successful call
@ -1158,20 +1145,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
result = resp.GetResult_().GetMaintenanceStatusResult_()
}
// Prioritize returning a bad payload error over a client error as a bad payload error indicates
// a deeper issue
if payloadErr != nil {
returnErr = payloadErr
} else {
returnErr = clientErr
}
// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
// this will can become either a timeout error or a payload error
if retryErr != nil {
return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}

View file

@ -73,7 +73,7 @@ func TestRealisClient_ReestablishConn(t *testing.T) {
}
func TestGetCacerts(t *testing.T) {
certs, err := realis.Getcerts("./examples/certs")
certs, err := realis.GetCerts("./examples/certs")
assert.NoError(t, err)
assert.Equal(t, len(certs.Subjects()), 2)

View file

@ -17,9 +17,9 @@ package response
import (
"bytes"
"errors"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/pkg/errors"
)
// Get key from a response created by a StartJobUpdate call
@ -39,6 +39,7 @@ func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
}
// Deprecated: Replaced by checks done inside of thriftCallHelper
func ResponseCodeCheck(resp *aurora.Response) (*aurora.Response, error) {
if resp == nil {
return resp, errors.New("Response is nil")

View file

@ -20,22 +20,26 @@ import (
"errors"
"time"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"math/rand"
)
const (
ConnRefusedErr = "connection refused"
NoLeaderFoundErr = "No leader found"
)
var RetryConnErr = errors.New("error occured during with aurora retrying")
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
// ConditionFunc returns true if the condition is satisfied, or an error
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
type AuroraThriftCall func() (resp *aurora.Response, err error)
// Modified version of the Kubernetes exponential-backoff code.
// ExponentialBackoff repeats a condition check with exponential backoff.
//
@ -76,20 +80,3 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
}
return NewTimeoutError(errors.New("Timed out while retrying"))
}
// CheckAndRetryConn function takes realis client and a trhift API function to call and returns response and error
// If Error from the APi call is Retry able . THe functions re establishes the connection with aurora by getting the latest aurora master from zookeeper.
// If Error is retyable return resp and RetryConnErr error.
func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) {
resp, cliErr := auroraCall()
// TODO: Return different error type based on the error that was returned by the API call
if cliErr != nil {
r.ReestablishConn()
return resp, NewPermamentError(RetryConnErr)
}
if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
}
return resp, nil
}