Support Australis API to get aurora master nodes and mesos master nodes (#20)
This commit is contained in:
parent
e33a2d99d8
commit
8db625730f
5 changed files with 238 additions and 0 deletions
|
@ -28,6 +28,7 @@ type Cluster struct {
|
|||
ZK string `json:"zk"`
|
||||
ZKPort int `json:"zk_port"`
|
||||
SchedZKPath string `json:"scheduler_zk_path"`
|
||||
MesosZKPath string `json:"mesos_zk_path"`
|
||||
SchedURI string `json:"scheduler_uri"`
|
||||
ProxyURL string `json:"proxy_url"`
|
||||
AuthMechanism string `json:"auth_mechanism"`
|
||||
|
@ -61,6 +62,7 @@ func GetDefaultClusterFromZKUrl(zkURL string) *Cluster {
|
|||
AuthMechanism: "UNAUTHENTICATED",
|
||||
ZK: zkURL,
|
||||
SchedZKPath: "/aurora/scheduler",
|
||||
MesosZKPath: "/mesos",
|
||||
AgentRunDir: "latest",
|
||||
AgentRoot: "/var/lib/mesos",
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ func TestLoadClusters(t *testing.T) {
|
|||
assert.Equal(t, clusters["devcluster"].Name, "devcluster")
|
||||
assert.Equal(t, clusters["devcluster"].ZK, "192.168.33.7")
|
||||
assert.Equal(t, clusters["devcluster"].SchedZKPath, "/aurora/scheduler")
|
||||
assert.Equal(t, clusters["devcluster"].MesosZKPath, "/mesos")
|
||||
assert.Equal(t, clusters["devcluster"].AuthMechanism, "UNAUTHENTICATED")
|
||||
assert.Equal(t, clusters["devcluster"].AgentRunDir, "latest")
|
||||
assert.Equal(t, clusters["devcluster"].AgentRoot, "/var/lib/mesos")
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
"name": "devcluster",
|
||||
"zk": "192.168.33.7",
|
||||
"scheduler_zk_path": "/aurora/scheduler",
|
||||
"mesos_zk_path": "/mesos",
|
||||
"auth_mechanism": "UNAUTHENTICATED",
|
||||
"slave_run_directory": "latest",
|
||||
"slave_root": "/var/lib/mesos"
|
||||
|
|
|
@ -180,6 +180,35 @@ func TestLeaderFromZK(t *testing.T) {
|
|||
assert.Equal(t, "http://192.168.33.7:8081", url)
|
||||
|
||||
}
|
||||
|
||||
func TestMasterFromZK(t *testing.T) {
|
||||
cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181")
|
||||
masterNodesMap, err := realis.MasterNodesFromZK(*cluster)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
for _, hostnames := range masterNodesMap {
|
||||
for _, hostname := range hostnames {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "192.168.33.7", hostname)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMesosMasterFromZK(t *testing.T) {
|
||||
cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181")
|
||||
masterNodesMap, err := realis.MesosMasterNodesFromZK(*cluster)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
for _, hostnames := range masterNodesMap {
|
||||
for _, hostname := range hostnames {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "localhost", hostname)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidAuroraURL(t *testing.T) {
|
||||
for _, url := range []string{
|
||||
"http://doesntexist.com:8081/apitest",
|
||||
|
|
205
zk.go
205
zk.go
|
@ -286,3 +286,208 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) {
|
|||
|
||||
return mesosURL, nil
|
||||
}
|
||||
|
||||
// Retrieves current Aurora master nodes from ZK.
|
||||
func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
|
||||
return MasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
|
||||
}
|
||||
|
||||
// Retrieves current Mesos master nodes/leader from ZK with a custom configuration.
|
||||
func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
|
||||
result := make(map[string][]string)
|
||||
|
||||
// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
|
||||
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
|
||||
for _, opt := range options {
|
||||
opt(config)
|
||||
}
|
||||
|
||||
if len(config.endpoints) == 0 {
|
||||
return nil, errors.New("no Zookeeper endpoints supplied")
|
||||
}
|
||||
|
||||
if config.path == "" {
|
||||
return nil, errors.New("no Zookeeper path supplied")
|
||||
}
|
||||
|
||||
// Create a closure that allows us to use the ExponentialBackoff function.
|
||||
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
|
||||
|
||||
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
|
||||
if err != nil {
|
||||
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
|
||||
}
|
||||
|
||||
defer c.Close()
|
||||
|
||||
// Open up descriptor for the ZK path given
|
||||
children, _, _, err := c.ChildrenW(config.path)
|
||||
if err != nil {
|
||||
|
||||
// Sentinel error check as there is no other way to check.
|
||||
if err == zk.ErrInvalidPath {
|
||||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
|
||||
}
|
||||
|
||||
return false,
|
||||
NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
|
||||
}
|
||||
|
||||
// Get all the master nodes through all the children in the given path
|
||||
serviceInst := new(ServiceInstance)
|
||||
var hosts []string
|
||||
for _, child := range children {
|
||||
childPath := config.path + "/" + child
|
||||
data, _, err := c.Get(childPath)
|
||||
if err != nil {
|
||||
if err == zk.ErrInvalidPath {
|
||||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
|
||||
}
|
||||
|
||||
return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
|
||||
}
|
||||
// Only leader is in json format. Have to parse data differently between member_ and not member_
|
||||
if strings.HasPrefix(child, "member_") {
|
||||
err = json.Unmarshal([]byte(data), &serviceInst)
|
||||
if err != nil {
|
||||
return false,
|
||||
NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
|
||||
}
|
||||
// Should only be one endpoint.
|
||||
// This should never be encountered as it would indicate Aurora
|
||||
// writing bad info into Zookeeper but is kept here as a safety net.
|
||||
if len(serviceInst.AdditionalEndpoints) > 1 {
|
||||
return false,
|
||||
NewTemporaryError(
|
||||
errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
|
||||
}
|
||||
|
||||
for _, v := range serviceInst.AdditionalEndpoints {
|
||||
result["leader"] = append(result["leader"], v.Host)
|
||||
}
|
||||
} else {
|
||||
// data is not in a json format
|
||||
hosts = append(hosts, string(data))
|
||||
}
|
||||
}
|
||||
result["masterNodes"] = hosts
|
||||
|
||||
// Master nodes data might not be available yet, try to fetch again.
|
||||
if len(result["masterNodes"]) == 0 {
|
||||
return false, NewTemporaryError(errors.New("no master nodes found"))
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps)
|
||||
return nil, retryErr
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Retrieves current Mesos Aurora master nodes from ZK.
|
||||
func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
|
||||
return MesosMasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.MesosZKPath))
|
||||
}
|
||||
|
||||
// Retrieves current mesos master nodes/leader from ZK with a custom configuration.
|
||||
func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
|
||||
result := make(map[string][]string)
|
||||
|
||||
// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.]
|
||||
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
|
||||
for _, opt := range options {
|
||||
opt(config)
|
||||
}
|
||||
|
||||
if len(config.endpoints) == 0 {
|
||||
return nil, errors.New("no Zookeeper endpoints supplied")
|
||||
}
|
||||
|
||||
if config.path == "" {
|
||||
return nil, errors.New("no Zookeeper path supplied")
|
||||
}
|
||||
|
||||
// Create a closure that allows us to use the ExponentialBackoff function.
|
||||
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
|
||||
|
||||
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
|
||||
if err != nil {
|
||||
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
|
||||
}
|
||||
|
||||
defer c.Close()
|
||||
|
||||
// Open up descriptor for the ZK path given
|
||||
children, _, _, err := c.ChildrenW(config.path)
|
||||
if err != nil {
|
||||
|
||||
// Sentinel error check as there is no other way to check.
|
||||
if err == zk.ErrInvalidPath {
|
||||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
|
||||
}
|
||||
|
||||
return false,
|
||||
NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
|
||||
}
|
||||
|
||||
// Get all the master nodes through all the children in the given path
|
||||
minScore := math.MaxInt64
|
||||
var mesosInstance MesosInstance
|
||||
var hosts []string
|
||||
for _, child := range children {
|
||||
// Only the master nodes will start with json.info_
|
||||
if strings.HasPrefix(child, "json.info_") {
|
||||
strs := strings.Split(child, "_")
|
||||
if len(strs) < 2 {
|
||||
config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child)
|
||||
continue
|
||||
}
|
||||
score, err := strconv.Atoi(strs[1])
|
||||
if err != nil {
|
||||
return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos."))
|
||||
}
|
||||
|
||||
childPath := config.path + "/" + child
|
||||
data, _, err := c.Get(childPath)
|
||||
if err != nil {
|
||||
if err == zk.ErrInvalidPath {
|
||||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
|
||||
}
|
||||
|
||||
return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), &mesosInstance)
|
||||
if err != nil {
|
||||
config.logger.Printf("%s", err)
|
||||
return false,
|
||||
NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
|
||||
}
|
||||
// Combine all master nodes into comma-separated
|
||||
// Return hostname instead of ip to be consistent with aurora master nodes
|
||||
hosts = append(hosts, mesosInstance.Address.Hostname)
|
||||
// get the leader from the child with the smallest score.
|
||||
if score < minScore {
|
||||
minScore = score
|
||||
result["leader"] = append(result["leader"], mesosInstance.Address.Hostname)
|
||||
}
|
||||
}
|
||||
}
|
||||
result["masterNodes"] = hosts
|
||||
// Master nodes data might not be available yet, try to fetch again.
|
||||
if len(result["masterNodes"]) == 0 {
|
||||
return false, NewTemporaryError(errors.New("no mesos master nodes found"))
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps)
|
||||
return nil, retryErr
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue