From f6a21e0f59f98aaf9f63720cf889dbdf68e24e8c Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 28 Nov 2017 17:30:33 -0800 Subject: [PATCH] Cleaned up ZK connection code by using the backoff function. Added a test to the end to end to test that we're getting the host correctly from ZK. Changed clusters test to be an outside package. --- clusters_test.go | 5 +- realis_e2e_test.go | 8 +++ zk.go | 123 ++++++++++++++++++++------------------------- 3 files changed, 65 insertions(+), 71 deletions(-) diff --git a/clusters_test.go b/clusters_test.go index 2e7118b..6f854dd 100644 --- a/clusters_test.go +++ b/clusters_test.go @@ -12,17 +12,18 @@ * limitations under the License. */ -package realis +package realis_test import ( "fmt" "github.com/stretchr/testify/assert" "testing" + "github.com/paypal/gorealis" ) func TestLoadClusters(t *testing.T) { - clusters, err := LoadClusters("examples/clusters.json") + clusters, err := realis.LoadClusters("examples/clusters.json") if err != nil { fmt.Print(err) } diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 413d334..82b339a 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -55,6 +55,14 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestLeaderFromZK(t *testing.T) { + cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.7:2181") + url, err := realis.LeaderFromZK(*cluster) + + assert.NoError(t, err) + assert.Equal(t, url, "http://aurora.local:8081") +} + func TestRealisClient_CreateJob_Thermos(t *testing.T) { job := realis.NewJob(). diff --git a/zk.go b/zk.go index 4a6f593..f59b906 100644 --- a/zk.go +++ b/zk.go @@ -44,81 +44,66 @@ func (NoopLogger) Printf(format string, a ...interface{}) { // Retrieves current Aurora leader from ZK. func LeaderFromZK(cluster Cluster) (string, error) { - var err error var zkurl string - duration := defaultBackoff.Duration - for step := 0; step < defaultBackoff.Steps; step++ { + retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) { - // Attempt to find leader - zkurl, err = leaderFromZK(cluster) - if err == nil { - return zkurl, err + endpoints := strings.Split(cluster.ZK, ",") + + //TODO (rdelvalle): When enabling debugging, change logger here + c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) }) + if err != nil { + return false, errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK) } - // Backoff if we failed to determine leader - adjusted := duration - if defaultBackoff.Jitter > 0.0 { - adjusted = Jitter(duration, defaultBackoff.Jitter) + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(cluster.SchedZKPath) + if err != nil { + return false, errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath) } - fmt.Printf("Error determining Aurora leader: %v; retrying in %v\n", err, adjusted) - time.Sleep(adjusted) - duration = time.Duration(float64(duration) * defaultBackoff.Factor) + + // Search for the leader through all the children in the given path + serviceInst := new(ServiceInstance) + for _, child := range children { + + // Only the leader will start with member_ + if strings.HasPrefix(child, "member_") { + + data, _, err := c.Get(cluster.SchedZKPath + "/" + child) + if err != nil { + return false, errors.Wrap(err, "Error fetching contents of leader") + } + + err = json.Unmarshal([]byte(data), serviceInst) + if err != nil { + return false, errors.Wrap(err, "Unable to unmarshall contents of leader") + } + + // Should only be one endpoint + if len(serviceInst.AdditionalEndpoints) > 1 { + fmt.Errorf("Ambiguous end points schemes") + } + + var scheme, host, port string + for k, v := range serviceInst.AdditionalEndpoints { + scheme = k + host = v.Host + port = strconv.Itoa(v.Port) + } + + zkurl = scheme + "://" + host + ":" + port + return true, nil + } + } + + return false, errors.New("No leader found") + }) + + if retryErr != nil { + return "", errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps) } - return "", errors.Wrapf(err, "Failed to determine leader after %v attempts", defaultBackoff.Steps) -} - -func leaderFromZK(cluster Cluster) (string, error) { - - endpoints := strings.Split(cluster.ZK, ",") - - //TODO (rdelvalle): When enabling debugging, change logger here - c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) }) - if err != nil { - return "", errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK) - } - - defer c.Close() - - children, _, _, err := c.ChildrenW(cluster.SchedZKPath) - if err != nil { - return "", errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath) - } - - serviceInst := new(ServiceInstance) - - for _, child := range children { - - // Only the leader will start with member_ - if strings.HasPrefix(child, "member_") { - - data, _, err := c.Get(cluster.SchedZKPath + "/" + child) - if err != nil { - return "", errors.Wrap(err, "Error fetching contents of leader") - } - - err = json.Unmarshal([]byte(data), serviceInst) - if err != nil { - return "", errors.Wrap(err, "Unable to unmarshall contents of leader") - } - - // Should only be one endpoint - if len(serviceInst.AdditionalEndpoints) > 1 { - fmt.Errorf("Ambiguous end points schemes") - } - - var scheme, host, port string - for k, v := range serviceInst.AdditionalEndpoints { - scheme = k - host = v.Host - port = strconv.Itoa(v.Port) - } - - return scheme + "://" + host + ":" + port, nil - } - } - - return "", errors.New("No leader found") - + return zkurl, nil }