diff --git a/errors.go b/errors.go
index be85828..7007f83 100644
--- a/errors.go
+++ b/errors.go
@@ -17,27 +17,50 @@ package realis
 
 // Using a pattern described by Dave Cheney to differentiate errors
 // https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully
 
-// Timeout errors are returned when a function has unsuccessfully retried.
+// Timeout errors are returned when a function is unable to continue executing due
+// to a time constraint or meeting a set number of retries.
 type timeout interface {
-    Timeout() bool
+    Timedout() bool
 }
 
 func IsTimeout(err error) bool {
     temp, ok := err.(timeout)
-    return ok && temp.Timeout()
+    return ok && temp.Timedout()
 }
 
-type TimeoutErr struct {
+// retryErr is a superset of timeout which includes extra context
+// about our retry mechanism. It exists so that our tests can verify
+// that the retry mechanism works as expected; it should never be
+// relied on or used directly. It is deliberately not part of the
+// public API.
+type retryErr struct {
     error
-    timeout bool
+    timedout   bool
+    retryCount int // How many times the mechanism retried the command
 }
 
-func (t *TimeoutErr) Timeout() bool {
-    return t.timeout
+// A retry error is a timeout-type error with added context.
+func (r *retryErr) Timedout() bool {
+    return r.timedout
 }
 
-func NewTimeoutError(err error) *TimeoutErr {
-    return &TimeoutErr{error: err, timeout: true}
+func (r *retryErr) RetryCount() int {
+    return r.retryCount
+}
+
+// ToRetryCount is a helper for test verification that avoids whitebox
+// testing while keeping retryErr private.
+// It should NOT be used in any other context.
+func ToRetryCount(err error) *retryErr {
+    if retryErr, ok := err.(*retryErr); ok {
+        return retryErr
+    } else {
+        return nil
+    }
+}
+
+func newRetryError(err error, retryCount int) *retryErr {
+    return &retryErr{error: err, timedout: true, retryCount: retryCount}
 }
 
 // Temporary errors indicate that the action may and should be retried.
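For reviewers, here is a minimal sketch of how the new error types are meant to be consumed from calling code. The helper name inspectRetries and the role string are hypothetical, and this assumes client methods wrap the retry error with github.com/pkg/errors, as the updated test later in this diff does:

```go
package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
	"github.com/pkg/errors"
)

// inspectRetries is a hypothetical helper showing how a caller can tell
// an exhausted-retries failure apart from other errors.
func inspectRetries(client realis.Realis) {
	// Any retried client method surfaces errors the same way; GetJobs is
	// used here only as an example.
	_, _, err := client.GetJobs("vagrant")
	if err != nil {
		// IsTimeout reports true for retry errors because retryErr
		// satisfies the timeout interface through Timedout().
		if realis.IsTimeout(errors.Cause(err)) {
			fmt.Println("ran out of retries:", err)
		}

		// Test-only escape hatch: recover the concrete retry error to
		// inspect how many attempts were made.
		if retryErr := realis.ToRetryCount(errors.Cause(err)); retryErr != nil {
			fmt.Printf("gave up after %d attempt(s)\n", retryErr.RetryCount())
		}
	}
}
```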
diff --git a/realis.go b/realis.go
index 9522da5..6c2f6d6 100644
--- a/realis.go
+++ b/realis.go
@@ -456,7 +456,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
         Statuses: states,
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.GetTasksWithoutConfigs(taskQ)
     })
 
@@ -476,7 +476,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 }
 
 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
     })
 
@@ -491,7 +491,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 
     var result *aurora.GetJobsResult_
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.readonlyClient.GetJobs(role)
     })
 
@@ -515,7 +515,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
         instanceIds[instId] = true
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.KillTasks(key, instanceIds, "")
     })
 
@@ -532,7 +532,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
 
 // Sends a kill message to the scheduler for all active tasks under a job.
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
         return r.client.KillTasks(key, nil, "")
     })
 
@@ -549,7 +549,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 
 // Use this API to create ad-hoc jobs.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.CreateJob(auroraJob.JobConfig())
     })
 
@@ -580,7 +580,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
 
 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.ScheduleCronJob(auroraJob.JobConfig())
     })
 
@@ -592,7 +592,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 
 func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.DescheduleCronJob(key)
     })
 
@@ -606,7 +606,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 
 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.StartCronJob(key)
     })
 
@@ -625,7 +625,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
         instanceIds[instId] = true
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.RestartShards(key, instanceIds)
     })
 
@@ -644,7 +644,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
     }
 
     if len(instanceIds) > 0 {
-        resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+        resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
             return r.client.RestartShards(key, instanceIds)
         })
 
@@ -661,7 +661,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.StartJobUpdate(updateJob.req, message)
     })
 
@@ -674,7 +674,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 
 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
 func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.AbortJobUpdate(&updateKey, message)
     })
 
@@ -687,7 +687,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 
 //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.PauseJobUpdate(updateKey, message)
     })
 
@@ -701,7 +701,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 
 //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.ResumeJobUpdate(updateKey, message)
     })
 
@@ -715,7 +715,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 
 //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.PulseJobUpdate(updateKey)
     })
 
@@ -730,7 +730,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 // instance to scale up.
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.AddInstances(&instKey, count)
     })
 
@@ -766,7 +766,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 
 // Get information about task including a fully hydrated task configuration object
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.GetTasksStatus(query)
     })
 
@@ -780,7 +780,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 
 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.GetTasksWithoutConfigs(query)
     })
 
@@ -806,7 +806,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
         Statuses: aurora.ACTIVE_STATES,
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.GetTasksStatus(taskQ)
     })
 
@@ -830,7 +830,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 
 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.GetJobUpdateDetails(&updateQuery)
     })
 
@@ -843,7 +843,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 
 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.client.RollbackJobUpdate(&key, message)
     })
 
@@ -870,7 +870,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
         drainList.HostNames[host] = true
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.adminClient.DrainHosts(drainList)
     })
 
@@ -899,7 +899,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
         hostList.HostNames[host] = true
     }
 
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.adminClient.EndMaintenance(hostList)
     })
 
@@ -930,7 +930,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 
     // Make thrift call. If we encounter an error sending the call, attempt to reconnect
     // and continue trying to resend command until we run out of retries.
-    resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
+    resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
         return r.adminClient.MaintenanceStatus(hostList)
     })
 
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 0467a8b..34817ab 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -25,8 +25,9 @@ import (
 
     "github.com/paypal/gorealis"
     "github.com/paypal/gorealis/gen-go/apache/aurora"
-    "github.com/stretchr/testify/assert"
     "github.com/paypal/gorealis/response"
+    "github.com/pkg/errors"
+    "github.com/stretchr/testify/assert"
 )
 
 var r realis.Realis
@@ -59,16 +60,17 @@ func TestMain(m *testing.M) {
     os.Exit(m.Run())
 }
 
-func TestBadEndpoint(t *testing.T) {
+func TestNonExistentEndpoint(t *testing.T) {
+    backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick
+        Steps:    5,
+        Duration: 1 * time.Second,
+        Factor:   1.0,
+        Jitter:   0.1}
 
     // Attempt to connect to a bad endpoint
-    r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/scheduler/"),
+    r, err := realis.NewRealisClient(realis.SchedulerUrl("http://127.0.0.1:8081/doesntexist/"),
         realis.TimeoutMS(200),
-        realis.BackOff(&realis.Backoff{ // Reduce penalties for this test to make it quick
-            Steps:    5,
-            Duration: 1 * time.Second,
-            Factor:   1.0,
-            Jitter:   0.1}),
+        realis.BackOff(backoff),
     )
 
     defer r.Close()
@@ -83,6 +85,13 @@ func TestBadEndpoint(t *testing.T) {
     // Check that we do error out of retrying
     assert.Error(t, err)
 
+    // Check that the error before this one was a retry error
+    // TODO: Consider bubbling the timeout-behaving error all the way up to the user.
+    retryErr := realis.ToRetryCount(errors.Cause(err))
+    assert.NotNil(t, retryErr, "error passed in is not a retry error")
+
+    assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is incorrect")
+
 }
 
 func TestLeaderFromZK(t *testing.T) {
@@ -186,11 +195,10 @@ func TestRealisClient_CreateJobWithPulse_Thermos(t *testing.T) {
     assert.NoError(t, err)
     assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
-    updateQ := aurora.JobUpdateQuery{
-        Key:        result.GetKey(),
-        Limit:      1,
-    }
+    updateQ := aurora.JobUpdateQuery{
+        Key:   result.GetKey(),
+        Limit: 1,
+    }
 
     start := time.Now()
     for i := 0; i*int(pulse) <= timeout; i++ {
diff --git a/retry.go b/retry.go
index f2299c3..586b60c 100644
--- a/retry.go
+++ b/retry.go
@@ -1,18 +1,16 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package realis
 
@@ -59,22 +57,30 @@ type ConditionFunc func() (done bool, err error)
 // If Jitter is greater than zero, a random amount of each duration is added
 // (between duration and duration*(1+jitter)).
 //
-// If the condition never returns true, ErrWaitTimeout is returned. All other
-// errors terminate immediately.
-func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
+// If the condition never returns true, ErrWaitTimeout is returned. Temporary
+// errors are retried; all other errors terminate retries immediately.
+func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
     var err error
     var ok bool
+    var curStep int
     duration := backoff.Duration
-    for i := 0; i < backoff.Steps; i++ {
-        if i != 0 {
+
+    for curStep = 0; curStep < backoff.Steps; curStep++ {
+
+        // Only sleep if it's not the first iteration.
+        if curStep != 0 {
             adjusted := duration
             if backoff.Jitter > 0.0 {
                 adjusted = Jitter(duration, backoff.Jitter)
             }
+
+            logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted)
             time.Sleep(adjusted)
             duration = time.Duration(float64(duration) * backoff.Factor)
         }
 
+        // Execute function passed in.
         ok, err = condition()
 
         // If the function executed says it succeeded, stop retrying
@@ -82,43 +88,53 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
             return nil
         }
 
-        // Stop retrying if the error is NOT temporary.
         if err != nil {
+
+            // If the error is temporary, continue retrying.
             if !IsTemporary(err) {
                 return err
+            } else {
+                // Print out the temporary error we experienced.
+                logger.Println(err)
             }
+
         }
     }
 
+    if curStep > 1 {
+        logger.Printf("retried this function call %d time(s)", curStep)
+    }
+
     // Provide more information to the user wherever possible
     if err != nil {
-        return NewTimeoutError(errors.Wrap(err, "Timed out while retrying"))
+        return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
     } else {
-        return NewTimeoutError(errors.New("Timed out while retrying"))
+        return newRetryError(errors.New("ran out of retries"), curStep)
     }
 }
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
 
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
-func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
+func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
     var resp *aurora.Response
     var clientErr error
+    var curStep int
 
     backoff := r.config.backoff
     duration := backoff.Duration
 
-    for i := 0; i < backoff.Steps; i++ {
+    for curStep = 0; curStep < backoff.Steps; curStep++ {
 
         // If this isn't our first try, backoff before the next try.
-        if i != 0 {
+        if curStep != 0 {
             adjusted := duration
             if backoff.Jitter > 0.0 {
                 adjusted = Jitter(duration, backoff.Jitter)
             }
 
-            r.logger.Printf("An error occurred during thrift call, backing off for %v before retrying\n", adjusted)
+            r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted)
 
             time.Sleep(adjusted)
             duration = time.Duration(float64(duration) * backoff.Factor)
@@ -176,10 +192,14 @@ func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*auro
 
     }
 
+    if curStep > 1 {
+        r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+    }
+
     // Provide more information to the user wherever possible.
     if clientErr != nil {
-        return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
+        return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
     } else {
-        return nil, NewTimeoutError(errors.New("Timed out while retrying"))
+        return nil, newRetryError(errors.New("ran out of retries"), curStep)
     }
 }
diff --git a/zk.go b/zk.go
index f5f55ff..dd711e0 100644
--- a/zk.go
+++ b/zk.go
@@ -16,7 +16,6 @@ package realis
 
 import (
     "encoding/json"
-    "fmt"
     "strconv"
     "strings"
     "time"
@@ -36,27 +35,89 @@ type ServiceInstance struct {
     Status string `json:"status"`
 }
 
+type zkConfig struct {
+    endpoints []string
+    path      string
+    backoff   Backoff
+    timeout   time.Duration
+    logger    Logger
+}
+
+type ZKOpt func(z *zkConfig)
+
+func ZKEndpoints(endpoints ...string) ZKOpt {
+    return func(z *zkConfig) {
+        z.endpoints = endpoints
+    }
+}
+
+func ZKPath(path string) ZKOpt {
+    return func(z *zkConfig) {
+        z.path = path
+    }
+}
+
+func ZKBackoff(b Backoff) ZKOpt {
+    return func(z *zkConfig) {
+        z.backoff = b
+    }
+}
+
+func ZKTimeout(d time.Duration) ZKOpt {
+    return func(z *zkConfig) {
+        z.timeout = d
+    }
+}
+
+func ZKLogger(l Logger) ZKOpt {
+    return func(z *zkConfig) {
+        z.logger = l
+    }
+}
+
 // Retrieves current Aurora leader from ZK.
 func LeaderFromZK(cluster Cluster) (string, error) {
+    return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
+}
 
-    var zkurl string
+// Retrieves current Aurora leader from ZK with a custom configuration.
+func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
+    var leaderURL string
 
-    retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
+    // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
+    config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
+    for _, opt := range options {
+        opt(config)
+    }
 
-        endpoints := strings.Split(cluster.ZK, ",")
+    if len(config.endpoints) == 0 {
+        return "", errors.New("no Zookeeper endpoints supplied")
+    }
 
-        //TODO (rdelvalle): When enabling debugging, change logger here
-        c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
+    if config.path == "" {
+        return "", errors.New("no Zookeeper path supplied")
+    }
+
+    // Create a closure that allows us to use the ExponentialBackoff function.
+    retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
+
+        c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
         if err != nil {
-            return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK))
+            return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
         }
 
         defer c.Close()
 
         // Open up descriptor for the ZK path given
-        children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
+        children, _, _, err := c.ChildrenW(config.path)
         if err != nil {
+
+            // Sentinel error check, as the client offers no other way to detect this case.
+            if err == zk.ErrInvalidPath {
+                return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
+            }
+
+            return false, NewTemporaryError(errors.Wrapf(err, "Path %s doesn't exist on Zookeeper", config.path))
         }
 
         // Search for the leader through all the children in the given path
@@ -66,9 +127,14 @@ func LeaderFromZK(cluster Cluster) (string, error) {
 
             // Only the leader will start with member_
             if strings.HasPrefix(child, "member_") {
-                data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
+
+                childPath := config.path + "/" + child
+                data, _, err := c.Get(childPath)
                 if err != nil {
-                    return false, errors.Wrap(err, "Error fetching contents of leader")
+                    if err == zk.ErrInvalidPath {
+                        return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
+                    }
+
+                    return false, NewTemporaryError(errors.Wrap(err, "Error fetching contents of leader"))
                 }
 
                 err = json.Unmarshal([]byte(data), serviceInst)
@@ -76,9 +142,11 @@ func LeaderFromZK(cluster Cluster) (string, error) {
                     return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader"))
                 }
 
-                // Should only be one endpoint
+                // Should only be one endpoint.
+                // This should never be encountered, as it would indicate Aurora
+                // wrote bad info into Zookeeper, but it is kept here as a safety net.
                 if len(serviceInst.AdditionalEndpoints) > 1 {
-                    fmt.Errorf("Ambiguous end points schemes")
+                    return false, NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
                 }
 
                 var scheme, host, port string
@@ -88,7 +156,7 @@ func LeaderFromZK(cluster Cluster) (string, error) {
                     port = strconv.Itoa(v.Port)
                 }
 
-                zkurl = scheme + "://" + host + ":" + port
+                leaderURL = scheme + "://" + host + ":" + port
                 return true, nil
             }
         }
@@ -98,8 +166,9 @@ func LeaderFromZK(cluster Cluster) (string, error) {
     })
 
     if retryErr != nil {
-        return "", NewTimeoutError(errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps))
+        config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
+        return "", retryErr
     }
 
-    return zkurl, nil
+    return leaderURL, nil
 }
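To close, a sketch of how the new functional options compose for callers of LeaderFromZKOpts. The endpoint address, path, and backoff values below are placeholders, not library defaults:

```go
package main

import (
	"fmt"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Placeholder ensemble address and scheduler path; substitute your own.
	url, err := realis.LeaderFromZKOpts(
		realis.ZKEndpoints("192.168.33.7:2181"),
		realis.ZKPath("/aurora/scheduler"),
		realis.ZKTimeout(5*time.Second),
		realis.ZKBackoff(realis.Backoff{
			Steps:    3,
			Duration: 1 * time.Second,
			Factor:   2.0,
			Jitter:   0.1,
		}),
	)
	if err != nil {
		// A retry error here means every attempt to reach ZK failed.
		fmt.Println("could not determine leader:", err)
		return
	}
	fmt.Println("Aurora leader:", url)
}
```

Options left unset fall back to the defaults loaded at the top of LeaderFromZKOpts (defaultBackoff, a ten-second session timeout, and NoopLogger), so callers only override what they need.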