diff --git a/zk.go b/zk.go index bc60a97..9d5b659 100644 --- a/zk.go +++ b/zk.go @@ -16,6 +16,7 @@ package realis import ( "encoding/json" + "math" "strconv" "strings" "time" @@ -35,6 +36,18 @@ type ServiceInstance struct { Status string `json:"status"` } +type MesosAddress struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + Port uint16 `json:"port"` +} + +// MesosInstance is defined for mesos json stored in ZK. +type MesosInstance struct { + Address MesosAddress `json:"address"` + Version string `json:"version"` +} + type zkConfig struct { endpoints []string path string @@ -176,3 +189,100 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { return leaderURL, nil } + +// Retrieves current mesos leader from ZK with a custom configuration. +func MesosFromZKOpts(options ...ZKOpt) (string, error) { + var mesosURL string + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return "", errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return "", errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Search for the leader through all the children in the given path + minScore := math.MaxInt64 + var mesosInstance MesosInstance + for _, child := range children { + // Only the leader will start with json.info_ + if strings.HasPrefix(child, "json.info_") { + strs := strings.Split(child, "_") + if len(strs) < 2 { + config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child) + continue + } + score, err := strconv.Atoi(strs[1]) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos.")) + } + + // get the leader from the child with the smallest score. + if score < minScore { + minScore = score + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + + err = json.Unmarshal([]byte(data), &mesosInstance) + if err != nil { + config.logger.Printf("%s", err) + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + + mesosURL = mesosInstance.Address.IP + ":" + strconv.Itoa(int(mesosInstance.Address.Port)) + } + } + } + if len(mesosURL) > 0 { + return true, nil + } + + // Leader data might not be available yet, try to fetch again. + return false, NewTemporaryError(errors.New("no leader found")) + }) + + if retryErr != nil { + config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps) + return "", retryErr + } + + return mesosURL, nil +}