Merge pull request #58 from paypal/zkPolish

Zookeeper functions and retry functions cleanup
kkrishna 2018-03-05 12:09:18 -08:00 committed by GitHub
commit 66809c55f7
6 changed files with 284 additions and 91 deletions


@@ -17,27 +17,50 @@ package realis
// Using a pattern described by Dave Cheney to differentiate errors
// https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully
// Timeout errors are returned when a function has unsuccessfully retried.
// Timeout errors are returned when a function is unable to continue executing due
// to a time constraint or meeting a set number of retries.
type timeout interface {
Timeout() bool
Timedout() bool
}
func IsTimeout(err error) bool {
temp, ok := err.(timeout)
return ok && temp.Timeout()
return ok && temp.Timedout()
}
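For illustration, a minimal caller-side sketch of the behavior-based check above; the call that produced err is assumed:

package example

import (
	"log"

	"github.com/paypal/gorealis"
)

// classify sketches caller-side handling: timeouts are detected through
// the behavior check above rather than a concrete type assertion.
func classify(err error) {
	switch {
	case err == nil:
		return
	case realis.IsTimeout(err):
		log.Println("retries exhausted; treat as a timeout")
	default:
		log.Println("non-timeout error:", err)
	}
}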
type TimeoutErr struct {
// retryErr is a superset of timeout which includes extra context about
// our retry mechanism. It exists so our tests can verify that the retry
// mechanism works as expected; it should never be relied on or used
// directly, and it is deliberately not part of the public API.
type retryErr struct {
error
timeout bool
timedout bool
retryCount int // How many times did the mechanism retry the command
}
func (t *TimeoutErr) Timeout() bool {
return t.timeout
// A retry error is a timeout-type error with added context.
func (r *retryErr) Timedout() bool {
return r.timedout
}
func NewTimeoutError(err error) *TimeoutErr {
return &TimeoutErr{error: err, timeout: true}
func (r *retryErr) RetryCount() int {
return r.retryCount
}
// Helper function used by tests to verify retry behavior without resorting
// to whitebox testing, while also keeping retryErr private.
// Should NOT be used in any other context.
func ToRetryCount(err error) *retryErr {
if retryErr, ok := err.(*retryErr); ok {
return retryErr
} else {
return nil
}
}
func newRetryError(err error, retryCount int) *retryErr {
return &retryErr{error: err, timedout: true, retryCount: retryCount}
}
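A test-side sketch of how these pieces are meant to be exercised (mirroring the e2e and ZK tests later in this diff); the retried call producing err is assumed:

package example_test

import (
	"testing"

	"github.com/paypal/gorealis"
	"github.com/pkg/errors"
)

// verifyRetries sketches test-only verification of the retry mechanism.
func verifyRetries(t *testing.T, err error, wantSteps int) {
	retryErr := realis.ToRetryCount(errors.Cause(err))
	if retryErr == nil {
		t.Fatal("error passed in is not a retry error")
	}
	if retryErr.RetryCount() != wantSteps {
		t.Fatalf("retried %d time(s), want %d", retryErr.RetryCount(), wantSteps)
	}
}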
// Temporary errors indicate that the action may and should be retried.
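The temporary-error half of the pattern is cut off by this hunk; judging from the IsTemporary and NewTemporaryError calls elsewhere in this diff, its presumed shape mirrors the timeout interface:

package example

// temporary is a presumed mirror of the timeout interface above; the
// actual definition lies outside this hunk.
type temporary interface {
	Temporary() bool
}

func IsTemporary(err error) bool {
	temp, ok := err.(temporary)
	return ok && temp.Temporary()
}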


@@ -456,7 +456,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
Statuses: states,
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(taskQ)
})
@@ -476,7 +476,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
}
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
})
@@ -491,7 +491,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
var result *aurora.GetJobsResult_
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(role)
})
@@ -515,7 +515,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
instanceIds[instId] = true
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
})
@@ -532,7 +532,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(key, nil, "")
})
@@ -549,7 +549,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.CreateJob(auroraJob.JobConfig())
})
@@ -580,7 +580,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(auroraJob.JobConfig())
})
@@ -592,7 +592,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(key)
})
@@ -606,7 +606,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartCronJob(key)
})
@@ -625,7 +625,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
instanceIds[instId] = true
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
@@ -644,7 +644,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
}
if len(instanceIds) > 0 {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
@@ -661,7 +661,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
// Update all tasks under a job configuration. Currently gorealis doesn't support canary deployments.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(updateJob.req, message)
})
@@ -674,7 +674,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(&updateKey, message)
})
@@ -687,7 +687,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(updateKey, message)
})
@@ -701,7 +701,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(updateKey, message)
})
@@ -715,7 +715,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(updateKey)
})
@@ -730,7 +730,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AddInstances(&instKey, count)
})
@@ -766,7 +766,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
// Get information about a task, including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(query)
})
@@ -780,7 +780,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
// Get information about a task without its task configuration object
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(query)
})
@@ -806,7 +806,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
Statuses: aurora.ACTIVE_STATES,
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(taskQ)
})
@@ -830,7 +830,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(&updateQuery)
})
@@ -843,7 +843,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(&key, message)
})
@@ -870,7 +870,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
drainList.HostNames[host] = true
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(drainList)
})
@@ -899,7 +899,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
hostList.HostNames[host] = true
}
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(hostList)
})
@@ -930,7 +930,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.
resp, retryErr := r.ThriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(hostList)
})
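Since the retry/reconnect loop now sits behind the private thriftCallWithRetries, callers are unaffected by the rename; a usage sketch follows (the scheduler URL and job key values are placeholders, not part of this diff):

package main

import (
	"log"

	"github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	client, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://127.0.0.1:8081"),
		realis.TimeoutMS(20000))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// KillJob, like every method above, funnels through thriftCallWithRetries.
	resp, err := client.KillJob(&aurora.JobKey{
		Role: "vagrant", Environment: "prod", Name: "hello_world"})
	if err != nil {
		if realis.IsTimeout(err) {
			log.Println("every retry failed; the scheduler may be unreachable")
		}
		log.Fatal(err)
	}
	log.Println(resp.ResponseCode)
}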


@@ -25,8 +25,9 @@ import (
"github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/stretchr/testify/assert"
"github.com/paypal/gorealis/response"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
var r realis.Realis
@@ -59,16 +60,17 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func TestBadEndpoint(t *testing.T) {
func TestNonExistentEndpoint(t *testing.T) {
backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1}
// Attempt to connect to a bad endpoint
r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/scheduler/"),
r, err := realis.NewRealisClient(realis.SchedulerUrl("http://127.0.0.1:8081/doesntexist/"),
realis.TimeoutMS(200),
realis.BackOff(&realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1}),
realis.BackOff(backoff),
)
defer r.Close()
@@ -83,6 +85,13 @@ func TestBadEndpoint(t *testing.T) {
// Check that we do error out of retrying
assert.Error(t, err)
// Check that the error preceding this one was a retry error
// TODO: Consider bubbling up timeout behaving error all the way up to the user.
retryErr := realis.ToRetryCount(errors.Cause(err))
assert.NotNil(t, retryErr, "error passed in is not a retry error")
assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is incorrect")
}
func TestLeaderFromZK(t *testing.T) {
@@ -186,11 +195,10 @@ func TestRealisClient_CreateJobWithPulse_Thermos(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
updateQ := aurora.JobUpdateQuery{
Key: result.GetKey(),
Limit: 1,
}
Key: result.GetKey(),
Limit: 1,
}
start := time.Now()
for i := 0; i*int(pulse) <= timeout; i++ {


@@ -1,18 +1,16 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
@@ -59,22 +57,30 @@ type ConditionFunc func() (done bool, err error)
// If Jitter is greater than zero, a random amount of each duration is added
// (between duration and duration*(1+jitter)).
//
// If the condition never returns true, ErrWaitTimeout is returned. All other
// errors terminate immediately.
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
// If the condition never returns true, a retry error wrapping the last
// failure is returned. Temporary errors are logged and retried; all other
// errors terminate the retries immediately.
func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
var err error
var ok bool
var curStep int
duration := backoff.Duration
for i := 0; i < backoff.Steps; i++ {
if i != 0 {
for curStep = 0; curStep < backoff.Steps; curStep++ {
// Only sleep if it's not the first iteration.
if curStep != 0 {
adjusted := duration
if backoff.Jitter > 0.0 {
adjusted = Jitter(duration, backoff.Jitter)
}
logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted)
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * backoff.Factor)
}
// Execute function passed in.
ok, err = condition()
// If the function executed says it succeeded, stop retrying
@@ -82,43 +88,53 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
return nil
}
// Stop retrying if the error is NOT temporary.
if err != nil {
// If the error is temporary, continue retrying.
if !IsTemporary(err) {
return err
} else {
// Print out the temporary error we experienced.
logger.Println(err)
}
}
}
if curStep > 1 {
logger.Printf("retried this function call %d time(s)", curStep)
}
// Provide more information to the user wherever possible
if err != nil {
return NewTimeoutError(errors.Wrap(err, "Timed out while retrying"))
return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
} else {
return NewTimeoutError(errors.New("Timed out while retrying"))
return newRetryError(errors.New("ran out of retries"), curStep)
}
}
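A usage sketch of the reworked signature, which now takes a Logger and keeps retrying only on temporary errors; probeScheduler is a hypothetical stand-in for a real condition:

package main

import (
	"errors"
	"log"
	"time"

	"github.com/paypal/gorealis"
)

// probeScheduler is a hypothetical condition used only for illustration.
func probeScheduler() (bool, error) {
	return false, errors.New("scheduler not ready")
}

func main() {
	err := realis.ExponentialBackoff(
		realis.Backoff{Steps: 3, Duration: time.Second, Factor: 2.0, Jitter: 0.1},
		realis.NoopLogger{},
		func() (bool, error) {
			ready, probeErr := probeScheduler()
			if probeErr != nil {
				// Temporary errors are logged and retried; any other
				// error aborts the loop immediately.
				return false, realis.NewTemporaryError(probeErr)
			}
			return ready, nil
		})
	// Once Steps is exhausted, err is a retry error and behaves as a timeout.
	log.Println(realis.IsTimeout(err))
}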
type auroraThriftCall func() (resp *aurora.Response, err error)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted at Thrift calls.
func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
var curStep int
backoff := r.config.backoff
duration := backoff.Duration
for i := 0; i < backoff.Steps; i++ {
for curStep = 0; curStep < backoff.Steps; curStep++ {
// If this isn't our first try, backoff before the next try.
if i != 0 {
if curStep != 0 {
adjusted := duration
if backoff.Jitter > 0.0 {
adjusted = Jitter(duration, backoff.Jitter)
}
r.logger.Printf("An error occurred during thrift call, backing off for %v before retrying\n", adjusted)
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted)
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * backoff.Factor)
@@ -136,6 +152,10 @@ func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*auro
// Check if our thrift call is returning an error. This is a retriable event as we don't know
// if it was caused by network issues.
if clientErr != nil {
// Print out the error to the user
r.logger.Println(clientErr)
r.ReestablishConn()
// In the future, reestablish connection should be able to check if it is actually possible
@@ -176,10 +196,14 @@ func (r *realisClient) ThriftCallWithRetries(thriftCall auroraThriftCall) (*auro
}
if curStep > 1 {
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
}
// Provide more information to the user wherever possible.
if clientErr != nil {
return nil, NewTimeoutError(errors.Wrap(clientErr, "Timed out while retrying, including latest error"))
return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
} else {
return nil, NewTimeoutError(errors.New("Timed out while retrying"))
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
}

zk.go

@@ -16,7 +16,6 @@ package realis
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
@@ -36,27 +35,89 @@ type ServiceInstance struct {
Status string `json:"status"`
}
type zkConfig struct {
endpoints []string
path string
backoff Backoff
timeout time.Duration
logger Logger
}
type ZKOpt func(z *zkConfig)
func ZKEndpoints(endpoints ...string) ZKOpt {
return func(z *zkConfig) {
z.endpoints = endpoints
}
}
func ZKPath(path string) ZKOpt {
return func(z *zkConfig) {
z.path = path
}
}
func ZKBackoff(b Backoff) ZKOpt {
return func(z *zkConfig) {
z.backoff = b
}
}
func ZKTimeout(d time.Duration) ZKOpt {
return func(z *zkConfig) {
z.timeout = d
}
}
func ZKLogger(l Logger) ZKOpt {
return func(z *zkConfig) {
z.logger = l
}
}
// Retrieves current Aurora leader from ZK.
func LeaderFromZK(cluster Cluster) (string, error) {
return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
}
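LeaderFromZK above shows the default composition; because the ZKOpt constructors follow Go's functional-options pattern, callers can mix in any subset. A sketch with placeholder endpoint and path values:

package main

import (
	"log"
	"os"
	"time"

	"github.com/paypal/gorealis"
)

func main() {
	// Any option left out keeps the defaults set inside LeaderFromZKOpts
	// (defaultBackoff, a 10s timeout, and the NoopLogger).
	leader, err := realis.LeaderFromZKOpts(
		realis.ZKEndpoints("192.168.33.7:2181"),
		realis.ZKPath("/aurora/scheduler"),
		realis.ZKTimeout(5*time.Second),
		realis.ZKLogger(log.New(os.Stdout, "zk: ", log.LstdFlags)))
	if err != nil {
		log.Fatal(err)
	}
	log.Println("leader at", leader)
}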
var zkurl string
// Retrieves current Aurora leader from ZK with a custom configuration.
func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
var leaderURL string
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
// Load the default Zookeeper configuration, then override values with those provided by the caller.
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
for _, opt := range options {
opt(config)
}
endpoints := strings.Split(cluster.ZK, ",")
if len(config.endpoints) == 0 {
return "", errors.New("no Zookeeper endpoints supplied")
}
//TODO (rdelvalle): When enabling debugging, change logger here
c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
if config.path == "" {
return "", errors.New("no Zookeeper path supplied")
}
// Create a closure that allows us to use the ExponentialBackoff function.
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK))
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
}
defer c.Close()
// Open up descriptor for the ZK path given
children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
children, _, _, err := c.ChildrenW(config.path)
if err != nil {
return false, errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
// Sentinel error check, as there is no other way to detect an invalid path.
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
}
return false, NewTemporaryError(errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", config.path))
}
// Search for the leader through all the children in the given path
@@ -66,9 +127,14 @@ func LeaderFromZK(cluster Cluster) (string, error) {
// Only the leader will start with member_
if strings.HasPrefix(child, "member_") {
data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
childPath := config.path + "/" + child
data, _, err := c.Get(childPath)
if err != nil {
return false, errors.Wrap(err, "Error fetching contents of leader")
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
}
return false, NewTemporaryError(errors.Wrap(err, "Error fetching contents of leader"))
}
err = json.Unmarshal([]byte(data), serviceInst)
@@ -76,9 +142,11 @@ func LeaderFromZK(cluster Cluster) (string, error) {
return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader"))
}
// Should only be one endpoint
// Should only be one endpoint.
// This should never be encountered as it would indicate Aurora
// writing bad info into Zookeeper but is kept here as a safety net.
if len(serviceInst.AdditionalEndpoints) > 1 {
fmt.Errorf("Ambiguous end points schemes")
return false, NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
}
var scheme, host, port string
@@ -88,7 +156,7 @@ func LeaderFromZK(cluster Cluster) (string, error) {
port = strconv.Itoa(v.Port)
}
zkurl = scheme + "://" + host + ":" + port
leaderURL = scheme + "://" + host + ":" + port
return true, nil
}
}
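For reference, the member_ znode content unmarshalled above follows the ServerSet JSON convention Aurora writes to Zookeeper; a representative blob, with assumed values and only the fields this parser reads, might look like:

// Representative (assumed) payload of a member_ znode. The parser above
// relies on additionalEndpoints (scheme key, host, port) and status.
const exampleMemberJSON = `{
	"serviceEndpoint":     {"host": "aurora.local", "port": 8081},
	"additionalEndpoints": {"http": {"host": "aurora.local", "port": 8081}},
	"status":              "ALIVE"
}`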
@@ -98,8 +166,9 @@ func LeaderFromZK(cluster Cluster) (string, error) {
})
if retryErr != nil {
return "", NewTimeoutError(errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps))
config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
return "", retryErr
}
return zkurl, nil
return leaderURL, nil
}

zk_test.go (new file)

@@ -0,0 +1,69 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis_test
import (
"log"
"os"
"testing"
"time"
"github.com/paypal/gorealis"
"github.com/stretchr/testify/assert"
)
var backoff realis.Backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1}
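This configuration sleeps only between attempts, so five steps incur at most four sleeps of 1s to 1.1s each, roughly 4.4s worst case, which is what keeps these tests quick. A sketch of that arithmetic, following the ExponentialBackoff loop in this diff:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Steps: 5, Duration: 1s, Factor: 1.0, Jitter: 0.1, as above.
	duration := 1 * time.Second
	factor, jitter := 1.0, 0.1
	for step := 1; step < 5; step++ { // no sleep before the first attempt
		worst := time.Duration(float64(duration) * (1 + jitter))
		fmt.Printf("sleep before attempt %d: up to %v\n", step+1, worst)
		duration = time.Duration(float64(duration) * factor)
	}
}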
// Test for behavior when no endpoints are given to the ZK leader finding function.
func TestZKNoEndpoints(t *testing.T) {
_, err := realis.LeaderFromZKOpts()
assert.Error(t, err)
}
// Test for behavior when no path is given to the ZK leader finding function.
func TestZKNoPath(t *testing.T) {
_, err := realis.LeaderFromZKOpts(realis.ZKEndpoints("127.0.0.1:2181"))
assert.Error(t, err)
}
// Test for behavior when a valid but non-existent path is given to the ZK leader finding function.
func TestZKPathDoesntExist(t *testing.T) {
_, err := realis.LeaderFromZKOpts(realis.ZKEndpoints("127.0.0.1:2181"),
realis.ZKPath("/somepath"),
realis.ZKBackoff(backoff),
realis.ZKLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)))
assert.True(t, realis.IsTimeout(err), "a non-existent path should result in a timeout behaving error")
retryErr := realis.ToRetryCount(err)
assert.NotNil(t, retryErr, "conversion to retry error failed")
assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is off")
}
// Test for behavior when an invalid Zookeeper path is passed. Should fail right away.
func TestZKBadPath(t *testing.T) {
_, err := realis.LeaderFromZKOpts(realis.ZKEndpoints("127.0.0.1:2181"),
realis.ZKPath("invalidpath"),
realis.ZKBackoff(backoff),
realis.ZKLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)))
assert.False(t, realis.IsTimeout(err), "a bad path should result in a NON-timeout behaving error")
}