From 8f505815d5e272e9afab6f90d839f38a1c9cc4da Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 23 Mar 2017 19:08:15 -0400 Subject: [PATCH 1/6] Updating wording. Removed vendoring section and some superfluous info. --- README.md | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index cd6af0f..ccc3c18 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ # gorealis [![GoDoc](https://godoc.org/github.com/rdelval/gorealis?status.svg)](https://godoc.org/github.com/rdelval/gorealis) -Go library for communicating with [Apache Aurora](https://github.com/apache/aurora). -Named after the northern lights (Aurora Borealis). +Go library for interacting with [Apache Aurora](https://github.com/apache/aurora). ### Aurora version compatibility Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this -library has been tested. Vendoring a working version of this library is highly recommended. +library has been tested. ## Usage @@ -17,9 +16,5 @@ library has been tested. Vendoring a working version of this library is highly r * Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency * End to end testing with Vagrant setup -## Importing -* We suggest using a vendoring tool such as [govendor](https://github.com/kardianos/govendor) and -fetching by version, for example: `govendor fetch github.com/rdelval/gorealis@v1` - ## Contributions -Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file +Contributions are always welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file From cb6100e6909eac53e61151513b5654a0b5955f86 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 23 Mar 2017 19:18:23 -0400 Subject: [PATCH 2/6] Reverted change made to vendored thrift library which caused some issues when vendoring gorealis itself. Added a workaround to overcome the default httpPostClient in the thrift library defaulting to headers for the thrift JSON protocol --- realis.go | 7 ++++++- .../git.apache.org/thrift.git/lib/go/thrift/http_client.go | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/realis.go b/realis.go index 339f07e..87b904c 100644 --- a/realis.go +++ b/realis.go @@ -31,6 +31,8 @@ import ( "github.com/rdelval/gorealis/response" ) +const VERSION = "1.0.4" + type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) @@ -225,6 +227,7 @@ func newTJSONConfig(url string, timeoutms int) (*RealisConfig, error) { httpTrans := (trans).(*thrift.THttpClient) httpTrans.SetHeader("Content-Type", "application/x-thrift") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil } @@ -237,9 +240,11 @@ func newTBinaryConfig(url string, timeoutms int) (*RealisConfig, error) { } httpTrans := (trans).(*thrift.THttpClient) + httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil diff --git a/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go b/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go index f47b3cf..88eb2c1 100644 --- a/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go +++ b/vendor/git.apache.org/thrift.git/lib/go/thrift/http_client.go @@ -103,7 +103,7 @@ func NewTHttpClientWithOptions(urlstr string, options THttpClientOptions) (TTran if client == nil { client = DefaultHttpClient } - httpHeader := map[string][]string{} + httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}} return &THttpClient{client: client, response: response, url: parsedURL, header: httpHeader}, nil } @@ -121,7 +121,7 @@ func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (T if client == nil { client = DefaultHttpClient } - httpHeader := map[string][]string{} + httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}} return &THttpClient{client: client, url: parsedURL, requestBuffer: bytes.NewBuffer(buf), header: httpHeader}, nil } From d27d8a47062e9678ca405be9c36244e786fdde69 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 23 Mar 2017 20:44:45 -0400 Subject: [PATCH 3/6] Updated end to end test on vagrant images to reflect new client creation. --- realis_e2e_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/realis_e2e_test.go b/realis_e2e_test.go index bbcda61..465e085 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -28,8 +28,10 @@ var r Realis var thermosPayload []byte func TestMain(m *testing.M) { + var err error + // New configuration to connect to Vagrant image - config, err := NewDefaultConfig("http://192.168.33.7:8081",10000) + r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081","aurora", "secret") if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) @@ -41,10 +43,6 @@ func TestMain(m *testing.M) { os.Exit(1) } - // Configured for vagrant - AddBasicAuth(&config, "aurora", "secret") - r = NewClient(config) - os.Exit(m.Run()) } From b10df0603e8e9c696d04de0e0554d8911c253f1b Mon Sep 17 00:00:00 2001 From: Kumar Krishna Date: Thu, 30 Mar 2017 18:17:21 -0700 Subject: [PATCH 4/6] gorealis config refactoring --- examples/client.go | 19 +++- monitors.go | 9 +- realis.go | 241 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 241 insertions(+), 28 deletions(-) diff --git a/examples/client.go b/examples/client.go index 00f4159..357886b 100644 --- a/examples/client.go +++ b/examples/client.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "os" + "time" + "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" @@ -63,6 +65,13 @@ func main() { var monitor *realis.Monitor var r realis.Realis + var defaultBackoff = &realis.Backoff{ + Steps: 5, + Duration: 5 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } + //check if zkUrl is available. if *zkUrl != "" { fmt.Println("zkUrl: ", *zkUrl) @@ -75,19 +84,25 @@ func main() { } fmt.Printf("cluster: %+v \n", cluster) - r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) if err != nil { fmt.Println(err) os.Exit(1) } + //r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + //if err != nil { + // fmt.Println(err) + // os.Exit(1) + //} monitor = &realis.Monitor{r} } else { - r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password) + r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) } + monitor = &realis.Monitor{r} } defer r.Close() diff --git a/monitors.go b/monitors.go index d7f863f..d6c1c7e 100644 --- a/monitors.go +++ b/monitors.go @@ -36,12 +36,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout Limit: 1, } - duration := defaultBackoff.Duration + defaultBackoff := m.Client.RealisConfig().backoff + duration := defaultBackoff.Duration //defaultBackoff.Duration var err error var respDetail *aurora.Response for i := 0; i*interval <= timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -94,12 +95,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) { + + defaultBackoff := m.Client.RealisConfig().backoff duration := defaultBackoff.Duration var err error var live map[int32]bool for i := 0; i*interval < timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { diff --git a/realis.go b/realis.go index 339f07e..120b026 100644 --- a/realis.go +++ b/realis.go @@ -51,6 +51,7 @@ type Realis interface { StartCronJob(key *aurora.JobKey) (*aurora.Response, error) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) ReestablishConn() error + RealisConfig() *RealisConfig Close() } @@ -60,13 +61,140 @@ type realisClient struct { readonlyClient *aurora.ReadOnlySchedulerClient } +type option func(*RealisConfig) + +//Config sets for options in RealisConfig. +func BasicAuth(username, password string) option { + + return func(config *RealisConfig) { + config.username = username + config.password = password + } +} + +func SchedulerUrl(url string) option { + return func(config *RealisConfig) { + config.url = url + } +} + +func TimeoutMS(timeout int) option { + return func(config *RealisConfig) { + config.timeoutms = timeout + } +} + +func ZKCluster(cluster *Cluster) option { + return func(config *RealisConfig) { + config.cluster = cluster + } +} + +func Retries(backoff *Backoff) option { + return func(config *RealisConfig) { + config.backoff = backoff + } +} + +func ThriftJSON() option { + return func(config *RealisConfig) { + config.jsonTransport = true + } +} + +func ThriftBinary() option { + return func(config *RealisConfig) { + config.binTransport = true + } +} + +func BackOff(b *Backoff) option { + return func(config *RealisConfig) { + config.backoff = b + } +} + +func NewRealisClient(options ...option) (Realis, error) { + config := &RealisConfig{} + fmt.Println(" options length: ", options) + for _, opt := range options { + opt(config) + } + //Default timeout + if config.timeoutms == 0 { + config.timeoutms = 10000 + } + //Set default Transport to JSON if needed. + if !config.jsonTransport && !config.binTransport { + config.jsonTransport = true + } + var url string + var err error + //Cluster or URL? + if config.cluster != nil { + url, err = LeaderFromZK(*config.cluster) + if err != nil { + fmt.Errorf("LeaderFromZK error: %+v\n ", err) + } + fmt.Println("schedURLFromZK: ", url) + } else if config.url != "" { + fmt.Println("Scheduler URL: ", config.url) + url = config.url + } else { + return nil, errors.New("Incomplete Options -- url or cluster required") + } + + if config.jsonTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + config.transport = trans + config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if config.binTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + config.transport = trans + config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } + + //Basic Authentication. + if config.username != "" && config.password != "" { + AddBasicAuth(config, config.username, config.password) + } + + //Set defaultBackoff if required. + if config.backoff == nil { + config.backoff = &defaultBackoff + } + + fmt.Printf("gorealis config: %+v\n", config) + + return &realisClient{ + config: config, + client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil + +} + // Wrapper object to provide future flexibility type RealisConfig struct { - username, password string - url string - cluster *Cluster - transport thrift.TTransport - protoFactory thrift.TProtocolFactory + username, password string + url string + timeoutms int + binTransport, jsonTransport bool + cluster *Cluster + backoff *Backoff + transport thrift.TTransport + protoFactory thrift.TProtocolFactory } type Backoff struct { @@ -263,34 +391,74 @@ func (r *realisClient) ReestablishConn() error { fmt.Println("ReestablishConn begin ....") r.Close() //First check cluster object for re-establish; if not available then try with scheduler url. + //var config *RealisConfig + var err error + var url string + if r.config.cluster != nil && r.config.username != "" && r.config.password != "" { //Re-establish using cluster object. - url, err := LeaderFromZK(*r.config.cluster) + url, err = LeaderFromZK(*r.config.cluster) if err != nil { fmt.Errorf("LeaderFromZK error: %+v\n ", err) } fmt.Println("ReestablishConn url: ", url) - config, err := newDefaultConfig(url, 10000) + if r.config.jsonTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } if err != nil { - fmt.Println(err) + fmt.Println("error creating config: ", err) } // Configured for basic-auth - AddBasicAuth(config, r.config.username, r.config.password) - config.cluster = r.config.cluster - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + //config.cluster = r.config.cluster + //r.config = config + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. - //Create new configuration with default transport layer - config, err := newDefaultConfig(r.config.url, 10000) - if err != nil { - fmt.Println(err) + fmt.Println("ReestablishConn url: ", r.config.url) + if r.config.jsonTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - AddBasicAuth(config, r.config.username, r.config.password) - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else { fmt.Println(" Missing Data for ReestablishConn ") fmt.Println(" r.config.cluster: ", r.config.cluster) @@ -299,6 +467,7 @@ func (r *realisClient) ReestablishConn() error { fmt.Println(" r.config.url: ", r.config.url) return errors.New(" Missing Data for ReestablishConn ") } + fmt.Printf(" config before return: %+v\n", r.config) return nil } @@ -318,6 +487,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche var resp *aurora.Response var err error + fmt.Printf(" config: %+v\n", r.config) + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -356,6 +527,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -385,6 +557,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a instanceIds := make(map[int32]bool) var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for _, instId := range instances { @@ -413,6 +587,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } +func (r *realisClient) RealisConfig() *RealisConfig { + return r.config +} + // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -425,9 +603,10 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { + fmt.Println(" STEPS: ", i) if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -459,9 +638,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { + fmt.Println(" STEPS: ", i) adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -470,10 +651,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { time.Sleep(adjusted) duration = time.Duration(float64(duration) * defaultBackoff.Factor) } - + fmt.Println(" calling CreateJob") if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil { return response.ResponseCodeCheck(resp) } + fmt.Println("CreateJob err: %+v\n", err) err1 := r.ReestablishConn() if err1 != nil { fmt.Println("error in ReestablishConn: ", err1) @@ -486,6 +668,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -514,6 +697,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -542,6 +726,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -575,6 +760,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) } var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -610,6 +797,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) var resp *aurora.Response var err error if len(instanceIds) > 0 { + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -644,6 +832,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -673,6 +862,7 @@ func (r *realisClient) AbortJobUpdate( var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -702,6 +892,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -750,6 +941,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -795,6 +987,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -844,6 +1037,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -870,6 +1064,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { From 48ca520eaa6f913b3642247e7f73fbc8a86756f8 Mon Sep 17 00:00:00 2001 From: Kumar Krishna Date: Fri, 31 Mar 2017 10:48:36 -0700 Subject: [PATCH 5/6] realisconfig refactor --- examples/client.go | 12 ++++-------- realis.go | 8 ++++++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/client.go b/examples/client.go index 357886b..078ff8c 100644 --- a/examples/client.go +++ b/examples/client.go @@ -66,8 +66,8 @@ func main() { var r realis.Realis var defaultBackoff = &realis.Backoff{ - Steps: 5, - Duration: 5 * time.Second, + Steps: 2, + Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1, } @@ -84,16 +84,12 @@ func main() { } fmt.Printf("cluster: %+v \n", cluster) - r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) + //r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) + r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) } - //r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) - //if err != nil { - // fmt.Println(err) - // os.Exit(1) - //} monitor = &realis.Monitor{r} } else { diff --git a/realis.go b/realis.go index 0e47340..ac78408 100644 --- a/realis.go +++ b/realis.go @@ -92,6 +92,12 @@ func ZKCluster(cluster *Cluster) option { } } +func ZKUrl(url string) option { + return func(config *RealisConfig) { + config.cluster = GetDefaultClusterFromZKUrl(url) + } +} + func Retries(backoff *Backoff) option { return func(config *RealisConfig) { config.backoff = backoff @@ -433,8 +439,6 @@ func (r *realisClient) ReestablishConn() error { } // Configured for basic-auth AddBasicAuth(r.config, r.config.username, r.config.password) - //config.cluster = r.config.cluster - //r.config = config r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { From e57dc98d65df0ec1dbb53b41904e3d2e4b0ee5cd Mon Sep 17 00:00:00 2001 From: Kumar Krishna Date: Thu, 6 Apr 2017 23:15:44 -0700 Subject: [PATCH 6/6] add resiliency for LeaderFromZK and other fixes . --- monitors.go | 4 ++-- realis.go | 62 +++++++++++++++++++++++++++++++---------------------- zk.go | 33 ++++++++++++++++++++++++++-- 3 files changed, 69 insertions(+), 30 deletions(-) diff --git a/monitors.go b/monitors.go index d6c1c7e..96face0 100644 --- a/monitors.go +++ b/monitors.go @@ -43,7 +43,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout for i := 0; i*interval <= timeout; i++ { for step := 0; step < defaultBackoff.Steps; step++ { - if i != 0 { + if step != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -103,7 +103,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t for i := 0; i*interval < timeout; i++ { for step := 0; step < defaultBackoff.Steps; step++ { - if i != 0 { + if step != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) diff --git a/realis.go b/realis.go index ac78408..0907699 100644 --- a/realis.go +++ b/realis.go @@ -122,6 +122,32 @@ func BackOff(b *Backoff) option { } } +func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) { + + trans, err := defaultTTransport(url, timeout) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + return trans, err +} + +func newTBinTransport(url string, timeout int) (thrift.TTransport, error) { + trans, err := defaultTTransport(url, timeout) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + + return trans, err +} + func NewRealisClient(options ...option) (Realis, error) { config := &RealisConfig{} fmt.Println(" options length: ", options) @@ -153,23 +179,18 @@ func NewRealisClient(options ...option) (Realis, error) { } if config.jsonTransport { - trans, err := defaultTTransport(url, config.timeoutms) + trans, err := newTJSONTransport(url, config.timeoutms) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Content-Type", "application/x-thrift") + config.transport = trans config.protoFactory = thrift.NewTJSONProtocolFactory() } else if config.binTransport { - trans, err := defaultTTransport(url, config.timeoutms) + trans, err := newTBinTransport(url, config.timeoutms) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") config.transport = trans config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } @@ -182,6 +203,9 @@ func NewRealisClient(options ...option) (Realis, error) { //Set defaultBackoff if required. if config.backoff == nil { config.backoff = &defaultBackoff + } else { + defaultBackoff = *config.backoff + fmt.Printf(" updating default backoff : %+v\n", *config.backoff) } fmt.Printf("gorealis config: %+v\n", config) @@ -414,23 +438,17 @@ func (r *realisClient) ReestablishConn() error { } fmt.Println("ReestablishConn url: ", url) if r.config.jsonTransport { - trans, err := defaultTTransport(url, r.config.timeoutms) + trans, err := newTJSONTransport(url, r.config.timeoutms) if err != nil { return errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Content-Type", "application/x-thrift") r.config.transport = trans r.config.protoFactory = thrift.NewTJSONProtocolFactory() } else if r.config.binTransport { - trans, err := defaultTTransport(url, r.config.timeoutms) + trans, err := newTBinTransport(url, r.config.timeoutms) if err != nil { return errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") r.config.transport = trans r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } @@ -445,23 +463,17 @@ func (r *realisClient) ReestablishConn() error { //Re-establish using scheduler url. fmt.Println("ReestablishConn url: ", r.config.url) if r.config.jsonTransport { - trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + trans, err := newTJSONTransport(url, r.config.timeoutms) if err != nil { return errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Content-Type", "application/x-thrift") r.config.transport = trans r.config.protoFactory = thrift.NewTJSONProtocolFactory() } else if r.config.binTransport { - trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + trans, err := newTBinTransport(url, r.config.timeoutms) if err != nil { return errors.Wrap(err, "Error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") r.config.transport = trans r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } @@ -496,7 +508,6 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche var resp *aurora.Response var err error - fmt.Printf(" config: %+v\n", r.config) defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { @@ -615,7 +626,6 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { - fmt.Println(" STEPS: ", i) if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { diff --git a/zk.go b/zk.go index a301fcd..1c0c61c 100644 --- a/zk.go +++ b/zk.go @@ -17,11 +17,12 @@ package realis import ( "encoding/json" "fmt" - "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" "strconv" "strings" "time" + + "github.com/pkg/errors" + "github.com/samuel/go-zookeeper/zk" ) type Endpoint struct { @@ -43,6 +44,33 @@ func (NoopLogger) Printf(format string, a ...interface{}) { // Loads leader from ZK endpoint. func LeaderFromZK(cluster Cluster) (string, error) { + var err error + var zkurl string + + duration := defaultBackoff.Duration + for i := 0; i < defaultBackoff.Steps; i++ { + if i != 0 { + adjusted := duration + if defaultBackoff.Jitter > 0.0 { + adjusted = Jitter(duration, defaultBackoff.Jitter) + } + fmt.Println(" sleeping for: ", adjusted) + time.Sleep(adjusted) + duration = time.Duration(float64(duration) * defaultBackoff.Factor) + } + if zkurl, err = leaderFromZK(cluster); err == nil { + return zkurl, err + } + if err != nil { + fmt.Println("error in LeaderFromZK: ", err) + } + } + + return "", err +} + +func leaderFromZK(cluster Cluster) (string, error) { + endpoints := strings.Split(cluster.ZK, ",") //TODO (rdelvalle): When enabling debugging, change logger here @@ -92,4 +120,5 @@ func LeaderFromZK(cluster Cluster) (string, error) { } return "", errors.New("No leader found") + }