Initial commit adding verification function

Renán Del Valle 2021-04-29 19:44:56 -07:00
parent a9d99067ee
commit 9999a0834d
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
4 changed files with 115 additions and 36 deletions

go.mod

@@ -8,5 +8,5 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
-	github.com/stretchr/testify v1.2.0
+	github.com/stretchr/testify v1.7.0
 )

realis.go

@@ -556,7 +556,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
-		})
+		},
+		nil,
+	)

 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
 	if retryErr != nil {
@@ -581,7 +583,9 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 		false,
 		func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
@@ -598,7 +602,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 		false,
 		func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobs(context.TODO(), role)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
@@ -619,7 +625,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.KillTasks(context.TODO(), key, instances, "")
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@@ -641,7 +649,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 		func() (*aurora.Response, error) {
 			// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 			return r.client.KillTasks(context.TODO(), key, nil, "")
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@@ -661,7 +671,9 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
@@ -700,7 +712,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
@@ -716,7 +730,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.DescheduleCronJob(context.TODO(), key)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
@@ -734,7 +750,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.StartCronJob(context.TODO(), key)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
@@ -751,7 +769,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RestartShards(context.TODO(), key, instances)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@@ -774,7 +794,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RestartShards(context.TODO(), key, instanceIds)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@@ -795,7 +817,9 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 		true,
 		func() (*aurora.Response, error) {
 			return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		// A timeout took place when attempting this call, attempt to recover
@@ -820,7 +844,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
@@ -847,7 +873,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@@ -865,7 +893,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@@ -883,7 +913,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.PulseJobUpdate(context.TODO(), updateKey)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
@@ -901,7 +933,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.AddInstances(context.TODO(), &instKey, count)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@@ -940,7 +974,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksStatus(context.TODO(), query)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@@ -958,7 +994,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetPendingReason(context.TODO(), query)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@@ -983,7 +1021,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksWithoutConfigs(context.TODO(), query)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@@ -1009,7 +1049,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetTasksStatus(context.TODO(), taskQ)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@@ -1037,7 +1079,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to get job update details")
@@ -1054,7 +1098,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.RollbackJobUpdate(context.TODO(), &key, message)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to roll back job update")

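Note: every call site above passes nil for the new verifier, so retry behavior is unchanged by this commit. As an illustrative sketch only (the closure below is not part of this diff; GetInstanceIds, Job.JobKey, and aurora.ACTIVE_STATES are existing gorealis identifiers), a write path such as CreateJob could later supply a real verifier instead of nil:

	resp, retryErr := r.thriftCallWithRetries(
		false,
		func() (*aurora.Response, error) {
			return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
		},
		// Hypothetical verifier: if active instances exist for the job key,
		// the create reached the scheduler before the connection timed out.
		func() (*aurora.Response, bool) {
			ids, err := r.GetInstanceIds(auroraJob.JobKey(), aurora.ACTIVE_STATES)
			if err != nil || len(ids) == 0 {
				return nil, false
			}
			return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
		},
	)
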
realis_admin.go

@@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.DrainHosts(context.TODO(), drainList)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@@ -65,7 +67,9 @@ func (r *realisClient) SLADrainHosts(
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return result, errors.Wrap(retryErr, "Unable to recover connection")
@@ -95,7 +99,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.StartMaintenance(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@@ -125,7 +131,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.EndMaintenance(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@@ -157,7 +165,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@@ -182,7 +192,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.SetQuota(context.TODO(), role, quota)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "Unable to set role quota")
@@ -198,7 +210,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.GetQuota(context.TODO(), role)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "Unable to get role quota")
@@ -213,7 +227,9 @@ func (r *realisClient) Snapshot() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.Snapshot(context.TODO())
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")
@@ -229,7 +245,9 @@ func (r *realisClient) PerformBackup() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.PerformBackup(context.TODO())
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")
@@ -244,7 +262,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
 		false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")
@@ -265,7 +285,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 	_, retryErr := r.thriftCallWithRetries(false,
 		func() (*aurora.Response, error) {
 			return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
 		return errors.Wrap(retryErr, "Unable to recover connection")

retry.go

@@ -113,11 +113,13 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
 }

 type auroraThriftCall func() (resp *aurora.Response, err error)
+type verifyOnTimeout func() (*aurora.Response, bool)

 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
 func (r *realisClient) thriftCallWithRetries(
 	returnOnTimeout bool,
-	thriftCall auroraThriftCall) (*aurora.Response, error) {
+	thriftCall auroraThriftCall,
+	verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {

 	var resp *aurora.Response
 	var clientErr error
@@ -191,6 +193,15 @@ func (r *realisClient) thriftCallWithRetries(
 			if returnOnTimeout {
 				return resp, newTimedoutError(errors.New("client connection closed before server answer"))
 			}
+
+			// Allow caller to provide a function which checks if the original call was successful before
+			// it timed out.
+			if verifyOnTimeout != nil {
+				resp, ok := verifyOnTimeout()
+				if ok {
+					return resp, nil
+				}
+			}
 		}
 	}
 }
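
The control flow this commit adds can be reduced to a self-contained sketch (illustrative names, not the gorealis API): when a call fails with a timeout, an optional verifier is consulted before the failure is surfaced, because the request may have been applied server-side even though the connection died before the response arrived.

	package main

	import (
		"errors"
		"fmt"
	)

	var errTimeout = errors.New("connection closed before server answer")

	// callWithVerify mirrors the new thriftCallWithRetries behavior: on a
	// timeout, consult verify (when provided) before reporting failure.
	func callWithVerify(call func() (string, error), verify func() (string, bool)) (string, error) {
		resp, err := call()
		if err == nil {
			return resp, nil
		}
		if errors.Is(err, errTimeout) && verify != nil {
			if r, ok := verify(); ok {
				// The original call actually took effect server-side.
				return r, nil
			}
		}
		return "", err
	}

	func main() {
		// Simulate a request that times out after the server already applied it.
		call := func() (string, error) { return "", errTimeout }
		verify := func() (string, bool) { return "applied", true }

		resp, err := callWithVerify(call, verify)
		fmt.Println(resp, err) // prints: applied <nil>
	}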