diff --git a/README.md b/README.md index cd6af0f..ccc3c18 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ # gorealis [![GoDoc](https://godoc.org/github.com/rdelval/gorealis?status.svg)](https://godoc.org/github.com/rdelval/gorealis) -Go library for communicating with [Apache Aurora](https://github.com/apache/aurora). -Named after the northern lights (Aurora Borealis). +Go library for interacting with [Apache Aurora](https://github.com/apache/aurora). ### Aurora version compatibility Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this -library has been tested. Vendoring a working version of this library is highly recommended. +library has been tested. ## Usage @@ -17,9 +16,5 @@ library has been tested. Vendoring a working version of this library is highly r * Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency * End to end testing with Vagrant setup -## Importing -* We suggest using a vendoring tool such as [govendor](https://github.com/kardianos/govendor) and -fetching by version, for example: `govendor fetch github.com/rdelval/gorealis@v1` - ## Contributions -Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file +Contributions are always welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file diff --git a/examples/client.go b/examples/client.go index 34482aa..7cdfc82 100644 --- a/examples/client.go +++ b/examples/client.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "os" + "time" + "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" @@ -63,6 +65,13 @@ func main() { var monitor *realis.Monitor var r realis.Realis + var defaultBackoff = &realis.Backoff{ + Steps: 2, + Duration: 10 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } + //check if zkUrl is available. if *zkUrl != "" { fmt.Println("zkUrl: ", *zkUrl) @@ -75,7 +84,8 @@ func main() { } fmt.Printf("cluster: %+v \n", cluster) - r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + //r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) + r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) @@ -83,11 +93,12 @@ func main() { monitor = &realis.Monitor{r} } else { - r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password) + r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) } + monitor = &realis.Monitor{r} } defer r.Close() diff --git a/monitors.go b/monitors.go index d7f863f..96face0 100644 --- a/monitors.go +++ b/monitors.go @@ -36,13 +36,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout Limit: 1, } - duration := defaultBackoff.Duration + defaultBackoff := m.Client.RealisConfig().backoff + duration := defaultBackoff.Duration //defaultBackoff.Duration var err error var respDetail *aurora.Response for i := 0; i*interval <= timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { - if i != 0 { + for step := 0; step < defaultBackoff.Steps; step++ { + if step != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -94,13 +95,15 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) { + + defaultBackoff := m.Client.RealisConfig().backoff duration := defaultBackoff.Duration var err error var live map[int32]bool for i := 0; i*interval < timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { - if i != 0 { + for step := 0; step < defaultBackoff.Steps; step++ { + if step != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) diff --git a/realis.go b/realis.go index a81e127..a604eec 100644 --- a/realis.go +++ b/realis.go @@ -31,6 +31,8 @@ import ( "github.com/rdelval/gorealis/response" ) +const VERSION = "1.0.4" + type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) @@ -51,6 +53,7 @@ type Realis interface { StartCronJob(key *aurora.JobKey) (*aurora.Response, error) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) ReestablishConn() error + RealisConfig() *RealisConfig Close() } @@ -60,13 +63,170 @@ type realisClient struct { readonlyClient *aurora.ReadOnlySchedulerClient } +type option func(*RealisConfig) + +//Config sets for options in RealisConfig. +func BasicAuth(username, password string) option { + + return func(config *RealisConfig) { + config.username = username + config.password = password + } +} + +func SchedulerUrl(url string) option { + return func(config *RealisConfig) { + config.url = url + } +} + +func TimeoutMS(timeout int) option { + return func(config *RealisConfig) { + config.timeoutms = timeout + } +} + +func ZKCluster(cluster *Cluster) option { + return func(config *RealisConfig) { + config.cluster = cluster + } +} + +func ZKUrl(url string) option { + return func(config *RealisConfig) { + config.cluster = GetDefaultClusterFromZKUrl(url) + } +} + +func Retries(backoff *Backoff) option { + return func(config *RealisConfig) { + config.backoff = backoff + } +} + +func ThriftJSON() option { + return func(config *RealisConfig) { + config.jsonTransport = true + } +} + +func ThriftBinary() option { + return func(config *RealisConfig) { + config.binTransport = true + } +} + +func BackOff(b *Backoff) option { + return func(config *RealisConfig) { + config.backoff = b + } +} + +func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) { + + trans, err := defaultTTransport(url, timeout) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + return trans, err +} + +func newTBinTransport(url string, timeout int) (thrift.TTransport, error) { + trans, err := defaultTTransport(url, timeout) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + + return trans, err +} + +func NewRealisClient(options ...option) (Realis, error) { + config := &RealisConfig{} + fmt.Println(" options length: ", options) + for _, opt := range options { + opt(config) + } + //Default timeout + if config.timeoutms == 0 { + config.timeoutms = 10000 + } + //Set default Transport to JSON if needed. + if !config.jsonTransport && !config.binTransport { + config.jsonTransport = true + } + var url string + var err error + //Cluster or URL? + if config.cluster != nil { + url, err = LeaderFromZK(*config.cluster) + if err != nil { + fmt.Errorf("LeaderFromZK error: %+v\n ", err) + } + fmt.Println("schedURLFromZK: ", url) + } else if config.url != "" { + fmt.Println("Scheduler URL: ", config.url) + url = config.url + } else { + return nil, errors.New("Incomplete Options -- url or cluster required") + } + + if config.jsonTransport { + trans, err := newTJSONTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + + config.transport = trans + config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if config.binTransport { + trans, err := newTBinTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + config.transport = trans + config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } + + //Basic Authentication. + if config.username != "" && config.password != "" { + AddBasicAuth(config, config.username, config.password) + } + + //Set defaultBackoff if required. + if config.backoff == nil { + config.backoff = &defaultBackoff + } else { + defaultBackoff = *config.backoff + fmt.Printf(" updating default backoff : %+v\n", *config.backoff) + } + + fmt.Printf("gorealis config: %+v\n", config) + + return &realisClient{ + config: config, + client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil + +} + // Wrapper object to provide future flexibility type RealisConfig struct { - username, password string - url string - cluster *Cluster - transport thrift.TTransport - protoFactory thrift.TProtocolFactory + username, password string + url string + timeoutms int + binTransport, jsonTransport bool + cluster *Cluster + backoff *Backoff + transport thrift.TTransport + protoFactory thrift.TProtocolFactory } type Backoff struct { @@ -225,6 +385,7 @@ func newTJSONConfig(url string, timeoutms int) (*RealisConfig, error) { httpTrans := (trans).(*thrift.THttpClient) httpTrans.SetHeader("Content-Type", "application/x-thrift") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil } @@ -237,9 +398,11 @@ func newTBinaryConfig(url string, timeoutms int) (*RealisConfig, error) { } httpTrans := (trans).(*thrift.THttpClient) + httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil @@ -263,34 +426,60 @@ func (r *realisClient) ReestablishConn() error { fmt.Println("ReestablishConn begin ....") r.Close() //First check cluster object for re-establish; if not available then try with scheduler url. + //var config *RealisConfig + var err error + var url string + if r.config.cluster != nil && r.config.username != "" && r.config.password != "" { //Re-establish using cluster object. - url, err := LeaderFromZK(*r.config.cluster) + url, err = LeaderFromZK(*r.config.cluster) if err != nil { fmt.Errorf("LeaderFromZK error: %+v\n ", err) } fmt.Println("ReestablishConn url: ", url) - config, err := newDefaultConfig(url, 10000) + if r.config.jsonTransport { + trans, err := newTJSONTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := newTBinTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } if err != nil { - fmt.Println(err) + fmt.Println("error creating config: ", err) } // Configured for basic-auth - AddBasicAuth(config, r.config.username, r.config.password) - config.cluster = r.config.cluster - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. - //Create new configuration with default transport layer - config, err := newDefaultConfig(r.config.url, 10000) - if err != nil { - fmt.Println(err) + fmt.Println("ReestablishConn url: ", r.config.url) + if r.config.jsonTransport { + trans, err := newTJSONTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := newTBinTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - AddBasicAuth(config, r.config.username, r.config.password) - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else { fmt.Println(" Missing Data for ReestablishConn ") fmt.Println(" r.config.cluster: ", r.config.cluster) @@ -299,6 +488,7 @@ func (r *realisClient) ReestablishConn() error { fmt.Println(" r.config.url: ", r.config.url) return errors.New(" Missing Data for ReestablishConn ") } + fmt.Printf(" config before return: %+v\n", r.config) return nil } @@ -318,6 +508,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -356,6 +547,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -385,6 +577,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a instanceIds := make(map[int32]bool) var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for _, instId := range instances { @@ -413,6 +607,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } +func (r *realisClient) RealisConfig() *RealisConfig { + return r.config +} + // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -425,7 +623,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -459,9 +657,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { + fmt.Println(" STEPS: ", i) adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -470,10 +670,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { time.Sleep(adjusted) duration = time.Duration(float64(duration) * defaultBackoff.Factor) } - + fmt.Println(" calling CreateJob") if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil { return response.ResponseCodeCheck(resp) } + fmt.Println("CreateJob err: %+v\n", err) err1 := r.ReestablishConn() if err1 != nil { fmt.Println("error in ReestablishConn: ", err1) @@ -486,6 +687,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -514,6 +716,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -542,6 +745,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -575,6 +779,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) } var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -610,6 +816,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) var resp *aurora.Response var err error if len(instanceIds) > 0 { + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -644,6 +851,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -673,6 +881,7 @@ func (r *realisClient) AbortJobUpdate( var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -702,6 +911,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -750,6 +960,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -795,6 +1006,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -844,6 +1056,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -870,6 +1083,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index bbcda61..465e085 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -28,8 +28,10 @@ var r Realis var thermosPayload []byte func TestMain(m *testing.M) { + var err error + // New configuration to connect to Vagrant image - config, err := NewDefaultConfig("http://192.168.33.7:8081",10000) + r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081","aurora", "secret") if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) @@ -41,10 +43,6 @@ func TestMain(m *testing.M) { os.Exit(1) } - // Configured for vagrant - AddBasicAuth(&config, "aurora", "secret") - r = NewClient(config) - os.Exit(m.Run()) } diff --git a/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go b/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go index f47b3cf..88eb2c1 100644 --- a/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go +++ b/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go @@ -103,7 +103,7 @@ func NewTHttpClientWithOptions(urlstr string, options THttpClientOptions) (TTran if client == nil { client = DefaultHttpClient } - httpHeader := map[string][]string{} + httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}} return &THttpClient{client: client, response: response, url: parsedURL, header: httpHeader}, nil } @@ -121,7 +121,7 @@ func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (T if client == nil { client = DefaultHttpClient } - httpHeader := map[string][]string{} + httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}} return &THttpClient{client: client, url: parsedURL, requestBuffer: bytes.NewBuffer(buf), header: httpHeader}, nil } diff --git a/zk.go b/zk.go index a301fcd..1c0c61c 100644 --- a/zk.go +++ b/zk.go @@ -17,11 +17,12 @@ package realis import ( "encoding/json" "fmt" - "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" "strconv" "strings" "time" + + "github.com/pkg/errors" + "github.com/samuel/go-zookeeper/zk" ) type Endpoint struct { @@ -43,6 +44,33 @@ func (NoopLogger) Printf(format string, a ...interface{}) { // Loads leader from ZK endpoint. func LeaderFromZK(cluster Cluster) (string, error) { + var err error + var zkurl string + + duration := defaultBackoff.Duration + for i := 0; i < defaultBackoff.Steps; i++ { + if i != 0 { + adjusted := duration + if defaultBackoff.Jitter > 0.0 { + adjusted = Jitter(duration, defaultBackoff.Jitter) + } + fmt.Println(" sleeping for: ", adjusted) + time.Sleep(adjusted) + duration = time.Duration(float64(duration) * defaultBackoff.Factor) + } + if zkurl, err = leaderFromZK(cluster); err == nil { + return zkurl, err + } + if err != nil { + fmt.Println("error in LeaderFromZK: ", err) + } + } + + return "", err +} + +func leaderFromZK(cluster Cluster) (string, error) { + endpoints := strings.Split(cluster.ZK, ",") //TODO (rdelvalle): When enabling debugging, change logger here @@ -92,4 +120,5 @@ func LeaderFromZK(cluster Cluster) (string, error) { } return "", errors.New("No leader found") + }