diff --git a/clusters.go b/clusters.go index 49e93f6..c0e81dd 100644 --- a/clusters.go +++ b/clusters.go @@ -28,6 +28,7 @@ type Cluster struct { ZK string `json:"zk"` ZKPort int `json:"zk_port"` SchedZKPath string `json:"scheduler_zk_path"` + MesosZKPath string `json:"mesos_zk_path"` SchedURI string `json:"scheduler_uri"` ProxyURL string `json:"proxy_url"` AuthMechanism string `json:"auth_mechanism"` @@ -61,6 +62,7 @@ func GetDefaultClusterFromZKUrl(zkURL string) *Cluster { AuthMechanism: "UNAUTHENTICATED", ZK: zkURL, SchedZKPath: "/aurora/scheduler", + MesosZKPath: "/mesos", AgentRunDir: "latest", AgentRoot: "/var/lib/mesos", } diff --git a/clusters_test.go b/clusters_test.go index 48b2f03..f0bfd54 100644 --- a/clusters_test.go +++ b/clusters_test.go @@ -32,6 +32,7 @@ func TestLoadClusters(t *testing.T) { assert.Equal(t, clusters["devcluster"].Name, "devcluster") assert.Equal(t, clusters["devcluster"].ZK, "192.168.33.7") assert.Equal(t, clusters["devcluster"].SchedZKPath, "/aurora/scheduler") + assert.Equal(t, clusters["devcluster"].MesosZKPath, "/mesos") assert.Equal(t, clusters["devcluster"].AuthMechanism, "UNAUTHENTICATED") assert.Equal(t, clusters["devcluster"].AgentRunDir, "latest") assert.Equal(t, clusters["devcluster"].AgentRoot, "/var/lib/mesos") diff --git a/examples/clusters.json b/examples/clusters.json index c456bd8..33723a5 100644 --- a/examples/clusters.json +++ b/examples/clusters.json @@ -2,6 +2,7 @@ "name": "devcluster", "zk": "192.168.33.7", "scheduler_zk_path": "/aurora/scheduler", + "mesos_zk_path": "/mesos", "auth_mechanism": "UNAUTHENTICATED", "slave_run_directory": "latest", "slave_root": "/var/lib/mesos" diff --git a/realis_e2e_test.go b/realis_e2e_test.go index bf95f72..1f544d8 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -180,6 +180,35 @@ func TestLeaderFromZK(t *testing.T) { assert.Equal(t, "http://192.168.33.7:8081", url) } + +func TestMasterFromZK(t *testing.T) { + cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") + masterNodesMap, err := realis.MasterNodesFromZK(*cluster) + + assert.NoError(t, err) + + for _, hostnames := range masterNodesMap { + for _, hostname := range hostnames { + assert.NoError(t, err) + assert.Equal(t, "192.168.33.7", hostname) + } + } +} + +func TestMesosMasterFromZK(t *testing.T) { + cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") + masterNodesMap, err := realis.MesosMasterNodesFromZK(*cluster) + + assert.NoError(t, err) + + for _, hostnames := range masterNodesMap { + for _, hostname := range hostnames { + assert.NoError(t, err) + assert.Equal(t, "localhost", hostname) + } + } +} + func TestInvalidAuroraURL(t *testing.T) { for _, url := range []string{ "http://doesntexist.com:8081/apitest", diff --git a/zk.go b/zk.go index 9d5b659..b1c4ad4 100644 --- a/zk.go +++ b/zk.go @@ -286,3 +286,208 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) { return mesosURL, nil } + +// Retrieves current Aurora master nodes from ZK. +func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) { + return MasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) +} + +// Retrieves current Mesos master nodes/leader from ZK with a custom configuration. +func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { + result := make(map[string][]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + serviceInst := new(ServiceInstance) + var hosts []string + for _, child := range children { + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + // Only leader is in json format. Have to parse data differently between member_ and not member_ + if strings.HasPrefix(child, "member_") { + err = json.Unmarshal([]byte(data), &serviceInst) + if err != nil { + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Should only be one endpoint. + // This should never be encountered as it would indicate Aurora + // writing bad info into Zookeeper but is kept here as a safety net. + if len(serviceInst.AdditionalEndpoints) > 1 { + return false, + NewTemporaryError( + errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK")) + } + + for _, v := range serviceInst.AdditionalEndpoints { + result["leader"] = append(result["leader"], v.Host) + } + } else { + // data is not in a json format + hosts = append(hosts, string(data)) + } + } + result["masterNodes"] = hosts + + // Master nodes data might not be available yet, try to fetch again. + if len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +} + +// Retrieves current Mesos Aurora master nodes from ZK. +func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) { + return MesosMasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.MesosZKPath)) +} + +// Retrieves current mesos master nodes/leader from ZK with a custom configuration. +func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { + result := make(map[string][]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.] + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + minScore := math.MaxInt64 + var mesosInstance MesosInstance + var hosts []string + for _, child := range children { + // Only the master nodes will start with json.info_ + if strings.HasPrefix(child, "json.info_") { + strs := strings.Split(child, "_") + if len(strs) < 2 { + config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child) + continue + } + score, err := strconv.Atoi(strs[1]) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos.")) + } + + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + + err = json.Unmarshal([]byte(data), &mesosInstance) + if err != nil { + config.logger.Printf("%s", err) + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Combine all master nodes into comma-separated + // Return hostname instead of ip to be consistent with aurora master nodes + hosts = append(hosts, mesosInstance.Address.Hostname) + // get the leader from the child with the smallest score. + if score < minScore { + minScore = score + result["leader"] = append(result["leader"], mesosInstance.Address.Hostname) + } + } + } + result["masterNodes"] = hosts + // Master nodes data might not be available yet, try to fetch again. + if len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no mesos master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +}