Addressing code review feedback.
This commit is contained in:
parent
e709f8405c
commit
7906fd6c67
3 changed files with 39 additions and 25 deletions
13
helpers.go
13
helpers.go
|
@ -6,15 +6,16 @@ import (
|
|||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
)
|
||||
|
||||
func (r *realisClient) jobExists(key aurora.JobKey) bool {
|
||||
func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) {
|
||||
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
|
||||
if err != nil {
|
||||
return false
|
||||
return false, err
|
||||
}
|
||||
|
||||
return resp == nil ||
|
||||
resp.GetResult_() == nil ||
|
||||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
|
||||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
|
||||
resp.GetResponseCode() != aurora.ResponseCode_OK
|
||||
resp.GetResult_() == nil ||
|
||||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
|
||||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
|
||||
resp.GetResponseCode() != aurora.ResponseCode_OK,
|
||||
nil
|
||||
}
|
||||
|
|
|
@ -679,7 +679,12 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
|||
// On a client timeout, attempt to verify that payload made to the Scheduler by
|
||||
// trying to get the config summary for the job key
|
||||
func() (*aurora.Response, bool) {
|
||||
if r.jobExists(*auroraJob.JobKey()) {
|
||||
exists, err := r.jobExists(*auroraJob.JobKey())
|
||||
if err != nil {
|
||||
r.logger.Print("verification failed ", err)
|
||||
}
|
||||
|
||||
if exists {
|
||||
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
|
||||
}
|
||||
|
||||
|
@ -836,6 +841,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
r.logger.Print("verification failed ", err)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
|
|
43
retry.go
43
retry.go
|
@ -114,7 +114,12 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
|
|||
|
||||
type auroraThriftCall func() (resp *aurora.Response, err error)
|
||||
|
||||
// Verification functions should strive to keep the number of thrift calls to one wherever possible.
|
||||
// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler
|
||||
// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
|
||||
// through the very same Thrift API which previously encountered a time out from the client.
|
||||
// This means that the functions themselves should be kept to a minimum number of Thrift calls.
|
||||
// It should also be noted that this is a best effort mechanism and
|
||||
// is likely to fail for the same reasons that the original call failed.
|
||||
type verifyOnTimeout func() (*aurora.Response, bool)
|
||||
|
||||
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
|
||||
|
@ -171,6 +176,24 @@ func (r *realisClient) thriftCallWithRetries(
|
|||
return nil, errors.Wrap(clientErr, "permanent connection error")
|
||||
}
|
||||
|
||||
// There exists a corner case where thrift payload was received by Aurora but
|
||||
// connection timed out before Aurora was able to reply.
|
||||
// Users can take special action on a timeout by using IsTimedout and reacting accordingly
|
||||
// if they have configured the client to return on a timeout.
|
||||
if timedout && returnOnTimeout {
|
||||
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
|
||||
}
|
||||
|
||||
// In the future, reestablish connection should be able to check if it is actually possible
|
||||
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
|
||||
// Ignoring error due to the fact that an error should be retried regardless
|
||||
reestablishErr := r.ReestablishConn()
|
||||
if reestablishErr != nil {
|
||||
r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
|
||||
}
|
||||
|
||||
// If users did not opt for a return on timeout in order to react to a timedout error,
|
||||
// attempt to verify that the call made it to the scheduler after the connection was re-established.
|
||||
if timedout {
|
||||
timeouts++
|
||||
r.logger.debugPrintf(
|
||||
|
@ -182,7 +205,7 @@ func (r *realisClient) thriftCallWithRetries(
|
|||
// it timed out.
|
||||
if verifyOnTimeout != nil {
|
||||
if verifyResp, ok := verifyOnTimeout(); ok {
|
||||
r.logger.debugPrint("verified that the call went through successfully")
|
||||
r.logger.Print("verified that the call went through successfully after a client timeout")
|
||||
// Response here might be different than the original as it is no longer constructed
|
||||
// by the scheduler but mimicked.
|
||||
// This is OK since the scheduler is very unlikely to change responses at this point in its
|
||||
|
@ -190,22 +213,6 @@ func (r *realisClient) thriftCallWithRetries(
|
|||
return verifyResp, nil
|
||||
}
|
||||
}
|
||||
|
||||
if returnOnTimeout {
|
||||
// There exists a corner case where thrift payload was received by Aurora but
|
||||
// connection timed out before Aurora was able to reply.
|
||||
// Users can take special action on a timeout by using IsTimedout and reacting accordingly
|
||||
// if they have configured the client to return on a timeout.
|
||||
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
|
||||
}
|
||||
}
|
||||
|
||||
// In the future, reestablish connection should be able to check if it is actually possible
|
||||
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
|
||||
// Ignoring error due to the fact that an error should be retried regardless
|
||||
reestablishErr := r.ReestablishConn()
|
||||
if reestablishErr != nil {
|
||||
r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
|
||||
}
|
||||
|
||||
// Retry the thrift payload
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue