From e57dc98d65df0ec1dbb53b41904e3d2e4b0ee5cd Mon Sep 17 00:00:00 2001
From: Kumar Krishna
Date: Thu, 6 Apr 2017 23:15:44 -0700
Subject: [PATCH] add resiliency for LeaderFromZK and other fixes .

---
Review note: in the "Re-establish using scheduler url" branch of
ReestablishConn the new transport helpers are called with r.config.url,
the value the removed lines passed and the value that branch logs, rather
than the local url variable. NOTE(review): url appears to be populated only
on the cluster/ZK path -- confirm against the full function.

 monitors.go |  4 ++--
 realis.go   | 62 +++++++++++++++++++++++++++++++----------------------
 zk.go       | 33 ++++++++++++++++++++++++++--
 3 files changed, 69 insertions(+), 30 deletions(-)

diff --git a/monitors.go b/monitors.go
index d6c1c7e..96face0 100644
--- a/monitors.go
+++ b/monitors.go
@@ -43,7 +43,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 
 	for i := 0; i*interval <= timeout; i++ {
 		for step := 0; step < defaultBackoff.Steps; step++ {
-			if i != 0 {
+			if step != 0 {
 				adjusted := duration
 				if defaultBackoff.Jitter > 0.0 {
 					adjusted = Jitter(duration, defaultBackoff.Jitter)
@@ -103,7 +103,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
 
 	for i := 0; i*interval < timeout; i++ {
 		for step := 0; step < defaultBackoff.Steps; step++ {
-			if i != 0 {
+			if step != 0 {
 				adjusted := duration
 				if defaultBackoff.Jitter > 0.0 {
 					adjusted = Jitter(duration, defaultBackoff.Jitter)
diff --git a/realis.go b/realis.go
index ac78408..0907699 100644
--- a/realis.go
+++ b/realis.go
@@ -122,6 +122,32 @@ func BackOff(b *Backoff) option {
 	}
 }
 
+func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
+
+	trans, err := defaultTTransport(url, timeout)
+	if err != nil {
+		return nil, errors.Wrap(err, "Error creating realis")
+	}
+	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans.SetHeader("Content-Type", "application/x-thrift")
+	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	return trans, err
+}
+
+func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
+	trans, err := defaultTTransport(url, timeout)
+	if err != nil {
+		return nil, errors.Wrap(err, "Error creating realis")
+	}
+	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
+	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
+	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
+	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+
+	return trans, err
+}
+
 func NewRealisClient(options ...option) (Realis, error) {
 	config := &RealisConfig{}
 	fmt.Println(" options length: ", options)
@@ -153,23 +179,18 @@ func NewRealisClient(options ...option) (Realis, error) {
 	}
 
 	if config.jsonTransport {
-		trans, err := defaultTTransport(url, config.timeoutms)
+		trans, err := newTJSONTransport(url, config.timeoutms)
 		if err != nil {
 			return nil, errors.Wrap(err, "Error creating realis")
 		}
-		httpTrans := (trans).(*thrift.THttpClient)
-		httpTrans.SetHeader("Content-Type", "application/x-thrift")
+
 		config.transport = trans
 		config.protoFactory = thrift.NewTJSONProtocolFactory()
 	} else if config.binTransport {
-		trans, err := defaultTTransport(url, config.timeoutms)
+		trans, err := newTBinTransport(url, config.timeoutms)
 		if err != nil {
 			return nil, errors.Wrap(err, "Error creating realis")
 		}
-		httpTrans := (trans).(*thrift.THttpClient)
-		httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
-		httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-		httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
 		config.transport = trans
 		config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 	}
@@ -182,6 +203,9 @@ func NewRealisClient(options ...option) (Realis, error) {
 	//Set defaultBackoff if required.
 	if config.backoff == nil {
 		config.backoff = &defaultBackoff
+	} else {
+		defaultBackoff = *config.backoff
+		fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
 	}
 
 	fmt.Printf("gorealis config: %+v\n", config)
@@ -414,23 +438,17 @@ func (r *realisClient) ReestablishConn() error {
 		}
 		fmt.Println("ReestablishConn url: ", url)
 		if r.config.jsonTransport {
-			trans, err := defaultTTransport(url, r.config.timeoutms)
+			trans, err := newTJSONTransport(url, r.config.timeoutms)
 			if err != nil {
 				return errors.Wrap(err, "Error creating realis")
 			}
-			httpTrans := (trans).(*thrift.THttpClient)
-			httpTrans.SetHeader("Content-Type", "application/x-thrift")
 			r.config.transport = trans
 			r.config.protoFactory = thrift.NewTJSONProtocolFactory()
 		} else if r.config.binTransport {
-			trans, err := defaultTTransport(url, r.config.timeoutms)
+			trans, err := newTBinTransport(url, r.config.timeoutms)
 			if err != nil {
 				return errors.Wrap(err, "Error creating realis")
 			}
-			httpTrans := (trans).(*thrift.THttpClient)
-			httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
-			httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-			httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
 			r.config.transport = trans
 			r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 		}
@@ -445,23 +463,17 @@ func (r *realisClient) ReestablishConn() error {
 		//Re-establish using scheduler url.
 		fmt.Println("ReestablishConn url: ", r.config.url)
 		if r.config.jsonTransport {
-			trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
+			trans, err := newTJSONTransport(r.config.url, r.config.timeoutms)
 			if err != nil {
 				return errors.Wrap(err, "Error creating realis")
 			}
-			httpTrans := (trans).(*thrift.THttpClient)
-			httpTrans.SetHeader("Content-Type", "application/x-thrift")
 			r.config.transport = trans
 			r.config.protoFactory = thrift.NewTJSONProtocolFactory()
 		} else if r.config.binTransport {
-			trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
+			trans, err := newTBinTransport(r.config.url, r.config.timeoutms)
 			if err != nil {
 				return errors.Wrap(err, "Error creating realis")
 			}
-			httpTrans := (trans).(*thrift.THttpClient)
-			httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
-			httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-			httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
 			r.config.transport = trans
 			r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 		}
@@ -496,7 +508,6 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 
 	var resp *aurora.Response
 	var err error
-	fmt.Printf(" config: %+v\n", r.config)
 	defaultBackoff := r.config.backoff
 	duration := defaultBackoff.Duration
 	for i := 0; i < defaultBackoff.Steps; i++ {
@@ -615,7 +626,6 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 	defaultBackoff := r.config.backoff
 	duration := defaultBackoff.Duration
 	for i := 0; i < defaultBackoff.Steps; i++ {
-		fmt.Println(" STEPS: ", i)
 		if i != 0 {
 			adjusted := duration
 			if defaultBackoff.Jitter > 0.0 {
diff --git a/zk.go b/zk.go
index a301fcd..1c0c61c 100644
--- a/zk.go
+++ b/zk.go
@@ -17,11 +17,12 @@ package realis
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/pkg/errors"
+	"github.com/samuel/go-zookeeper/zk"
 )
 
 type Endpoint struct {
@@ -43,6 +44,33 @@ func (NoopLogger) Printf(format string, a ...interface{}) {
 
 // Loads leader from ZK endpoint.
 func LeaderFromZK(cluster Cluster) (string, error) {
+	var err error
+	var zkurl string
+
+	duration := defaultBackoff.Duration
+	for i := 0; i < defaultBackoff.Steps; i++ {
+		if i != 0 {
+			adjusted := duration
+			if defaultBackoff.Jitter > 0.0 {
+				adjusted = Jitter(duration, defaultBackoff.Jitter)
+			}
+			fmt.Println(" sleeping for: ", adjusted)
+			time.Sleep(adjusted)
+			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+		}
+		if zkurl, err = leaderFromZK(cluster); err == nil {
+			return zkurl, err
+		}
+		if err != nil {
+			fmt.Println("error in LeaderFromZK: ", err)
+		}
+	}
+
+	return "", err
+}
+
+func leaderFromZK(cluster Cluster) (string, error) {
+
 	endpoints := strings.Split(cluster.ZK, ",")
 
 	//TODO (rdelvalle): When enabling debugging, change logger here
@@ -92,4 +120,5 @@ func LeaderFromZK(cluster Cluster) (string, error) {
 	}
 
 	return "", errors.New("No leader found")
+
 }