Add more fine-grained controls to the retry mechanism. The retry mechanism may now be configured to stop retrying when it hits an error the standard library considers permanent, or to stop retrying specifically when a timeout error is encountered.
parent fe4a0dc06e
commit 55cf9bcb70
2 changed files with 102 additions and 75 deletions
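For context, here is a minimal sketch of how a caller might opt in to the new behavior. Only FailOnPermanentErrors comes from this diff; the constructor and the other option names (NewClient, SchedulerUrl, BasicAuth, Close) and the import path are assumptions based on the surrounding gorealis v2 API, not part of this commit:

package main

import (
    "log"

    realis "github.com/paypal/gorealis/v2" // assumed import path
)

func main() {
    // FailOnPermanentErrors is the option added by this commit: stop retrying
    // when the client hits a connection error the standard library considers
    // permanent. SchedulerUrl and BasicAuth are assumed to be existing options.
    client, err := realis.NewClient(
        realis.SchedulerUrl("http://localhost:8081"),
        realis.BasicAuth("aurora", "secret"),
        realis.FailOnPermanentErrors(),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
}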
realis.go (142 changed lines)
@@ -16,6 +16,7 @@
 package realis

 import (
+    "context"
     "crypto/tls"
     "crypto/x509"
     "encoding/base64"
@@ -66,6 +67,7 @@ type clientConfig struct {
     debug                 bool
     trace                 bool
     zkOptions             []ZKOpt
+    failOnPermanentErrors bool
 }

 var defaultBackoff = Backoff{
@@ -189,6 +191,14 @@ func Trace() ClientOption {
     }
 }

+// FailOnPermanentErrors - If the client encounters a connection error the standard library
+// considers permanent, stop retrying and return an error to the user.
+func FailOnPermanentErrors() ClientOption {
+    return func(config *clientConfig) {
+        config.failOnPermanentErrors = true
+    }
+}
+
 func newTJSONTransport(url string, timeout time.Duration, config *clientConfig) (thrift.TTransport, error) {
     trans, err := defaultTTransport(url, timeout, config)
     if err != nil {
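The retry wrapper itself lives in the other changed file (presumably the retry code) and is not shown in these hunks, so the following is only a hypothetical sketch of how a retry loop could honor the two new controls: a per-call boolean (called returnOnTimeout here, a name assumed from the commit description) and the failOnPermanentErrors client setting. It is not the library's implementation.

package main

import (
    "errors"
    "fmt"
    "net/url"
    "time"
)

// isTimeout reports whether err exposes a Timeout() method returning true,
// the convention used by errors from net and net/url.
func isTimeout(err error) bool {
    var t interface{ Timeout() bool }
    return errors.As(err, &t) && t.Timeout()
}

// isPermanent stands in for "a connection error the standard library considers
// permanent"; here any *url.Error that is not temporary counts as permanent.
func isPermanent(err error) bool {
    var urlErr *url.Error
    return errors.As(err, &urlErr) && !urlErr.Temporary()
}

// callWithRetries sketches the control flow this commit adds: the caller picks,
// per call, whether a timeout ends the loop, and the client configuration can
// end the loop early on permanent errors.
func callWithRetries(returnOnTimeout, failOnPermanent bool, attempts int, call func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = call(); err == nil {
            return nil
        }
        if returnOnTimeout && isTimeout(err) {
            return fmt.Errorf("timed out, not retrying: %w", err)
        }
        if failOnPermanent && isPermanent(err) {
            return fmt.Errorf("permanent error, not retrying: %w", err)
        }
        time.Sleep(100 * time.Millisecond) // fixed backoff keeps the sketch short
    }
    return err
}

func main() {
    err := callWithRetries(false, true, 3, func() error {
        return &url.Error{Op: "Post", URL: "http://example.invalid/api", Err: errors.New("connection refused")}
    })
    fmt.Println(err)
}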
@@ -442,8 +452,8 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu

     c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetTasksWithoutConfigs(nil, taskQ)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
     })

     // If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -464,8 +474,8 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu
 func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.GetJobUpdateSummariesResult_, error) {
     c.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
     })

     if retryErr != nil {
@@ -479,8 +489,8 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {

     var result *aurora.GetJobsResult_

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.readonlyClient.GetJobs(nil, role)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.readonlyClient.GetJobs(context.TODO(), role)
     })

     if retryErr != nil {
@@ -499,8 +509,8 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, error) {
     c.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.KillTasks(nil, &key, instances, "")
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.KillTasks(context.TODO(), &key, instances, "")
     })

     if retryErr != nil {
@@ -525,9 +535,9 @@ func (c *Client) KillJob(key aurora.JobKey) error {

     c.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
         // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-        return c.client.KillTasks(nil, &key, nil, "")
+        return c.client.KillTasks(context.TODO(), &key, nil, "")
     })

     if retryErr != nil {
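The comment above is the practical difference between KillInstances and KillJob: an empty instance set kills every active shard. A hypothetical usage sketch follows; the import paths, constructor, SchedulerUrl option, and the JobKey values are assumptions and not part of this diff, while the KillInstances and KillJob signatures match the hunks above.

package main

import (
    "log"

    realis "github.com/paypal/gorealis/v2"                // assumed import path
    "github.com/paypal/gorealis/v2/gen-go/apache/aurora"  // assumed import path
)

func main() {
    client, err := realis.NewClient(realis.SchedulerUrl("http://localhost:8081"))
    if err != nil {
        log.Fatal(err)
    }

    key := aurora.JobKey{Environment: "prod", Role: "vagrant", Name: "hello_world"}

    // Kill only shards 0 and 1 of the job.
    if _, err := client.KillInstances(key, 0, 1); err != nil {
        log.Fatal(err)
    }

    // Kill every active shard: KillJob passes an empty instance set to KillTasks.
    if err := client.KillJob(key); err != nil {
        log.Fatal(err)
    }
}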
@@ -550,8 +560,8 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
         return errors.Wrap(err, "unable to create Thermos payload")
     }

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.CreateJob(nil, auroraJob.JobConfig())
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
     })

     if retryErr != nil {
@@ -581,8 +591,8 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
         return errors.Wrap(err, "Unable to create Thermos payload")
     }

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.ScheduleCronJob(nil, auroraJob.JobConfig())
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
     })

     if retryErr != nil {
@@ -595,8 +605,8 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error {

     c.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.DescheduleCronJob(nil, &key)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.DescheduleCronJob(context.TODO(), &key)
     })

     if retryErr != nil {
@@ -611,8 +621,8 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {

     c.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.StartCronJob(nil, &key)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.StartCronJob(context.TODO(), &key)
     })

     if retryErr != nil {
@@ -626,8 +636,8 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {
 func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
     c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.RestartShards(nil, &key, instances)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.RestartShards(context.TODO(), &key, instances)
     })

     if retryErr != nil {
@@ -647,8 +657,8 @@ func (c *Client) RestartJob(key aurora.JobKey) error {
     c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)

     if len(instanceIds) > 0 {
-        _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-            return c.client.RestartShards(nil, &key, instanceIds)
+        _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+            return c.client.RestartShards(context.TODO(), &key, instanceIds)
         })

         if retryErr != nil {
@@ -670,7 +680,7 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S

     c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
         return c.client.StartJobUpdate(nil, updateJob.request, message)
     })
@@ -690,8 +700,8 @@ func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) e

     c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.AbortJobUpdate(nil, &updateKey, message)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.AbortJobUpdate(context.TODO(), &updateKey, message)
     })

     if retryErr != nil {
@@ -715,7 +725,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string)
         ID: updateKey.GetID(),
     }

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
         return c.client.PauseJobUpdate(nil, updateKeyLocal, message)
     })
@@ -737,8 +747,8 @@ func (c *Client) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string)

     c.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.ResumeJobUpdate(nil, updateKey, message)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.ResumeJobUpdate(context.TODO(), updateKey, message)
     })

     if retryErr != nil {
@@ -753,8 +763,8 @@ func (c *Client) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (aurora.JobUpdat

     c.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.PulseJobUpdate(nil, updateKey)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.PulseJobUpdate(context.TODO(), updateKey)
     })

     if retryErr != nil {
@@ -775,8 +785,8 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error {

     c.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.AddInstances(nil, &instKey, count)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.AddInstances(context.TODO(), &instKey, count)
     })

     if retryErr != nil {
@@ -820,8 +830,8 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask

     c.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetTasksStatus(nil, query)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetTasksStatus(context.TODO(), query)
     })

     if retryErr != nil {
@@ -836,8 +846,8 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea

     c.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetPendingReason(nil, query)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetPendingReason(context.TODO(), query)
     })

     if retryErr != nil {
@@ -858,8 +868,8 @@ func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.Sche

     c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetTasksWithoutConfigs(nil, query)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetTasksWithoutConfigs(context.TODO(), query)
     })

     if retryErr != nil {
@@ -885,8 +895,8 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig

     c.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetTasksStatus(nil, taskQ)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetTasksStatus(context.TODO(), taskQ)
     })

     if retryErr != nil {
@@ -911,8 +921,8 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.

     c.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.GetJobUpdateDetails(nil, &updateQuery)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
     })

     if retryErr != nil {
@@ -930,8 +940,8 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro

     c.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.client.RollbackJobUpdate(nil, &key, message)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.client.RollbackJobUpdate(context.TODO(), &key, message)
     })

     if retryErr != nil {
@@ -958,8 +968,8 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {

     c.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.DrainHosts(nil, drainList)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.DrainHosts(context.TODO(), drainList)
     })

     if retryErr != nil {
@@ -987,8 +997,8 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..

     c.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
     })

     if retryErr != nil {
@@ -1013,8 +1023,8 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)

     c.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.StartMaintenance(nil, hostList)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.StartMaintenance(context.TODO(), hostList)
     })

     if retryErr != nil {
@@ -1039,8 +1049,8 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {

     c.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.EndMaintenance(nil, hostList)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.EndMaintenance(context.TODO(), hostList)
     })

     if retryErr != nil {
@@ -1070,8 +1080,8 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe

     // Make thrift call. If we encounter an error sending the call, attempt to reconnect
     // and continue trying to resend command until we run out of retries.
-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.MaintenanceStatus(nil, hostList)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.MaintenanceStatus(context.TODO(), hostList)
     })

     if retryErr != nil {
@@ -1098,8 +1108,8 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
     quota := aurora.NewResourceAggregate()
     quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.SetQuota(nil, role, quota)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.SetQuota(context.TODO(), role, quota)
     })

     if retryErr != nil {
@@ -1112,8 +1122,8 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
 // GetQuota returns the resource aggregate for the given role
 func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {

-    resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.GetQuota(nil, role)
+    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.GetQuota(context.TODO(), role)
     })

     if retryErr != nil {
@@ -1130,8 +1140,8 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 // Force Aurora Scheduler to perform a snapshot and write to Mesos log
 func (c *Client) Snapshot() error {

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.Snapshot(nil)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.Snapshot(context.TODO())
     })

     if retryErr != nil {
@@ -1144,8 +1154,8 @@ func (c *Client) Snapshot() error {
 // Force Aurora Scheduler to write backup file to a file in the backup directory
 func (c *Client) PerformBackup() error {

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.PerformBackup(nil)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.PerformBackup(context.TODO())
     })

     if retryErr != nil {
@@ -1158,8 +1168,8 @@ func (c *Client) PerformBackup() error {
 // Force an Implicit reconciliation between Mesos and Aurora
 func (c *Client) ForceImplicitTaskReconciliation() error {

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.TriggerImplicitTaskReconciliation(nil)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
     })

     if retryErr != nil {
@@ -1179,8 +1189,8 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error {

     settings.BatchSize = batchSize

-    _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-        return c.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
+    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+        return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
     })

     if retryErr != nil {