diff --git a/zk.go b/zk.go index 9d5b659..8856357 100644 --- a/zk.go +++ b/zk.go @@ -286,3 +286,209 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) { return mesosURL, nil } + +// Retrieves current Mesos master nodes/leader from ZK with a custom configuration. +func MasterNodesFromZKOpts(options ...ZKOpt) (map[string]string, error) { + result := make(map[string]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. + // 10s timeout is not enough. Use 100s + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 100, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + serviceInst := new(ServiceInstance) + var host string + for _, child := range children { + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + // Only leader is in json format. Have to parse data differently between member_ and not member_ + if strings.HasPrefix(child, "member_") { + err = json.Unmarshal([]byte(data), &serviceInst) + if err != nil { + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Should only be one endpoint. + // This should never be encountered as it would indicate Aurora + // writing bad info into Zookeeper but is kept here as a safety net. + if len(serviceInst.AdditionalEndpoints) > 1 { + return false, + NewTemporaryError( + errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK")) + } + + for _, v := range serviceInst.AdditionalEndpoints { + result["leader"] = v.Host + } + } else { + // data is not in a json format + hostname := string(data) + // Combine all master nodes into comma-separated + if len(host) > 0 { + host += "," + } + host += hostname + result["masterNodes"] = host + } + + } + + // Master nodes data might not be available yet, try to fetch again. + if len(result["leader"]) == 0 || len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +} + +// Retrieves current mesos master nodes/leader from ZK with a custom configuration. +func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string]string, error) { + result := make(map[string]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. + // 10s timeout is not enough. Use 100s + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 100, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + minScore := math.MaxInt64 + var mesosInstance MesosInstance + var host string + for _, child := range children { + // Only the master nodes will start with json.info_ + if strings.HasPrefix(child, "json.info_") { + strs := strings.Split(child, "_") + if len(strs) < 2 { + config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child) + continue + } + score, err := strconv.Atoi(strs[1]) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos.")) + } + + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + + err = json.Unmarshal([]byte(data), &mesosInstance) + if err != nil { + config.logger.Printf("%s", err) + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Combine all master nodes into comma-separated + // Return hostname instead of ip to be consistent with aurora master nodes + if len(host) > 0 { + host += "," + } + host += mesosInstance.Address.Hostname + result["masterNodes"] = host + // get the leader from the child with the smallest score. + if score < minScore { + minScore = score + result["leader"] = mesosInstance.Address.Hostname + } + } + } + // Master nodes data might not be available yet, try to fetch again. + if len(result["leader"]) == 0 || len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no mesos master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +}