Adding an argument that allows the retry mechanism to stop when a timeout is encountered. This matters for non-idempotent (mutating) API calls, where blindly retrying after a timeout could apply the change twice; read-only calls keep retrying as before. Only StartJobUpdate and CreateService enable stop-at-timeout by default.

Renan DelValle 2019-05-01 09:17:44 -07:00
parent ee1a95831c
commit 979fb11464
3 changed files with 202 additions and 118 deletions
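
For context, a minimal caller-side sketch of how the new timeout behavior might be consumed, assuming the standard gorealis import paths and the package's exported Realis and Job interfaces. The wrapper function and package below are hypothetical; only CreateService, IsTimeout, and the timeout semantics come from this change.

package example

import (
	"log"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
	"github.com/pkg/errors"
)

// createService is a hypothetical wrapper showing how a caller can treat a
// timeout as "scheduler state unknown" instead of a plain failure.
func createService(r realis.Realis, job realis.Job, settings *aurora.JobUpdateSettings) error {
	resp, result, err := r.CreateService(job, settings)
	if err != nil {
		if realis.IsTimeout(err) {
			// The update request may still have reached Aurora before the
			// connection closed; verify against the scheduler before retrying.
			return errors.Wrap(err, "create service timed out, scheduler state unknown")
		}
		return errors.Wrap(err, "unable to create service")
	}
	log.Printf("update started: %v %v", resp.GetResponseCode(), result)
	return nil
}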

realis.go

@ -405,10 +405,8 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
var transport http.Transport
if config != nil {
tlsConfig := &tls.Config{}
if config.InsecureSkipVerify {
tlsConfig.InsecureSkipVerify = true
}
tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify}
if config.certspath != "" {
rootCAs, err := GetCerts(config.certspath)
if err != nil {
@ -505,9 +503,11 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(nil, taskQ)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(nil, taskQ)
})
// If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil {
@ -528,9 +528,11 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
@ -543,9 +545,11 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
var result *aurora.GetJobsResult_
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(nil, role)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(nil, role)
})
if retryErr != nil {
return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
@ -562,9 +566,11 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.KillTasks(nil, key, instances, "")
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.KillTasks(nil, key, instances, "")
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -581,10 +587,12 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(nil, key, nil, "")
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(nil, key, nil, "")
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -600,9 +608,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.CreateJob(nil, auroraJob.JobConfig())
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.CreateJob(nil, auroraJob.JobConfig())
})
if retryErr != nil {
return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
@ -618,6 +628,10 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
resp, err := r.StartJobUpdate(update, "")
if err != nil {
if IsTimeout(err) {
return resp, nil, err
}
return resp, nil, errors.Wrap(err, "unable to create service")
}
@ -631,9 +645,11 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
@ -645,9 +661,11 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(nil, key)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(nil, key)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
@ -661,9 +679,11 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartCronJob(nil, key)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.StartCronJob(nil, key)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
@ -676,9 +696,11 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(nil, key, instances)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.RestartShards(nil, key, instances)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -697,9 +719,11 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
if len(instanceIds) > 0 {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(nil, key, instanceIds)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.RestartShards(nil, key, instanceIds)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -716,12 +740,19 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(nil, updateJob.req, message)
})
resp, retryErr := r.thriftCallWithRetries(
true,
func() (*aurora.Response, error) {
return r.client.StartJobUpdate(nil, updateJob.req, message)
})
if retryErr != nil {
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
// A timeout took place when attempting this call; return it so the caller can attempt to recover
if IsTimeout(retryErr) {
return resp, retryErr
} else {
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
}
return resp, nil
}
@ -733,9 +764,11 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(nil, &updateKey, message)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(nil, &updateKey, message)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
@ -753,9 +786,11 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(nil, updateKey, message)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(nil, updateKey, message)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@ -769,9 +804,11 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(nil, updateKey, message)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(nil, updateKey, message)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@ -785,9 +822,11 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(nil, updateKey)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(nil, updateKey)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
@ -802,9 +841,11 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AddInstances(nil, &instKey, count)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.AddInstances(nil, &instKey, count)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@ -839,9 +880,11 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(nil, query)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetTasksStatus(nil, query)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@ -855,9 +898,11 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetPendingReason(nil, query)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetPendingReason(nil, query)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@ -877,9 +922,11 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(nil, query)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(nil, query)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@ -901,9 +948,11 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(nil, taskQ)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetTasksStatus(nil, taskQ)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@ -927,9 +976,11 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(nil, &updateQuery)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(nil, &updateQuery)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Unable to get job update details")
@ -942,9 +993,11 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(nil, &key, message)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(nil, &key, message)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to roll back job update")


@ -24,9 +24,11 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(nil, drainList)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(nil, drainList)
})
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -54,9 +56,11 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
})
if retryErr != nil {
return result, errors.Wrap(retryErr, "Unable to recover connection")
@ -82,9 +86,11 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(nil, hostList)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(nil, hostList)
})
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -110,9 +116,11 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(nil, hostList)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(nil, hostList)
})
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -140,9 +148,11 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(nil, hostList)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(nil, hostList)
})
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -158,17 +168,15 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// SetQuota sets a quota aggregate for the given role
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
ramRes := aurora.NewResource()
ramRes.RamMb = ramMb
cpuRes := aurora.NewResource()
cpuRes.NumCpus = cpu
diskRes := aurora.NewResource()
diskRes.DiskMb = diskMb
quota := aurora.NewResourceAggregate()
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.SetQuota(nil, role, quota)
})
quota := &aurora.ResourceAggregate{
Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}},
}
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.SetQuota(nil, role, quota)
})
if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to set role quota")
@ -180,9 +188,11 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
// GetQuota returns the resource aggregate for the given role
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.GetQuota(nil, role)
})
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.GetQuota(nil, role)
})
if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to get role quota")
@ -193,9 +203,11 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
func (r *realisClient) Snapshot() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.Snapshot(nil)
})
_, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.Snapshot(nil)
})
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -207,9 +219,11 @@ func (r *realisClient) Snapshot() error {
// Force Aurora Scheduler to write backup file to a file in the backup directory
func (r *realisClient) PerformBackup() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.PerformBackup(nil)
})
_, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.PerformBackup(nil)
})
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -220,9 +234,11 @@ func (r *realisClient) PerformBackup() error {
func (r *realisClient) ForceImplicitTaskReconciliation() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
})
_, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
})
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -240,9 +256,10 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
settings.BatchSize = batchSize
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
})
_, retryErr := r.thriftCallWithRetries(false,
func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
})
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")


@ -116,10 +116,11 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
type auroraThriftCall func() (resp *aurora.Response, err error)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
var curStep int
timeouts := 0
backoff := r.config.backoff
duration := backoff.Duration
@ -151,7 +152,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retriable event as we don't know
// Check if our thrift call is returning an error. This is a retryable event as we don't know
// if it was caused by network issues.
if clientErr != nil {
@ -171,6 +172,19 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
if e.Err != io.EOF && !e.Temporary() {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
// Corner case where the thrift payload was received by Aurora but the connection timed out before Aurora was
// able to reply. In this case we return whatever response was received along with an error that behaves like
// a timeout. Users can take special action on a timeout by using IsTimeout and reacting accordingly.
if e.Timeout() {
timeouts++
r.logger.DebugPrintf(
"Client closed connection (timedout) %v times before server responded, consider increasing connection timeout",
timeouts)
if returnOnTimeout {
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
}
}
}
}
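
newTimedoutError and IsTimeout are defined in the package's error helpers, which are not part of this diff. A minimal sketch consistent with how they are used above might look like the following; the unexported names and the behavior interface are assumptions.

package realis

// timedoutError marks an error as a connection timeout so callers can treat the
// scheduler state as unknown rather than as a definite failure.
type timedoutError struct {
	error
}

// Timedout reports that this error was caused by a timed-out connection.
func (t *timedoutError) Timedout() bool { return true }

// newTimedoutError wraps err with timeout behavior.
func newTimedoutError(err error) *timedoutError {
	return &timedoutError{error: err}
}

// timedout is the behavior callers test for instead of a concrete error type.
type timedout interface {
	Timedout() bool
}

// IsTimeout reports whether err carries timeout behavior.
func IsTimeout(err error) bool {
	t, ok := err.(timedout)
	return ok && t.Timedout()
}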