From 979fb1146461324d2dc25a20c0d259a124778f39 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Wed, 1 May 2019 09:17:44 -0700 Subject: [PATCH] Adding an argument to allow the retry mechanism to stop if a timeout has been encountered. Retrying through a timeout remains the default and is safe for non-mutating API calls. Only StartJobUpdate and CreateService stop at a timeout by default, since a timed-out mutating call may already have been applied by the scheduler. --- realis.go | 203 ++++++++++++++++++++++++++++++------------------ realis_admin.go | 99 +++++++++++++---------- retry.go | 18 ++++- 3 files changed, 202 insertions(+), 118 deletions(-) diff --git a/realis.go b/realis.go index 5e2894c..21ce193 100644 --- a/realis.go +++ b/realis.go @@ -405,10 +405,8 @@ func GetCerts(certPath string) (*x509.CertPool, error) { func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) { var transport http.Transport if config != nil { - tlsConfig := &tls.Config{} - if config.InsecureSkipVerify { - tlsConfig.InsecureSkipVerify = true - } + tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify} + if config.certspath != "" { rootCAs, err := GetCerts(config.certspath) if err != nil { @@ -505,9 +503,11 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, taskQ) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksWithoutConfigs(nil, taskQ) + }) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -528,9 +528,11 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") @@ -543,9 +545,11 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe var result *aurora.GetJobsResult_ - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobs(nil, role) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.readonlyClient.GetJobs(nil, role) + }) if retryErr != nil { return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") @@ -562,9 +566,11 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.KillTasks(nil, key, instances, "") - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.KillTasks(nil, key, instances, "") + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -581,10 +587,12 @@ func (r 
*realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards - return r.client.KillTasks(nil, key, nil, "") - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards + return r.client.KillTasks(nil, key, nil, "") + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -600,9 +608,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.CreateJob(nil, auroraJob.JobConfig()) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.CreateJob(nil, auroraJob.JobConfig()) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") @@ -618,6 +628,10 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe resp, err := r.StartJobUpdate(update, "") if err != nil { + if IsTimeout(err) { + return resp, nil, err + } + return resp, nil, errors.Wrap(err, "unable to create service") } @@ -631,9 +645,11 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") @@ -645,9 +661,11 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.DescheduleCronJob(nil, key) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.DescheduleCronJob(nil, key) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") @@ -661,9 +679,11 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartCronJob(nil, key) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.StartCronJob(nil, key) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") @@ -676,9 +696,11 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) 
(*aurora.Response, error) { r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instances) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RestartShards(nil, key, instances) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -697,9 +719,11 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) if len(instanceIds) > 0 { - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instanceIds) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RestartShards(nil, key, instanceIds) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -716,12 +740,19 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartJobUpdate(nil, updateJob.req, message) - }) + resp, retryErr := r.thriftCallWithRetries( + true, + func() (*aurora.Response, error) { + return r.client.StartJobUpdate(nil, updateJob.req, message) + }) if retryErr != nil { - return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") + // A timeout took place when attempting this call, attempt to recover + if IsTimeout(retryErr) { + return resp, retryErr + } else { + return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") + } } return resp, nil } @@ -733,9 +764,11 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AbortJobUpdate(nil, &updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.AbortJobUpdate(nil, &updateKey, message) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") @@ -753,9 +786,11 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PauseJobUpdate(nil, updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.PauseJobUpdate(nil, updateKey, message) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -769,9 +804,11 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ResumeJobUpdate(nil, updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + 
func() (*aurora.Response, error) { + return r.client.ResumeJobUpdate(nil, updateKey, message) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -785,9 +822,11 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PulseJobUpdate(nil, updateKey) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.PulseJobUpdate(nil, updateKey) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") @@ -802,9 +841,11 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AddInstances(nil, &instKey, count) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.AddInstances(nil, &instKey, count) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -839,9 +880,11 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksStatus(nil, query) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") @@ -855,9 +898,11 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetPendingReason(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetPendingReason(nil, query) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") @@ -877,9 +922,11 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksWithoutConfigs(nil, query) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -901,9 +948,11 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, taskQ) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksStatus(nil, taskQ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error 
querying Aurora Scheduler for task configuration") @@ -927,9 +976,11 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetJobUpdateDetails(nil, &updateQuery) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetJobUpdateDetails(nil, &updateQuery) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "Unable to get job update details") @@ -942,9 +993,11 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RollbackJobUpdate(nil, &key, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RollbackJobUpdate(nil, &key, message) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index 464f7a6..8ce3ea3 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -24,9 +24,11 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.DrainHosts(nil, drainList) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.DrainHosts(nil, drainList) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -54,9 +56,11 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) + }) if retryErr != nil { return result, errors.Wrap(retryErr, "Unable to recover connection") @@ -82,9 +86,11 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.StartMaintenance(nil, hostList) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.StartMaintenance(nil, hostList) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -110,9 +116,11 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.EndMaintenance(nil, hostList) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.EndMaintenance(nil, hostList) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -140,9 +148,11 
@@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.MaintenanceStatus(nil, hostList) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.MaintenanceStatus(nil, hostList) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -158,17 +168,15 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au // SetQuota sets a quota aggregate for the given role // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { - ramRes := aurora.NewResource() - ramRes.RamMb = ramMb - cpuRes := aurora.NewResource() - cpuRes.NumCpus = cpu - diskRes := aurora.NewResource() - diskRes.DiskMb = diskMb - quota := aurora.NewResourceAggregate() - quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes} - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SetQuota(nil, role, quota) - }) + quota := &aurora.ResourceAggregate{ + Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}}, + } + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.SetQuota(nil, role, quota) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to set role quota") @@ -180,9 +188,11 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb // GetQuota returns the resource aggregate for the given role func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.GetQuota(nil, role) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.GetQuota(nil, role) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to get role quota") @@ -193,9 +203,11 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { // Force Aurora Scheduler to perform a snapshot and write to Mesos log func (r *realisClient) Snapshot() error { - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.Snapshot(nil) - }) + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.Snapshot(nil) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -207,9 +219,11 @@ func (r *realisClient) Snapshot() error { // Force Aurora Scheduler to write backup file to a file in the backup directory func (r *realisClient) PerformBackup() error { - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.PerformBackup(nil) - }) + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.PerformBackup(nil) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -220,9 +234,11 @@ func (r *realisClient) PerformBackup() error { func (r *realisClient) 
ForceImplicitTaskReconciliation() error { - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerImplicitTaskReconciliation(nil) - }) + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.TriggerImplicitTaskReconciliation(nil) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -240,9 +256,10 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { settings.BatchSize = batchSize - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) - }) + _, retryErr := r.thriftCallWithRetries(false, + func() (*aurora.Response, error) { + return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") diff --git a/retry.go b/retry.go index ca88264..ba472a8 100644 --- a/retry.go +++ b/retry.go @@ -116,10 +116,11 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. -func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) { +func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int + timeouts := 0 backoff := r.config.backoff duration := backoff.Duration @@ -151,7 +152,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() - // Check if our thrift call is returning an error. This is a retriable event as we don't know + // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. if clientErr != nil { @@ -171,6 +172,19 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro if e.Err != io.EOF && !e.Temporary() { return nil, errors.Wrap(clientErr, "permanent connection error") } + + // Corner case where the thrift payload was received by Aurora but the connection timed out before the scheduler could reply. + // If returnOnTimeout is set, we return whatever response was received along with an error that behaves like a timeout. + // Users can detect this case with IsTimeout and react accordingly. + if e.Timeout() { + timeouts++ + r.logger.DebugPrintf( + "Client closed connection (timed out) %v times before server responded, consider increasing connection timeout", + timeouts) + if returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before the server answered")) + } + } } }
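Usage note (not part of the patch): a rough sketch of how a caller might take advantage of the new stop-at-timeout behavior. CreateService, IsTimeout, and GetJobUpdateSummaries are the gorealis v1 API touched by this change; the scheduler address, job parameters, and the recovery strategy below are illustrative assumptions only.

package main

import (
	"log"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Placeholder scheduler address and timeout; adjust for your cluster.
	client, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://192.168.33.7:8081"),
		realis.TimeoutMS(20000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Executor/container configuration omitted for brevity; a real service needs one.
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("timeout_aware_service").
		CPU(0.25).
		RAM(64).
		Disk(100).
		IsService(true).
		InstanceCount(1)

	settings := aurora.NewJobUpdateSettings()
	settings.UpdateGroupSize = 1

	// With this patch, CreateService surfaces a timeout instead of retrying it away.
	_, result, err := client.CreateService(job, settings)
	if err != nil {
		if realis.IsTimeout(err) {
			// The connection was closed before the scheduler replied, but the update
			// request may still have been accepted. Check for an in-flight update for
			// this job key before blindly resubmitting.
			resp, qErr := client.GetJobUpdateSummaries(&aurora.JobUpdateQuery{JobKey: job.JobKey()})
			if qErr != nil {
				log.Fatal(qErr)
			}
			log.Printf("timed out; existing update summaries: %+v", resp.GetResult_())
			return
		}
		log.Fatal(err)
	}

	log.Printf("update started: %+v", result)
}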