* Errors have been refactored.
* ZK retries have been cleaned up. We now retry after every error EXCEPT when the path is badly formed.
* The ZK library has been reworked around an optional-arguments pattern so it is no longer intertwined with the cluster.json file.
* The Timeout error has been re-implemented as RetryError. RetryError behaves like a Timeout error but is kept private and exists solely to carry extra context, which lets unit tests verify that the retry mechanism actually retries.
* Additional logging has been added to the retry mechanisms as well as to the Zookeeper library we use.
parent dc327bebad
commit 3d62df1684
5 changed files with 211 additions and 91 deletions
errors.go (41 changed lines)

@@ -17,27 +17,50 @@ package realis

 // Using a pattern described by Dave Cheney to differentiate errors
 // https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully

-// Timeout errors are returned when a function has unsuccessfully retried.
+// Timeout errors are returned when a function is unable to continue executing due
+// to a time constraint or meeting a set number of retries.
 type timeout interface {
-    Timeout() bool
+    Timedout() bool
 }

 func IsTimeout(err error) bool {
     temp, ok := err.(timeout)
-    return ok && temp.Timeout()
+    return ok && temp.Timedout()
 }

-type TimeoutErr struct {
+// retryErr is a superset of timeout which includes extra context
+// with regards to our retry mechanism. This is done in order to make sure
+// that our retry mechanism works as expected through our tests and should
+// never be relied on or used directly. It is not made part of the public API
+// on purpose.
+type retryErr struct {
     error
-    timeout bool
+    timedout   bool
+    retryCount int // How many times did the mechanism retry the command
 }

-func (t *TimeoutErr) Timeout() bool {
-    return t.timeout
+// Retry error is a timeout type error with added context.
+func (r *retryErr) Timedout() bool {
+    return r.timedout
 }

-func NewTimeoutError(err error) *TimeoutErr {
-    return &TimeoutErr{error: err, timeout: true}
+func (r *retryErr) RetryCount() int {
+    return r.retryCount
+}
+
+// Helper function for testing verification to avoid whitebox testing
+// as well as keeping retryErr as a private.
+// Should NOT be used under any other context.
+func ToRetryCount(err error) *retryErr {
+    if retryErr, ok := err.(*retryErr); ok {
+        return retryErr
+    } else {
+        return nil
+    }
+}
+
+func newRetryError(err error, retryCount int) *retryErr {
+    return &retryErr{error: err, timedout: true, retryCount: retryCount}
 }

 // Temporary errors indicate that the action may and should be retried.
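As an illustration of how these helpers fit together, here is a minimal sketch of a caller distinguishing retry exhaustion from other failures. Only IsTimeout and ToRetryCount come from the diff above; the createOrReport helper and its client and job arguments are illustrative assumptions, not part of this commit.

package example

import (
    "log"

    realis "github.com/paypal/gorealis"
    "github.com/paypal/gorealis/gen-go/apache/aurora"
    "github.com/pkg/errors"
)

// createOrReport is a hypothetical wrapper showing the intended split:
// IsTimeout stays the public check, while ToRetryCount exists only so tests
// can assert how many times the client actually retried.
func createOrReport(client realis.Realis, job realis.Job) (*aurora.Response, error) {
    resp, err := client.CreateJob(job)
    if err != nil {
        if realis.IsTimeout(err) {
            log.Printf("gave up after exhausting retries: %v", err)
        }
        if retryErr := realis.ToRetryCount(errors.Cause(err)); retryErr != nil {
            log.Printf("the client retried %d time(s)", retryErr.RetryCount())
        }
        return nil, err
    }
    return resp, nil
}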
realis.go (50 changed lines)

Every retry call site in realis.go is updated to use the renamed, now-unexported wrapper. In each of the hunks listed below the change is the same single line:

-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {

@@ -456,7 +456,7 @@ in GetInstanceIds, wrapping r.client.GetTasksWithoutConfigs(taskQ)
@@ -476,7 +476,7 @@ in GetJobUpdateSummaries, wrapping r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
@@ -491,7 +491,7 @@ in GetJobs, wrapping r.readonlyClient.GetJobs(role)
@@ -515,7 +515,7 @@ in KillInstances, wrapping r.client.KillTasks(key, instanceIds, "")
@@ -532,7 +532,7 @@ in KillJob, wrapping r.client.KillTasks(key, nil, "") (an empty set tells the Aurora scheduler to kill all active shards)
@@ -549,7 +549,7 @@ in CreateJob, wrapping r.client.CreateJob(auroraJob.JobConfig())
@@ -580,7 +580,7 @@ in ScheduleCronJob, wrapping r.client.ScheduleCronJob(auroraJob.JobConfig())
@@ -592,7 +592,7 @@ in DescheduleCronJob, wrapping r.client.DescheduleCronJob(key)
@@ -606,7 +606,7 @@ in StartCronJob, wrapping r.client.StartCronJob(key)
@@ -625,7 +625,7 @@ in RestartInstances, wrapping r.client.RestartShards(key, instanceIds)
@@ -644,7 +644,7 @@ in RestartJob, wrapping r.client.RestartShards(key, instanceIds)
@@ -661,7 +661,7 @@ in StartJobUpdate, wrapping r.client.StartJobUpdate(updateJob.req, message)
@@ -674,7 +674,7 @@ in AbortJobUpdate, wrapping r.client.AbortJobUpdate(&updateKey, message)
@@ -687,7 +687,7 @@ in PauseJobUpdate, wrapping r.client.PauseJobUpdate(updateKey, message)
@@ -701,7 +701,7 @@ in ResumeJobUpdate, wrapping r.client.ResumeJobUpdate(updateKey, message)
@@ -715,7 +715,7 @@ in PulseJobUpdate, wrapping r.client.PulseJobUpdate(updateKey)
@@ -730,7 +730,7 @@ in AddInstances, wrapping r.client.AddInstances(&instKey, count)
@@ -766,7 +766,7 @@ in GetTaskStatus, wrapping r.client.GetTasksStatus(query)
@@ -780,7 +780,7 @@ in GetTasksWithoutConfigs, wrapping r.client.GetTasksWithoutConfigs(query)
@@ -806,7 +806,7 @@ in FetchTaskConfig, wrapping r.client.GetTasksStatus(taskQ)
@@ -830,7 +830,7 @@ in JobUpdateDetails, wrapping r.client.GetJobUpdateDetails(&updateQuery)
@@ -843,7 +843,7 @@ in RollbackJobUpdate, wrapping r.client.RollbackJobUpdate(&key, message)
@@ -870,7 +870,7 @@ in DrainHosts, wrapping r.adminClient.DrainHosts(drainList)
@@ -899,7 +899,7 @@ in EndMaintenance, wrapping r.adminClient.EndMaintenance(hostList)
@@ -930,7 +930,7 @@ in MaintenanceStatus, wrapping r.adminClient.MaintenanceStatus(hostList)
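All of these call sites share the same shape: the thrift call is wrapped in a closure and handed to the retry wrapper. Below is a standalone sketch of that pattern; auroraCall and callWithRetries are illustrative names, not the gorealis internals, which live in retry.go and are shown later in this diff.

package example

import "github.com/paypal/gorealis/gen-go/apache/aurora"

// auroraCall mirrors the closure signature the retry wrapper accepts.
type auroraCall func() (*aurora.Response, error)

// callWithRetries is an illustrative stand-in for thriftCallWithRetries:
// re-invoke the closure until it succeeds or the step budget is spent,
// then surface the last error seen.
func callWithRetries(steps int, call auroraCall) (*aurora.Response, error) {
    var resp *aurora.Response
    var err error
    for curStep := 0; curStep < steps; curStep++ {
        if resp, err = call(); err == nil {
            return resp, nil
        }
    }
    return nil, err
}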
realis_e2e_test.go

@@ -25,8 +25,9 @@ import (

     "github.com/paypal/gorealis"
     "github.com/paypal/gorealis/gen-go/apache/aurora"
-    "github.com/stretchr/testify/assert"
     "github.com/paypal/gorealis/response"
+    "github.com/pkg/errors"
+    "github.com/stretchr/testify/assert"
 )

 var r realis.Realis

@@ -59,16 +60,17 @@ func TestMain(m *testing.M) {
     os.Exit(m.Run())
 }

-func TestBadEndpoint(t *testing.T) {
+func TestNonExistentEndpoint(t *testing.T) {
+    backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick
+        Steps:    5,
+        Duration: 1 * time.Second,
+        Factor:   1.0,
+        Jitter:   0.1}
+
     // Attempt to connect to a bad endpoint
-    r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/scheduler/"),
+    r, err := realis.NewRealisClient(realis.SchedulerUrl("http://127.0.0.1:8081/doesntexist/"),
         realis.TimeoutMS(200),
-        realis.BackOff(&realis.Backoff{ // Reduce penalties for this test to make it quick
-            Steps:    5,
-            Duration: 1 * time.Second,
-            Factor:   1.0,
-            Jitter:   0.1}),
+        realis.BackOff(backoff),
     )
     defer r.Close()

@@ -83,6 +85,13 @@ func TestBadEndpoint(t *testing.T) {
     // Check that we do error out of retrying
     assert.Error(t, err)

+    // Check that the error before this one was a retry error
+    // TODO: Consider bubbling up timeout behaving error all the way up to the user.
+    retryErr := realis.ToRetryCount(errors.Cause(err))
+    assert.NotNil(t, retryErr, "error passed in is not a retry error")
+
+    assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is incorrect")
+
 }

 func TestLeaderFromZK(t *testing.T) {

@@ -186,11 +195,10 @@ func TestRealisClient_CreateJobWithPulse_Thermos(t *testing.T) {
     assert.NoError(t, err)
     assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)

-
     updateQ := aurora.JobUpdateQuery{
         Key:   result.GetKey(),
         Limit: 1,
     }

     start := time.Now()
     for i := 0; i*int(pulse) <= timeout; i++ {
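The renamed test shows the pattern the new retry context enables: configure a small Backoff, force every call to fail, then unwrap the cause and compare RetryCount() against Steps. A hedged sketch of the same idea against another client method follows; the endpoint and role are placeholders, and the exact behavior of NewRealisClient against an unreachable scheduler is assumed to match the test above.

package realis_test

import (
    "testing"
    "time"

    realis "github.com/paypal/gorealis"
    "github.com/pkg/errors"
    "github.com/stretchr/testify/assert"
)

func TestRetryCountMatchesSteps(t *testing.T) {
    // Keep the penalty small so the test finishes quickly.
    backoff := &realis.Backoff{Steps: 3, Duration: 100 * time.Millisecond, Factor: 1.0, Jitter: 0.1}

    // Point the client at an endpoint that cannot answer so every call fails.
    r, err := realis.NewRealisClient(
        realis.SchedulerUrl("http://127.0.0.1:8081/doesntexist/"),
        realis.TimeoutMS(200),
        realis.BackOff(backoff),
    )
    assert.NoError(t, err)
    defer r.Close()

    _, _, err = r.GetJobs("placeholder-role")
    assert.Error(t, err)

    // ToRetryCount only recognizes the direct cause, hence errors.Cause.
    retryErr := realis.ToRetryCount(errors.Cause(err))
    if assert.NotNil(t, retryErr, "expected a retry error") {
        assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count should equal the configured steps")
    }
}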
retry.go (78 changed lines)

@@ -1,18 +1,16 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

 package realis

@@ -59,22 +57,30 @@ type ConditionFunc func() (done bool, err error)
 // If Jitter is greater than zero, a random amount of each duration is added
 // (between duration and duration*(1+jitter)).
 //
-// If the condition never returns true, ErrWaitTimeout is returned. All other
-// errors terminate immediately.
-func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
+// If the condition never returns true, ErrWaitTimeout is returned. Errors
+// do not cause the function to return.
+func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
     var err error
     var ok bool
+    var curStep int
     duration := backoff.Duration
-    for i := 0; i < backoff.Steps; i++ {
-        if i != 0 {
+
+    for curStep = 0; curStep < backoff.Steps; curStep++ {
+
+        // Only sleep if it's not the first iteration.
+        if curStep != 0 {
             adjusted := duration
             if backoff.Jitter > 0.0 {
                 adjusted = Jitter(duration, backoff.Jitter)
             }
+
+            logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted)
             time.Sleep(adjusted)
             duration = time.Duration(float64(duration) * backoff.Factor)
         }
+
+        // Execute function passed in.
         ok, err = condition()

         // If the function executed says it succeeded, stop retrying

@@ -82,43 +88,53 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
             return nil
         }

-        // Stop retrying if the error is NOT temporary.
         if err != nil {
+
+            // If the error is temporary, continue retrying.
             if !IsTemporary(err) {
                 return err
+            } else {
+                // Print out the temporary error we experienced.
+                logger.Println(err)
             }
         }
     }

+    if curStep > 1 {
+        logger.Printf("retried this function call %d time(s)", curStep)
+    }
+
     // Provide more information to the user wherever possible
     if err != nil {
-        return NewTimeoutError(errors.Wrap(err, "Timed out while retrying"))
+        return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
     } else {
-        return NewTimeoutError(errors.New("Timed out while retrying"))
+        return newRetryError(errors.New("ran out of retries"), curStep)
     }
 }

 type auroraThriftCall func() (resp *aurora.Response, err error)

 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
-func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
+func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
     var resp *aurora.Response
     var clientErr error
+    var curStep int

     backoff := r.config.backoff
     duration := backoff.Duration

-    for i := 0; i < backoff.Steps; i++ {
+    for curStep = 0; curStep < backoff.Steps; curStep++ {

         // If this isn't our first try, backoff before the next try.
-        if i != 0 {
+        if curStep != 0 {
             adjusted := duration
             if backoff.Jitter > 0.0 {
                 adjusted = Jitter(duration, backoff.Jitter)
             }

-            r.logger.Printf("An error occurred during thrift call, backing off for %v before retrying\n", adjusted)
+            r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted)

             time.Sleep(adjusted)
             duration = time.Duration(float64(duration) * backoff.Factor)

@@ -176,10 +192,14 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {

     }

+    if curStep > 1 {
+        r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+    }
+
     // Provide more information to the user wherever possible.
     if clientErr != nil {
-        return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
+        return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
     } else {
-        return nil, NewTimeoutError(errors.New("Timed out while retrying"))
+        return nil, newRetryError(errors.New("ran out of retries"), curStep)
     }
 }
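With the Logger parameter added, ExponentialBackoff can report each backoff and temporary error itself. Below is a small usage sketch under the assumption that *log.Logger satisfies the realis.Logger interface (it only needs Print, Printf and Println); the pollUntilReady helper and its check argument are illustrative, not part of this commit.

package example

import (
    "log"
    "os"
    "time"

    realis "github.com/paypal/gorealis"
    "github.com/pkg/errors"
)

// pollUntilReady retries a flaky check with the reworked ExponentialBackoff.
// Temporary errors keep the loop going; any other error aborts immediately,
// and exhausting the steps yields a timeout-behaving retry error.
func pollUntilReady(check func() error) error {
    backoff := realis.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 2.0, Jitter: 0.1}
    logger := log.New(os.Stdout, "retry: ", log.LstdFlags)

    return realis.ExponentialBackoff(backoff, logger, func() (bool, error) {
        if err := check(); err != nil {
            // Wrapping in a temporary error signals "retry me" to the loop.
            return false, realis.NewTemporaryError(errors.Wrap(err, "not ready yet"))
        }
        return true, nil
    })
}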
zk.go (101 changed lines)

@@ -16,7 +16,6 @@ package realis

 import (
     "encoding/json"
-    "fmt"
     "strconv"
     "strings"
     "time"

@@ -36,27 +35,89 @@ type ServiceInstance struct {
     Status string `json:"status"`
 }

+type zkConfig struct {
+    endpoints []string
+    path      string
+    backoff   Backoff
+    timeout   time.Duration
+    logger    Logger
+}
+
+type ZKOpt func(z *zkConfig)
+
+func ZKEndpoints(endpoints ...string) ZKOpt {
+    return func(z *zkConfig) {
+        z.endpoints = endpoints
+    }
+}
+
+func ZKPath(path string) ZKOpt {
+    return func(z *zkConfig) {
+        z.path = path
+    }
+}
+
+func ZKBackoff(b Backoff) ZKOpt {
+    return func(z *zkConfig) {
+        z.backoff = b
+    }
+}
+
+func ZKTimeout(d time.Duration) ZKOpt {
+    return func(z *zkConfig) {
+        z.timeout = d
+    }
+}
+
+func ZKLogger(l Logger) ZKOpt {
+    return func(z *zkConfig) {
+        z.logger = l
+    }
+}
+
 // Retrieves current Aurora leader from ZK.
 func LeaderFromZK(cluster Cluster) (string, error) {
+    return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
+}

-    var zkurl string
+// Retrieves current Aurora leader from ZK with a custom configuration.
+func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
+    var leaderURL string

-    retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
+    // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
+    config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
+    for _, opt := range options {
+        opt(config)
+    }

-        endpoints := strings.Split(cluster.ZK, ",")
+    if len(config.endpoints) == 0 {
+        return "", errors.New("no Zookeeper endpoints supplied")
+    }

-        //TODO (rdelvalle): When enabling debugging, change logger here
-        c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
+    if config.path == "" {
+        return "", errors.New("no Zookeeper path supplied")
+    }
+
+    // Create a closure that allows us to use the ExponentialBackoff function.
+    retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
+
+        c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
         if err != nil {
-            return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK))
+            return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
         }

         defer c.Close()

         // Open up descriptor for the ZK path given
-        children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
+        children, _, _, err := c.ChildrenW(config.path)
         if err != nil {
-            return false, errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
+
+            // Sentinel error check as there is no other way to check.
+            if err == zk.ErrInvalidPath {
+                return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
+            }
+
+            return false, NewTemporaryError(errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", config.path))
         }

         // Search for the leader through all the children in the given path

@@ -66,9 +127,14 @@ func LeaderFromZK(cluster Cluster) (string, error) {
             // Only the leader will start with member_
             if strings.HasPrefix(child, "member_") {
-                data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
+
+                childPath := config.path + "/" + child
+                data, _, err := c.Get(childPath)
                 if err != nil {
-                    return false, errors.Wrap(err, "Error fetching contents of leader")
+                    if err == zk.ErrInvalidPath {
+                        return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
+                    }
+
+                    return false, NewTemporaryError(errors.Wrap(err, "Error fetching contents of leader"))
                 }

                 err = json.Unmarshal([]byte(data), serviceInst)

@@ -76,9 +142,11 @@ func LeaderFromZK(cluster Cluster) (string, error) {
                     return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader"))
                 }

-                // Should only be one endpoint
+                // Should only be one endpoint.
+                // This should never be encountered as it would indicate Aurora
+                // writing bad info into Zookeeper but is kept here as a safety net.
                 if len(serviceInst.AdditionalEndpoints) > 1 {
-                    fmt.Errorf("Ambiguous end points schemes")
+                    return false, NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
                 }

                 var scheme, host, port string

@@ -88,7 +156,7 @@ func LeaderFromZK(cluster Cluster) (string, error) {
                     port = strconv.Itoa(v.Port)
                 }

-                zkurl = scheme + "://" + host + ":" + port
+                leaderURL = scheme + "://" + host + ":" + port
                 return true, nil
             }
         }

@@ -98,8 +166,9 @@ func LeaderFromZK(cluster Cluster) (string, error) {
     })

     if retryErr != nil {
-        return "", NewTimeoutError(errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps))
+        config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
+        return "", retryErr
     }

-    return zkurl, nil
+    return leaderURL, nil
 }
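With the functional-options rework, the leader lookup no longer has to go through a Cluster loaded from a cluster.json file. A minimal sketch of calling LeaderFromZKOpts directly follows; the endpoint and path are placeholders for a real ZooKeeper ensemble and the path Aurora registers its leader under.

package main

import (
    "fmt"
    "log"
    "time"

    realis "github.com/paypal/gorealis"
)

func main() {
    // Placeholder endpoint and path; substitute your own values.
    leader, err := realis.LeaderFromZKOpts(
        realis.ZKEndpoints("192.168.33.7:2181"),
        realis.ZKPath("/aurora/scheduler"),
        realis.ZKTimeout(5*time.Second),
        realis.ZKBackoff(realis.Backoff{Steps: 3, Duration: 1 * time.Second, Factor: 2.0, Jitter: 0.1}),
    )
    if err != nil {
        log.Fatalf("unable to determine Aurora leader: %v", err)
    }

    fmt.Println("Aurora leader:", leader)
}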