Add verification to retry mechanism (#131)
CreateJob, CreateService, and StartJobUpdate now include a rudimentary verification function that checks whether the call reached the Aurora Scheduler when the client experiences a timeout.
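In short: when the client times out, the retry loop can now call an optional verification function that asks the scheduler whether the payload actually landed and, if it did, returns a mimicked OK response instead of retrying blindly. The sketch below is a self-contained illustration of that pattern, not the gorealis code itself; retryWithVerify, call, and verify are invented names, while the real pieces are thriftCallWithRetries and the verifyOnTimeout callbacks shown in the diff.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// retryWithVerify retries call a fixed number of times. When call fails with a
// timeout, verify is consulted: if it can confirm the request actually reached
// the server, its reconstructed result is returned instead of retrying blindly.
func retryWithVerify(
	attempts int,
	call func() (string, error),
	verify func() (string, bool),
) (string, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		result, err := call()
		if err == nil {
			return result, nil
		}
		lastErr = err

		var netErr net.Error
		if errors.As(err, &netErr) && netErr.Timeout() && verify != nil {
			if recovered, ok := verify(); ok {
				// The payload made it to the server even though the client timed out.
				return recovered, nil
			}
		}
	}
	return "", fmt.Errorf("ran out of retries: %w", lastErr)
}

func main() {
	created := false
	result, err := retryWithVerify(
		3,
		func() (string, error) {
			created = true // the server processed the request...
			return "", &net.DNSError{IsTimeout: true, Err: "client timed out"} // ...but the reply never arrived
		},
		func() (string, bool) {
			// Stand-in for gorealis' jobExists-style check against the scheduler.
			if created {
				return "OK (verified after timeout)", true
			}
			return "", false
		},
	)
	fmt.Println(result, err)
}
```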
parent a9d99067ee
commit 82b40a53f0
7 changed files with 286 additions and 108 deletions
2 go.mod
@@ -8,5 +8,5 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
-	github.com/stretchr/testify v1.2.0
+	github.com/stretchr/testify v1.7.0
 )
21 helpers.go (new file)
@@ -0,0 +1,21 @@
+package realis
+
+import (
+	"context"
+
+	"github.com/paypal/gorealis/gen-go/apache/aurora"
+)
+
+func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) {
+	resp, err := r.client.GetConfigSummary(context.TODO(), &key)
+	if err != nil {
+		return false, err
+	}
+
+	return resp == nil ||
+		resp.GetResult_() == nil ||
+		resp.GetResult_().GetConfigSummaryResult_() == nil ||
+		resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
+		resp.GetResponseCode() != aurora.ResponseCode_OK,
+		nil
+}
158 realis.go
@@ -65,7 +65,6 @@ type Realis interface {
 	RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 	StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
-
 	PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)

@@ -556,7 +555,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
-		})
+		},
+		nil,
+	)
 
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
 	if retryErr != nil {

@@ -581,10 +582,16 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 		false,
 		func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
+		return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
 	}
 
+	if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil, errors.New("unexpected response from scheduler")
+	}
+
 	return resp, nil

@@ -598,7 +605,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 		false,
 		func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobs(context.TODO(), role)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")

@@ -619,7 +628,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.KillTasks(context.TODO(), key, instances, "")
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")

@@ -641,7 +652,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 		func() (*aurora.Response, error) {
 			// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 			return r.client.KillTasks(context.TODO(), key, nil, "")
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@@ -657,15 +670,32 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 
 	r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 
+	// Response is checked by the thrift retry code
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
-		})
+		},
+		// On a client timeout, attempt to verify that the payload made it to the Scheduler by
+		// trying to get the config summary for the job key
+		func() (*aurora.Response, bool) {
+			exists, err := r.jobExists(*auroraJob.JobKey())
+			if err != nil {
+				r.logger.Print("verification failed ", err)
+			}
+
+			if exists {
+				return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
+			}
+
+			return nil, false
+		},
+	)
 
 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
 	}
 
 	return resp, nil
 }
 
@@ -680,17 +710,12 @@ func (r *realisClient) CreateService(
 	resp, err := r.StartJobUpdate(update, "")
 	if err != nil {
 		if IsTimeout(err) {
-			return resp, nil, err
+			return nil, nil, err
 		}
 
 		return resp, nil, errors.Wrap(err, "unable to create service")
 	}
-
-	if resp.GetResult_() != nil {
-		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
-	}
-
-	return resp, nil, errors.New("results object is nil")
+	return resp, resp.GetResult_().StartJobUpdateResult_, nil
 }
 
 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
@@ -700,7 +725,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")

@@ -716,7 +743,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.DescheduleCronJob(context.TODO(), key)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")

@@ -734,7 +763,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.StartCronJob(context.TODO(), key)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")

@@ -751,7 +782,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RestartShards(context.TODO(), key, instances)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")

@@ -774,7 +807,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RestartShards(context.TODO(), key, instanceIds)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@@ -795,16 +830,51 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 		true,
 		func() (*aurora.Response, error) {
 			return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
-		})
+		},
+		func() (*aurora.Response, bool) {
+			summariesResp, err := r.readonlyClient.GetJobUpdateSummaries(
+				context.TODO(),
+				&aurora.JobUpdateQuery{
+					JobKey:         updateJob.JobKey(),
+					UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+					Limit:          1,
+				})
+
+			if err != nil {
+				r.logger.Print("verification failed ", err)
+				return nil, false
+			}
+
+			summaries := response.JobUpdateSummaries(summariesResp)
+			if len(summaries) == 0 {
+				return nil, false
+			}
+
+			return &aurora.Response{
+				ResponseCode: aurora.ResponseCode_OK,
+				Result_: &aurora.Result_{
+					StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
+						UpdateSummary: summaries[0],
+						Key:           summaries[0].Key,
+					},
+				},
+			}, true
+		},
+	)
 
 	if retryErr != nil {
 		// A timeout took place when attempting this call, attempt to recover
 		if IsTimeout(retryErr) {
-			return resp, retryErr
+			return nil, retryErr
 		}
 
 		return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
 	}
 
+	if resp.GetResult_() == nil {
+		return resp, errors.New("no result in response")
+	}
+
 	return resp, nil
 }
@@ -820,7 +890,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")

@@ -847,7 +919,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")

@@ -865,7 +939,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")

@@ -883,7 +959,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.PulseJobUpdate(context.TODO(), updateKey)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")

@@ -901,7 +979,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.AddInstances(context.TODO(), &instKey, count)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")

@@ -940,7 +1020,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksStatus(context.TODO(), query)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")

@@ -958,7 +1040,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetPendingReason(context.TODO(), query)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")

@@ -983,7 +1067,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksWithoutConfigs(context.TODO(), query)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")

@@ -1009,7 +1095,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksStatus(context.TODO(), taskQ)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")

@@ -1037,7 +1125,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to get job update details")

@@ -1054,7 +1144,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RollbackJobUpdate(context.TODO(), &key, message)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to roll back job update")
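For callers of the realis.go APIs above, the change is mostly visible on timeouts: CreateJob, CreateService, and StartJobUpdate now try to confirm delivery themselves before surfacing a timeout. Below is a hedged sketch of the caller side, assuming realis.IsTimeout is the exported helper referenced in the diff; createWithTimeoutAwareness is an invented name and client/job construction is left out of scope.

```go
package example

import (
	realis "github.com/paypal/gorealis"
)

// createWithTimeoutAwareness is a hypothetical helper showing only the error-handling
// shape; building the client and the job is assumed to happen elsewhere.
func createWithTimeoutAwareness(client realis.Realis, job realis.Job) error {
	resp, err := client.CreateJob(job)
	if realis.IsTimeout(err) {
		// The client timed out and CreateJob's own verification could not confirm that
		// the job reached the scheduler, so its existence is genuinely unknown here.
		// Check again later instead of blindly resubmitting.
		return err
	}
	if err != nil {
		return err // any other error is terminal for this sketch
	}

	// resp is the scheduler's response (or, after a verified timeout, a mimicked OK).
	_ = resp
	return nil
}
```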
@@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.DrainHosts(context.TODO(), drainList)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")

@@ -65,7 +67,9 @@ func (r *realisClient) SLADrainHosts(
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return result, errors.Wrap(retryErr, "Unable to recover connection")

@@ -95,7 +99,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.StartMaintenance(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")

@@ -125,7 +131,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*auror
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.EndMaintenance(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")

@@ -157,7 +165,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")

@@ -182,7 +192,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.SetQuota(context.TODO(), role, quota)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "Unable to set role quota")

@@ -198,7 +210,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.GetQuota(context.TODO(), role)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "Unable to get role quota")

@@ -213,7 +227,9 @@ func (r *realisClient) Snapshot() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.Snapshot(context.TODO())
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")

@@ -229,7 +245,9 @@ func (r *realisClient) PerformBackup() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.PerformBackup(context.TODO())
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")

@@ -244,7 +262,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")

@@ -265,7 +285,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 	_, retryErr := r.thriftCallWithRetries(false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
-		})
+		},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")
@@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
 }
 
 func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
+	if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil
+	}
+
 	return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
 }
 
163 retry.go
@@ -114,10 +114,19 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
 
+// verifyOnTimeout defines the type of function that will be used to verify whether a Thrift call to the Scheduler
+// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
+// through the very same Thrift API which previously encountered a time out from the client.
+// This means that the functions themselves should be kept to a minimum number of Thrift calls.
+// It should also be noted that this is a best effort mechanism and
+// is likely to fail for the same reasons that the original call failed.
+type verifyOnTimeout func() (*aurora.Response, bool)
+
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
 func (r *realisClient) thriftCallWithRetries(
 	returnOnTimeout bool,
-	thriftCall auroraThriftCall) (*aurora.Response, error) {
+	thriftCall auroraThriftCall,
+	verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {
 
 	var resp *aurora.Response
 	var clientErr error
@@ -157,42 +166,22 @@ func (r *realisClient) thriftCallWithRetries(
 			r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr)
 		}()
 
-		// Check if our thrift call is returning an error. This is a retryable event as we don't know
-		// if it was caused by network issues.
+		// Check if our thrift call is returning an error.
 		if clientErr != nil {
 
 			// Print out the error to the user
 			r.logger.Printf("Client Error: %v", clientErr)
 
-			// Determine if error is a temporary URL error by going up the stack
-			e, ok := clientErr.(thrift.TTransportException)
-			if ok {
-				r.logger.debugPrint("Encountered a transport exception")
+			temporary, timedout := isConnectionError(clientErr)
+			if !temporary && r.RealisConfig().failOnPermanentErrors {
+				return nil, errors.Wrap(clientErr, "permanent connection error")
+			}
-
-				e, ok := e.Err().(*url.Error)
-				if ok {
-
-					// EOF error occurs when the server closes the read buffer of the client. This is common
-					// when the server is overloaded and should be retried. All other errors that are permanent
-					// will not be retried.
-					if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
-						return nil, errors.Wrap(clientErr, "permanent connection error")
-					}
-
-					// Corner case where thrift payload was received by Aurora but connection timed out before Aurora was
-					// able to reply. In this case we will return whatever response was received and a TimedOut behaving
-					// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
-					if e.Timeout() {
-						timeouts++
-						r.logger.debugPrintf(
-							"Client closed connection (timedout) %d times before server responded, "+
-								"consider increasing connection timeout",
-							timeouts)
-						if returnOnTimeout {
-							return resp, newTimedoutError(errors.New("client connection closed before server answer"))
-						}
-					}
-				}
-			}
+
+			// There exists a corner case where thrift payload was received by Aurora but
+			// connection timed out before Aurora was able to reply.
+			// Users can take special action on a timeout by using IsTimedout and reacting accordingly
+			// if they have configured the client to return on a timeout.
+			if timedout && returnOnTimeout {
+				return resp, newTimedoutError(errors.New("client connection closed before server answer"))
+			}
 
 			// In the future, reestablish connection should be able to check if it is actually possible
@@ -202,48 +191,71 @@ func (r *realisClient) thriftCallWithRetries(
 			if reestablishErr != nil {
 				r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
 			}
-		} else {
 
-			// If there was no client error, but the response is nil, something went wrong.
-			// Ideally, we'll never encounter this but we're placing a safeguard here.
-			if resp == nil {
-				return nil, errors.New("response from aurora is nil")
+			// If users did not opt for a return on timeout in order to react to a timedout error,
+			// attempt to verify that the call made it to the scheduler after the connection was re-established.
+			if timedout {
+				timeouts++
+				r.logger.debugPrintf(
+					"Client closed connection %d times before server responded, "+
+						"consider increasing connection timeout",
+					timeouts)
+
+				// Allow caller to provide a function which checks if the original call was successful before
+				// it timed out.
+				if verifyOnTimeout != nil {
+					if verifyResp, ok := verifyOnTimeout(); ok {
+						r.logger.Print("verified that the call went through successfully after a client timeout")
+						// Response here might be different than the original as it is no longer constructed
+						// by the scheduler but mimicked.
+						// This is OK since the scheduler is very unlikely to change responses at this point in its
+						// development cycle but we must be careful to not return an incorrectly constructed response.
+						return verifyResp, nil
+					}
+				}
 			}
 
-			// Check Response Code from thrift and make a decision to continue retrying or not.
-			switch responseCode := resp.GetResponseCode(); responseCode {
+			// Retry the thrift payload
+			continue
+		}
 
-			// If the thrift call succeeded, stop retrying
-			case aurora.ResponseCode_OK:
-				return resp, nil
+		// If there was no client error, but the response is nil, something went wrong.
+		// Ideally, we'll never encounter this but we're placing a safeguard here.
+		if resp == nil {
+			return nil, errors.New("response from aurora is nil")
+		}
 
-			// If the response code is transient, continue retrying
-			case aurora.ResponseCode_ERROR_TRANSIENT:
-				r.logger.Println("Aurora replied with Transient error code, retrying")
-				continue
+		// Check Response Code from thrift and make a decision to continue retrying or not.
+		switch responseCode := resp.GetResponseCode(); responseCode {
 
-			// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
-			case aurora.ResponseCode_INVALID_REQUEST,
-				aurora.ResponseCode_ERROR,
-				aurora.ResponseCode_AUTH_FAILED,
-				aurora.ResponseCode_JOB_UPDATING_ERROR:
-				r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
-				return resp, errors.New(response.CombineMessage(resp))
+		// If the thrift call succeeded, stop retrying
+		case aurora.ResponseCode_OK:
+			return resp, nil
 
-			// The only case that should fall down to here is a WARNING response code.
-			// It is currently not used as a response in the scheduler so it is unknown how to handle it.
-			default:
-				r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-				return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
-			}
+		// If the response code is transient, continue retrying
+		case aurora.ResponseCode_ERROR_TRANSIENT:
+			r.logger.Println("Aurora replied with Transient error code, retrying")
+			continue
+
+		// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+		case aurora.ResponseCode_INVALID_REQUEST,
+			aurora.ResponseCode_ERROR,
+			aurora.ResponseCode_AUTH_FAILED,
+			aurora.ResponseCode_JOB_UPDATING_ERROR:
+			r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
+			return resp, errors.New(response.CombineMessage(resp))
+
+		// The only case that should fall down to here is a WARNING response code.
+		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+		default:
+			r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
 		}
 	}
 
 	r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
 
 	if curStep > 1 {
-		r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+		r.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
 	}
 
 	// Provide more information to the user wherever possible.
@@ -253,3 +265,30 @@ func (r *realisClient) thriftCallWithRetries(
 
 	return nil, newRetryError(errors.New("ran out of retries"), curStep)
 }
+
+// isConnectionError processes the error received by the client.
+// The return values indicate whether this was determined to be a temporary error
+// and whether it was determined to be a timeout error.
+func isConnectionError(err error) (bool, bool) {
+
+	// Determine if error is a temporary URL error by going up the stack
+	transportException, ok := err.(thrift.TTransportException)
+	if !ok {
+		return false, false
+	}
+
+	urlError, ok := transportException.Err().(*url.Error)
+	if !ok {
+		return false, false
+	}
+
+	// EOF error occurs when the server closes the read buffer of the client. This is common
+	// when the server is overloaded and we consider it temporary.
+	// All other errors that are not temporary as per the member function Temporary()
+	// are considered permanent.
+	if urlError.Err != io.EOF && !urlError.Temporary() {
+		return false, false
+	}
+
+	return true, urlError.Timeout()
+}
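The classification feeding this flow is the new isConnectionError helper above. Below is a standalone approximation of just its url.Error layer (the real helper first unwraps a thrift.TTransportException, which this sketch skips); classify and deadlineErr are invented names for the example.

```go
package main

import (
	"fmt"
	"io"
	"net/url"
)

// classify mirrors the spirit of isConnectionError for the *url.Error layer only:
// it reports whether the error looks temporary and whether it was a timeout.
func classify(err error) (temporary bool, timedOut bool) {
	urlErr, ok := err.(*url.Error)
	if !ok {
		return false, false
	}
	// EOF is treated as temporary (overloaded server); everything else defers to Temporary().
	if urlErr.Err != io.EOF && !urlErr.Temporary() {
		return false, false
	}
	return true, urlErr.Timeout()
}

type deadlineErr struct{}

func (deadlineErr) Error() string   { return "deadline exceeded" }
func (deadlineErr) Timeout() bool   { return true }
func (deadlineErr) Temporary() bool { return true }

func main() {
	// Server closed the connection: temporary, not a timeout.
	fmt.Println(classify(&url.Error{Op: "Post", URL: "http://aurora.example.com/api", Err: io.EOF}))

	// Client deadline exceeded: temporary and a timeout.
	fmt.Println(classify(&url.Error{Op: "Post", URL: "http://aurora.example.com/api", Err: deadlineErr{}}))
}
```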
2 util.go
@@ -29,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
 // ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
 var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
 
-// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
+// TerminalUpdateStates returns a slice containing all the terminal states an update may be in.
 // This is a function in order to avoid having a slice that can be accidentally mutated.
 func TerminalUpdateStates() []aurora.JobUpdateStatus {
 	return []aurora.JobUpdateStatus{