diff --git a/go.mod b/go.mod index 506556b..9497185 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,5 @@ require ( github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.2.0 + github.com/stretchr/testify v1.7.0 ) diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..56821c1 --- /dev/null +++ b/helpers.go @@ -0,0 +1,21 @@ +package realis + +import ( + "context" + + "github.com/paypal/gorealis/gen-go/apache/aurora" +) + +func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) { + resp, err := r.client.GetConfigSummary(context.TODO(), &key) + if err != nil { + return false, err + } + + return !(resp == nil || + resp.GetResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || + resp.GetResponseCode() != aurora.ResponseCode_OK), + nil +} diff --git a/realis.go b/realis.go index 262f22d..772e9ab 100644 --- a/realis.go +++ b/realis.go @@ -65,7 +65,6 @@ type Realis interface { RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) - PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) @@ -556,7 +555,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu false, func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }) + }, + nil, + ) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -581,10 +582,16 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }) + }, + nil, + ) if retryErr != nil { - return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") + return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") + } + + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } return resp, nil @@ -598,7 +605,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobs(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") @@ -619,7 +628,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a false, func() (*aurora.Response, error) { return r.client.KillTasks(context.TODO(), key, instances, "") - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -641,7 +652,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(context.TODO(), key, nil, "") - }) + }, + nil, + ) if retryErr != nil { return
nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -657,15 +670,32 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + // Response is checked by the thrift retry code resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + // On a client timeout, attempt to verify that the payload made it to the Scheduler by + // trying to get the config summary for the job key + func() (*aurora.Response, bool) { + exists, err := r.jobExists(*auroraJob.JobKey()) + if err != nil { + r.logger.Print("verification failed ", err) + } + + if exists { + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true + } + + return nil, false + }, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") } + return resp, nil } @@ -680,17 +710,12 @@ func (r *realisClient) CreateService( resp, err := r.StartJobUpdate(update, "") if err != nil { if IsTimeout(err) { - return resp, nil, err + return nil, nil, err } - return resp, nil, errors.Wrap(err, "unable to create service") } - if resp.GetResult_() != nil { - return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil - } - - return resp, nil, errors.New("results object is nil") + return resp, resp.GetResult_().StartJobUpdateResult_, nil } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { @@ -700,7 +725,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") @@ -716,7 +743,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, false, func() (*aurora.Response, error) { return r.client.DescheduleCronJob(context.TODO(), key) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") @@ -734,7 +763,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error false, func() (*aurora.Response, error) { return r.client.StartCronJob(context.TODO(), key) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") @@ -751,7 +782,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instances) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -774,7 +807,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instanceIds) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -795,16 +830,51 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au true, func() (*aurora.Response, error) { return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) - }) + }, + func() (*aurora.Response, bool) { + summariesResp, err :=
r.readonlyClient.GetJobUpdateSummaries( + context.TODO(), + &aurora.JobUpdateQuery{ + JobKey: updateJob.JobKey(), + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1, + }) + + if err != nil { + r.logger.Print("verification failed ", err) + return nil, false + } + + summaries := response.JobUpdateSummaries(summariesResp) + if len(summaries) == 0 { + return nil, false + } + + return &aurora.Response{ + ResponseCode: aurora.ResponseCode_OK, + Result_: &aurora.Result_{ + StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ + UpdateSummary: summaries[0], + Key: summaries[0].Key, + }, + }, + }, true + }, + ) if retryErr != nil { // A timeout took place when attempting this call, attempt to recover if IsTimeout(retryErr) { - return resp, retryErr + return nil, retryErr } return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } + + if resp.GetResult_() == nil { + return resp, errors.New("no result in response") + } + return resp, nil } @@ -820,7 +890,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str false, func() (*aurora.Response, error) { return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") @@ -847,7 +919,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st false, func() (*aurora.Response, error) { return r.client.PauseJobUpdate(context.TODO(), updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -865,7 +939,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s false, func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -883,7 +959,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R false, func() (*aurora.Response, error) { return r.client.PulseJobUpdate(context.TODO(), updateKey) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") @@ -901,7 +979,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a false, func() (*aurora.Response, error) { return r.client.AddInstances(context.TODO(), &instKey, count) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -940,7 +1020,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") @@ -958,7 +1040,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend false, func() (*aurora.Response, error) { return r.client.GetPendingReason(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") @@ -983,7 +1067,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror false, func() (*aurora.Response, error) { return 
r.client.GetTasksWithoutConfigs(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -1009,7 +1095,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), taskQ) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -1037,7 +1125,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur false, func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") @@ -1054,7 +1144,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string false, func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(context.TODO(), &key, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index 184ae55..cec92af 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr false, func() (*aurora.Response, error) { return r.adminClient.DrainHosts(context.TODO(), drainList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -65,7 +67,9 @@ func (r *realisClient) SLADrainHosts( false, func() (*aurora.Response, error) { return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }) + }, + nil, + ) if retryErr != nil { return result, errors.Wrap(retryErr, "Unable to recover connection") @@ -95,7 +99,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur false, func() (*aurora.Response, error) { return r.adminClient.StartMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -125,7 +131,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror false, func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -157,7 +165,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au false, func() (*aurora.Response, error) { return r.adminClient.MaintenanceStatus(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -182,7 +192,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb false, func() (*aurora.Response, error) { return r.adminClient.SetQuota(context.TODO(), role, quota) - }) + }, + nil, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to set role quota") @@ -198,7 +210,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { false, func() (*aurora.Response, error) { return r.adminClient.GetQuota(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to get role quota") @@ -213,7 +227,9 @@ func (r *realisClient) Snapshot() error { false, func() 
(*aurora.Response, error) { return r.adminClient.Snapshot(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -229,7 +245,9 @@ func (r *realisClient) PerformBackup() error { false, func() (*aurora.Response, error) { return r.adminClient.PerformBackup(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -244,7 +262,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error { false, func() (*aurora.Response, error) { return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -265,7 +285,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := r.thriftCallWithRetries(false, func() (*aurora.Response, error) { return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") diff --git a/response/response.go b/response/response.go index b77348d..4a67ca0 100644 --- a/response/response.go +++ b/response/response.go @@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil + } + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index 0491194..eefcf2a 100644 --- a/retry.go +++ b/retry.go @@ -114,10 +114,19 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) +// verifyOnTimeout defines the type of function that will be used to verify whether a Thrift call +// made it to the Scheduler or not. In general, these types of functions will have to interact with the scheduler +// through the very same Thrift API which previously encountered a timeout from the client. +// This means that the functions themselves should be kept to a minimum number of Thrift calls. +// It should also be noted that this is a best-effort mechanism and +// is likely to fail for the same reasons that the original call failed. +type verifyOnTimeout func() (*aurora.Response, bool) + // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. func (r *realisClient) thriftCallWithRetries( returnOnTimeout bool, - thriftCall auroraThriftCall) (*aurora.Response, error) { + thriftCall auroraThriftCall, + verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -157,42 +166,22 @@ func (r *realisClient) thriftCallWithRetries( r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) }() - // Check if our thrift call is returning an error. This is a retryable event as we don't know - // if it was caused by network issues. + // Check if our thrift call is returning an error.
if clientErr != nil { - // Print out the error to the user r.logger.Printf("Client Error: %v", clientErr) - // Determine if error is a temporary URL error by going up the stack - e, ok := clientErr.(thrift.TTransportException) - if ok { - r.logger.debugPrint("Encountered a transport exception") + temporary, timedout := isConnectionError(clientErr) + if !temporary && r.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } - e, ok := e.Err().(*url.Error) - if ok { - - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and should be retried. All other errors that are permanent - // will not be retried. - if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } - - // Corner case where thrift payload was received by Aurora but connection timed out before Aurora was - // able to reply. In this case we will return whatever response was received and a TimedOut behaving - // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. - if e.Timeout() { - timeouts++ - r.logger.debugPrintf( - "Client closed connection (timedout) %d times before server responded, "+ - "consider increasing connection timeout", - timeouts) - if returnOnTimeout { - return resp, newTimedoutError(errors.New("client connection closed before server answer")) - } - } - } + // There exists a corner case where the thrift payload was received by Aurora but the + // connection timed out before Aurora was able to reply. + // Users can take special action on a timeout by using IsTimedout and reacting accordingly + // if they have configured the client to return on a timeout. + if timedout && returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) } // In the future, reestablish connection should be able to check if it is actually possible @@ -202,48 +191,71 @@ func (r *realisClient) thriftCallWithRetries( if reestablishErr != nil { r.logger.debugPrintf("error re-establishing connection ", reestablishErr) } - } else { - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") + // If users did not opt for a return on timeout in order to react to a timed-out error, + // attempt to verify that the call made it to the scheduler after the connection was re-established. + if timedout { + timeouts++ + r.logger.debugPrintf( + "Client closed connection %d times before server responded, "+ + "consider increasing connection timeout", + timeouts) + + // Allow the caller to provide a function which checks whether the original call was successful before + // the client timed out. + if verifyOnTimeout != nil { + if verifyResp, ok := verifyOnTimeout(); ok { + r.logger.Print("verified that the call went through successfully after a client timeout") + // Response here might be different from the original as it is no longer constructed + // by the scheduler but mimicked. + // This is OK since the scheduler is very unlikely to change responses at this point in its + // development cycle but we must be careful not to return an incorrectly constructed response. + return verifyResp, nil + } + } } - // Check Response Code from thrift and make a decision to continue retrying or not.
- switch responseCode := resp.GetResponseCode(); responseCode { + // Retry the thrift payload + continue + } - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") + } - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) - } + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } } - r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep) - if curStep > 1 { - r.config.logger.Printf("retried this thrift call %d time(s)", curStep) + r.config.logger.Printf("this thrift call was retried %d time(s)", curStep) } // Provide more information to the user wherever possible. @@ -253,3 +265,30 @@ func (r *realisClient) thriftCallWithRetries( return nil, newRetryError(errors.New("ran out of retries"), curStep) } + +// isConnectionError processes the error received by the client. 
+// The return values indicate whether the error was determined to be temporary +// and whether it was determined to be a timeout error. +func isConnectionError(err error) (bool, bool) { + + // Determine if error is a temporary URL error by going up the stack + transportException, ok := err.(thrift.TTransportException) + if !ok { + return false, false + } + + urlError, ok := transportException.Err().(*url.Error) + if !ok { + return false, false + } + + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and we consider it temporary. + // All other errors that url.Error does not report as temporary via its Temporary() method + // are considered permanent. + if urlError.Err != io.EOF && !urlError.Temporary() { + return false, false + } + + return true, urlError.Timeout() +} diff --git a/util.go b/util.go index 19930e2..4307d1c 100644 --- a/util.go +++ b/util.go @@ -29,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool) // ActiveJobUpdateStates - States a Job Update may be in where it is considered active. var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) -// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. +// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{
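For illustration only, a minimal sketch of how another realisClient method could supply the new verifyOnTimeout hook added by this patch. killJobWithVerification is a hypothetical name, and the verification strategy (treating a missing config summary, via jobExists, as proof that the kill went through) is an assumption rather than part of this change; it relies only on identifiers that already exist in the realis package (thriftCallWithRetries, jobExists, KillTasks, errors.Wrap).

func (r *realisClient) killJobWithVerification(key *aurora.JobKey) (*aurora.Response, error) {
	resp, retryErr := r.thriftCallWithRetries(
		false,
		func() (*aurora.Response, error) {
			// An empty instance set tells the Aurora scheduler to kill all active shards.
			return r.client.KillTasks(context.TODO(), key, nil, "")
		},
		// On a client timeout, check whether the job is already gone; if it is,
		// treat the original KillTasks call as having succeeded.
		func() (*aurora.Response, bool) {
			exists, err := r.jobExists(*key)
			if err != nil {
				r.logger.Print("verification failed ", err)
				return nil, false
			}
			if !exists {
				return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
			}
			return nil, false
		},
	)
	if retryErr != nil {
		return resp, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
	}
	return resp, nil
}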