Cleaned up ZK connection code by using the backoff function. Added a test to the end to end to test that we're getting the host correctly from ZK. Changed clusters test to be an outside package.
This commit is contained in:
parent
f60aa3dd88
commit
f6a21e0f59
3 changed files with 65 additions and 71 deletions
|
@ -12,17 +12,18 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package realis
|
package realis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
"testing"
|
||||||
|
"github.com/paypal/gorealis"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLoadClusters(t *testing.T) {
|
func TestLoadClusters(t *testing.T) {
|
||||||
|
|
||||||
clusters, err := LoadClusters("examples/clusters.json")
|
clusters, err := realis.LoadClusters("examples/clusters.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Print(err)
|
fmt.Print(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,14 @@ func TestMain(m *testing.M) {
|
||||||
os.Exit(m.Run())
|
os.Exit(m.Run())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLeaderFromZK(t *testing.T) {
|
||||||
|
cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.7:2181")
|
||||||
|
url, err := realis.LeaderFromZK(*cluster)
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, url, "http://aurora.local:8081")
|
||||||
|
}
|
||||||
|
|
||||||
func TestRealisClient_CreateJob_Thermos(t *testing.T) {
|
func TestRealisClient_CreateJob_Thermos(t *testing.T) {
|
||||||
|
|
||||||
job := realis.NewJob().
|
job := realis.NewJob().
|
||||||
|
|
123
zk.go
123
zk.go
|
@ -44,81 +44,66 @@ func (NoopLogger) Printf(format string, a ...interface{}) {
|
||||||
// Retrieves current Aurora leader from ZK.
|
// Retrieves current Aurora leader from ZK.
|
||||||
func LeaderFromZK(cluster Cluster) (string, error) {
|
func LeaderFromZK(cluster Cluster) (string, error) {
|
||||||
|
|
||||||
var err error
|
|
||||||
var zkurl string
|
var zkurl string
|
||||||
|
|
||||||
duration := defaultBackoff.Duration
|
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
|
||||||
for step := 0; step < defaultBackoff.Steps; step++ {
|
|
||||||
|
|
||||||
// Attempt to find leader
|
endpoints := strings.Split(cluster.ZK, ",")
|
||||||
zkurl, err = leaderFromZK(cluster)
|
|
||||||
if err == nil {
|
//TODO (rdelvalle): When enabling debugging, change logger here
|
||||||
return zkurl, err
|
c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backoff if we failed to determine leader
|
defer c.Close()
|
||||||
adjusted := duration
|
|
||||||
if defaultBackoff.Jitter > 0.0 {
|
// Open up descriptor for the ZK path given
|
||||||
adjusted = Jitter(duration, defaultBackoff.Jitter)
|
children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
|
||||||
}
|
}
|
||||||
fmt.Printf("Error determining Aurora leader: %v; retrying in %v\n", err, adjusted)
|
|
||||||
time.Sleep(adjusted)
|
// Search for the leader through all the children in the given path
|
||||||
duration = time.Duration(float64(duration) * defaultBackoff.Factor)
|
serviceInst := new(ServiceInstance)
|
||||||
|
for _, child := range children {
|
||||||
|
|
||||||
|
// Only the leader will start with member_
|
||||||
|
if strings.HasPrefix(child, "member_") {
|
||||||
|
|
||||||
|
data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrap(err, "Error fetching contents of leader")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(data), serviceInst)
|
||||||
|
if err != nil {
|
||||||
|
return false, errors.Wrap(err, "Unable to unmarshall contents of leader")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should only be one endpoint
|
||||||
|
if len(serviceInst.AdditionalEndpoints) > 1 {
|
||||||
|
fmt.Errorf("Ambiguous end points schemes")
|
||||||
|
}
|
||||||
|
|
||||||
|
var scheme, host, port string
|
||||||
|
for k, v := range serviceInst.AdditionalEndpoints {
|
||||||
|
scheme = k
|
||||||
|
host = v.Host
|
||||||
|
port = strconv.Itoa(v.Port)
|
||||||
|
}
|
||||||
|
|
||||||
|
zkurl = scheme + "://" + host + ":" + port
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, errors.New("No leader found")
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return "", errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", errors.Wrapf(err, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
|
return zkurl, nil
|
||||||
}
|
|
||||||
|
|
||||||
func leaderFromZK(cluster Cluster) (string, error) {
|
|
||||||
|
|
||||||
endpoints := strings.Split(cluster.ZK, ",")
|
|
||||||
|
|
||||||
//TODO (rdelvalle): When enabling debugging, change logger here
|
|
||||||
c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer c.Close()
|
|
||||||
|
|
||||||
children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceInst := new(ServiceInstance)
|
|
||||||
|
|
||||||
for _, child := range children {
|
|
||||||
|
|
||||||
// Only the leader will start with member_
|
|
||||||
if strings.HasPrefix(child, "member_") {
|
|
||||||
|
|
||||||
data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.Wrap(err, "Error fetching contents of leader")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = json.Unmarshal([]byte(data), serviceInst)
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.Wrap(err, "Unable to unmarshall contents of leader")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should only be one endpoint
|
|
||||||
if len(serviceInst.AdditionalEndpoints) > 1 {
|
|
||||||
fmt.Errorf("Ambiguous end points schemes")
|
|
||||||
}
|
|
||||||
|
|
||||||
var scheme, host, port string
|
|
||||||
for k, v := range serviceInst.AdditionalEndpoints {
|
|
||||||
scheme = k
|
|
||||||
host = v.Host
|
|
||||||
port = strconv.Itoa(v.Port)
|
|
||||||
}
|
|
||||||
|
|
||||||
return scheme + "://" + host + ":" + port, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return "", errors.New("No leader found")
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue