diff --git a/realis.go b/realis.go
index 554a92a..a6925c9 100644
--- a/realis.go
+++ b/realis.go
@@ -16,6 +16,7 @@ package realis
 
 import (
+	"context"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/base64"
@@ -66,6 +67,7 @@ type clientConfig struct {
 	debug                 bool
 	trace                 bool
 	zkOptions             []ZKOpt
+	failOnPermanentErrors bool
 }
 
 var defaultBackoff = Backoff{
@@ -189,6 +191,14 @@ func Trace() ClientOption {
 	}
 }
 
+// FailOnPermanentErrors - If the client encounters a connection error the standard library
+// considers permanent, stop retrying and return an error to the user.
+func FailOnPermanentErrors() ClientOption {
+	return func(config *clientConfig) {
+		config.failOnPermanentErrors = true
+	}
+}
+
 func newTJSONTransport(url string, timeout time.Duration, config *clientConfig) (thrift.TTransport, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
@@ -442,8 +452,8 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu
 
 	c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetTasksWithoutConfigs(nil, taskQ)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
 	})
 
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -464,8 +474,8 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu
 func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.GetJobUpdateSummariesResult_, error) {
 	c.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
 	})
 
 	if retryErr != nil {
@@ -479,8 +489,8 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 
 	var result *aurora.GetJobsResult_
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.readonlyClient.GetJobs(nil, role)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.readonlyClient.GetJobs(context.TODO(), role)
 	})
 
 	if retryErr != nil {
@@ -499,8 +509,8 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, error) {
 	c.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.KillTasks(nil, &key, instances, "")
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.KillTasks(context.TODO(), &key, instances, "")
 	})
 
 	if retryErr != nil {
@@ -525,9 +535,9 @@ func (c *Client) KillJob(key aurora.JobKey) error {
 
 	c.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-		return c.client.KillTasks(nil, &key, nil, "")
+		return c.client.KillTasks(context.TODO(), &key, nil, "")
 	})
 
 	if retryErr != nil {
@@ -550,8 +560,8 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
 		return errors.Wrap(err, "unable to create Thermos payload")
 	}
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.CreateJob(nil, auroraJob.JobConfig())
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
@@ -581,8 +591,8 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
 		return errors.Wrap(err, "Unable to create Thermos payload")
 	}
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.ScheduleCronJob(nil, auroraJob.JobConfig())
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
@@ -595,8 +605,8 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error {
 
 	c.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.DescheduleCronJob(nil, &key)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.DescheduleCronJob(context.TODO(), &key)
 	})
 
 	if retryErr != nil {
@@ -611,8 +621,8 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {
 
 	c.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.StartCronJob(nil, &key)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.StartCronJob(context.TODO(), &key)
 	})
 
 	if retryErr != nil {
@@ -626,8 +636,8 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {
 func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
 	c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.RestartShards(nil, &key, instances)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.RestartShards(context.TODO(), &key, instances)
 	})
 
 	if retryErr != nil {
@@ -647,8 +657,8 @@ func (c *Client) RestartJob(key aurora.JobKey) error {
 	c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
 
 	if len(instanceIds) > 0 {
-		_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-			return c.client.RestartShards(nil, &key, instanceIds)
+		_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+			return c.client.RestartShards(context.TODO(), &key, instanceIds)
 		})
 
 		if retryErr != nil {
@@ -670,7 +680,7 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S
 
 	c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.StartJobUpdate(nil, updateJob.request, message)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.StartJobUpdate(context.TODO(), updateJob.request, message)
 	})
 
@@ -690,8 +700,8 @@ func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) e
 
 	c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.AbortJobUpdate(nil, &updateKey, message)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.AbortJobUpdate(context.TODO(), &updateKey, message)
 	})
 
 	if retryErr != nil {
@@ -715,7 +725,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string)
 		ID:   updateKey.GetID(),
 	}
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.PauseJobUpdate(nil, updateKeyLocal, message)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.PauseJobUpdate(context.TODO(), updateKeyLocal, message)
 	})
 
@@ -737,8 +747,8 @@ func (c *Client) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string)
 
 	c.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.ResumeJobUpdate(nil, updateKey, message)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.ResumeJobUpdate(context.TODO(), updateKey, message)
 	})
 
 	if retryErr != nil {
@@ -753,8 +763,8 @@ func (c *Client) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (aurora.JobUpdat
 
 	c.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.PulseJobUpdate(nil, updateKey)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.PulseJobUpdate(context.TODO(), updateKey)
 	})
 
 	if retryErr != nil {
@@ -775,8 +785,8 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error {
 
 	c.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.AddInstances(nil, &instKey, count)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.AddInstances(context.TODO(), &instKey, count)
 	})
 
 	if retryErr != nil {
@@ -820,8 +830,8 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask
 
 	c.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetTasksStatus(nil, query)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetTasksStatus(context.TODO(), query)
 	})
 
 	if retryErr != nil {
@@ -836,8 +846,8 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea
 
 	c.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetPendingReason(nil, query)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetPendingReason(context.TODO(), query)
 	})
 
 	if retryErr != nil {
@@ -858,8 +868,8 @@ func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.Sche
 
 	c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetTasksWithoutConfigs(nil, query)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetTasksWithoutConfigs(context.TODO(), query)
 	})
 
 	if retryErr != nil {
@@ -885,8 +895,8 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig
 
 	c.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetTasksStatus(nil, taskQ)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetTasksStatus(context.TODO(), taskQ)
 	})
 
 	if retryErr != nil {
@@ -911,8 +921,8 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.
 
 	c.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.GetJobUpdateDetails(nil, &updateQuery)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
 	})
 
 	if retryErr != nil {
@@ -930,8 +940,8 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro
 
 	c.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.client.RollbackJobUpdate(nil, &key, message)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.client.RollbackJobUpdate(context.TODO(), &key, message)
 	})
 
 	if retryErr != nil {
@@ -958,8 +968,8 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	c.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.DrainHosts(nil, drainList)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.DrainHosts(context.TODO(), drainList)
 	})
 
 	if retryErr != nil {
@@ -987,8 +997,8 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
 
 	c.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
 	})
 
 	if retryErr != nil {
@@ -1013,8 +1023,8 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)
 
 	c.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.StartMaintenance(nil, hostList)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.StartMaintenance(context.TODO(), hostList)
 	})
 
 	if retryErr != nil {
@@ -1039,8 +1049,8 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	c.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.EndMaintenance(nil, hostList)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.EndMaintenance(context.TODO(), hostList)
 	})
 
 	if retryErr != nil {
@@ -1070,8 +1080,8 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe
 
 	// Make thrift call. If we encounter an error sending the call, attempt to reconnect
 	// and continue trying to resend command until we run out of retries.
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.MaintenanceStatus(nil, hostList)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.MaintenanceStatus(context.TODO(), hostList)
 	})
 
 	if retryErr != nil {
@@ -1098,8 +1108,8 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
 	quota := aurora.NewResourceAggregate()
 	quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.SetQuota(nil, role, quota)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.SetQuota(context.TODO(), role, quota)
 	})
 
 	if retryErr != nil {
@@ -1112,8 +1122,8 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
 
 // GetQuota returns the resource aggregate for the given role
 func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
-	resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.GetQuota(nil, role)
+	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.GetQuota(context.TODO(), role)
 	})
 
 	if retryErr != nil {
@@ -1130,8 +1140,8 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
 // Force Aurora Scheduler to perform a snapshot and write to Mesos log
 func (c *Client) Snapshot() error {
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.Snapshot(nil)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.Snapshot(context.TODO())
 	})
 
 	if retryErr != nil {
@@ -1144,8 +1154,8 @@ func (c *Client) Snapshot() error {
 
 // Force Aurora Scheduler to write backup file to a file in the backup directory
 func (c *Client) PerformBackup() error {
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.PerformBackup(nil)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.PerformBackup(context.TODO())
 	})
 
 	if retryErr != nil {
@@ -1158,8 +1168,8 @@ func (c *Client) PerformBackup() error {
 
 // Force an Implicit reconciliation between Mesos and Aurora
 func (c *Client) ForceImplicitTaskReconciliation() error {
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.TriggerImplicitTaskReconciliation(nil)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
 	})
 
 	if retryErr != nil {
@@ -1179,8 +1189,8 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error {
 
 	settings.BatchSize = batchSize
 
-	_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return c.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
+	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
+		return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
 	})
 
 	if retryErr != nil {
diff --git a/retry.go b/retry.go
index 508df41..4a82431 100644
--- a/retry.go
+++ b/retry.go
@@ -69,6 +69,7 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 	var err error
 	var ok bool
 	var curStep int
+	duration := backoff.Duration
 
 	for curStep = 0; curStep < backoff.Steps; curStep++ {
@@ -80,7 +81,7 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 			adjusted = Jitter(duration, backoff.Jitter)
 		}
 
-		logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted)
+		logger.Printf("A retryable error occurred during function call, backing off for %v before retrying\n", adjusted)
 
 		time.Sleep(adjusted)
 		duration = time.Duration(float64(duration) * backoff.Factor)
 	}
@@ -119,10 +120,11 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 type auroraThriftCall func() (resp *aurora.Response, err error)
 
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
-func (c *Client) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
+func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
 	var resp *aurora.Response
 	var clientErr error
 	var curStep int
+	var timeouts int
 
 	backoff := c.config.backoff
 	duration := backoff.Duration
@@ -136,7 +138,7 @@ func (c *Client) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Res
 			adjusted = Jitter(duration, backoff.Jitter)
 		}
 
-		c.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
+		c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
 
 		time.Sleep(adjusted)
 		duration = time.Duration(float64(duration) * backoff.Factor)
@@ -154,7 +156,7 @@ func (c *Client) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Res
 			c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
 		}()
 
-		// Check if our thrift call is returning an error. This is a retriable event as we don't know
+		// Check if our thrift call is returning an error. This is a retryable event as we don't know
 		// if it was caused by network issues.
 		if clientErr != nil {
@@ -177,22 +179,37 @@ func (c *Client) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Res
 				// EOF error occurs when the server closes the read buffer of the client. This is common
 				// when the server is overloaded and should be retried. All other errors that are permanent
 				// will not be retried.
-				if e.Err != io.EOF && !e.Temporary() {
-					return nil, errors.Wrap(clientErr, "Permanent connection error")
+				if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors {
+					return nil, errors.Wrap(clientErr, "permanent connection error")
+				}
+
+				// Corner case where the thrift payload was received by Aurora but the connection timed out
+				// before Aurora was able to reply. In this case we will return whatever response was received
+				// along with an error that behaves like a timeout. Users can take special action on a timeout
+				// by using IsTimedout and reacting accordingly.
+				if e.Timeout() {
+					timeouts++
+					c.logger.DebugPrintf(
+						"Client closed connection (timed out) %d times before server responded,"+
+							" consider increasing connection timeout",
+						timeouts)
+					if returnOnTimeout {
+						return resp,
+							newTimedoutError(errors.New("client connection closed before server answer"))
+					}
+				}
 			}
 		}
 
 		// In the future, reestablish connection should be able to check if it is actually possible
 		// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
-		c.ReestablishConn()
+		// Ignoring the returned error since a failed reconnect should simply be retried
+		_ = c.ReestablishConn()
 	} else {
 
 		// If there was no client error, but the response is nil, something went wrong.
 		// Ideally, we'll never encounter this but we're placing a safeguard here.
 		if resp == nil {
-			return nil, errors.New("Response from aurora is nil")
+			return nil, errors.New("response from aurora is nil")
 		}
 
 		// Check Response Code from thrift and make a decision to continue retrying or not.
@@ -219,7 +236,7 @@ func (c *Client) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Res
 		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
 		default:
 			c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
 		}
 	}
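
---

A minimal sketch of how the new `FailOnPermanentErrors` option is meant to be consumed. `NewClient` and `SchedulerUrl` are taken from the package's existing client options; the import path and scheduler address below are assumptions, not part of this patch:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis/v2" // assumed import path
)

func main() {
	// With FailOnPermanentErrors set, a connection error that the standard
	// library reports as permanent (non-EOF, non-temporary) aborts the retry
	// loop immediately instead of consuming the whole backoff budget.
	client, err := realis.NewClient(
		realis.SchedulerUrl("http://localhost:8081"), // placeholder address
		realis.FailOnPermanentErrors(),
	)
	if err != nil {
		log.Fatal(err)
	}

	quota, err := client.GetQuota("vagrant")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("quota: %+v", quota)
}
```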
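The `returnOnTimeout` flag deserves a similar sketch. Every call site in this patch passes `false`, so timeouts simply keep retrying; a call site that passed `true` would surface an error satisfying `IsTimedout` (the error helper the new comment refers to), which matters for non-idempotent operations whose payload may have reached the scheduler before the connection closed. The helper below is hypothetical and assumes `StartJobUpdate` is eventually wired with `returnOnTimeout = true`:

```go
// startUpdateOnce is a hypothetical consumer: on an error that behaves like a
// timeout, the update request may already be active on the scheduler, so the
// safe reaction is to query update status rather than resubmit blindly.
func startUpdateOnce(client *realis.Client, update *realis.JobUpdate) error {
	_, err := client.StartJobUpdate(update, "")
	if err != nil && realis.IsTimedout(err) {
		log.Println("timed out before the scheduler answered; check for an active update before retrying")
	}
	return err
}
```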