diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..242176e --- /dev/null +++ b/errors.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package realis + +// Using a pattern described by Dave Cheney to differentiate errors +// https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully +type timeout interface { + Timeout() bool +} + +func IsTimeout(err error) bool { + temp, ok := err.(timeout) + return ok && temp.Timeout() +} + +type TimeoutErr struct { + error + timeout bool +} + +func (t *TimeoutErr) Timeout() bool { + return t.timeout +} + +func NewTimeoutError(err error) *TimeoutErr { + return &TimeoutErr{error: err, timeout: true} +} + +type temporary interface { + Temporary() bool +} + +func IsTemporary(err error) bool { + temp, ok := err.(temporary) + return ok && temp.Temporary() +} + +type TemporaryErr struct { + error + temporary bool +} + +func (t *TemporaryErr) Temporary() bool { + return t.temporary +} + +// Retrying after receiving this error is advised +func NewTemporaryError(err error) *TemporaryErr { + return &TemporaryErr{error: err, temporary: true} +} + +// Nothing can be done about this error +func NewPermamentError(err error) TemporaryErr { + return TemporaryErr{error: err, temporary: false} +} diff --git a/realis.go b/realis.go index 6e55948..a1669c6 100644 --- a/realis.go +++ b/realis.go @@ -87,6 +87,7 @@ type RealisConfig struct { InsecureSkipVerify bool certspath string clientkey, clientcert string + options []ClientOption } type Backoff struct { @@ -218,6 +219,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.timeoutms = 10000 config.backoff = &defaultBackoff config.logger = NoopLogger{} + config.options = options // Override default configs where necessary for _, opt := range options { @@ -240,7 +242,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { url, err = LeaderFromZK(*config.cluster) // If ZK is configured, throw an error if the leader is unable to be determined if err != nil { - return nil, errors.Wrap(err, "LeaderFromZK error") + return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) } config.logger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { @@ -253,7 +255,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { if config.jsonTransport { trans, err := newTJSONTransport(url, config.timeoutms, config) if err != nil { - return nil, errors.Wrap(err, "Error creating realis") + return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) } config.transport = trans config.protoFactory = thrift.NewTJSONProtocolFactory() @@ -261,7 +263,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { } else if config.binTransport { trans, err := newTBinTransport(url, config.timeoutms, config) if err != nil { - return nil, errors.Wrap(err, "Error creating realis") + return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) } config.transport = trans config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() @@ -427,75 +429,26 @@ func basicAuth(username, password string) string { } func (r *realisClient) ReestablishConn() error { - //close existing connection.. + // Close existing connection r.logger.Println("ReestablishConn begin ....") r.Close() - //First check cluster object for re-establish; if not available then try with scheduler url. - //var config *RealisConfig - var err error - var url string - if r.config.cluster != nil && r.config.username != "" && r.config.password != "" { - //Re-establish using cluster object. - url, err = LeaderFromZK(*r.config.cluster) - if err != nil { - r.config.logger.Println("LeaderFromZK error: %+v\n ", err) - } - r.logger.Println("ReestablishConn url: ", url) - if r.config.jsonTransport { - trans, err := newTJSONTransport(url, r.config.timeoutms, r.config) - if err != nil { - return errors.Wrap(err, "Error creating realis") - } - r.config.transport = trans - r.config.protoFactory = thrift.NewTJSONProtocolFactory() - } else if r.config.binTransport { - trans, err := newTBinTransport(url, r.config.timeoutms, r.config) - if err != nil { - return errors.Wrap(err, "Error creating realis") - } - r.config.transport = trans - r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() - } - if err != nil { - r.logger.Println("error creating config: ", err) - } - // Configured for basic-auth - AddBasicAuth(r.config, r.config.username, r.config.password) - r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) - r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) - } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { - //Re-establish using scheduler url. - r.logger.Println("ReestablishConn url: ", r.config.url) - if r.config.jsonTransport { - trans, err := newTJSONTransport(url, r.config.timeoutms, r.config) - if err != nil { - return errors.Wrap(err, "Error creating realis") - } - r.config.transport = trans - r.config.protoFactory = thrift.NewTJSONProtocolFactory() - } else if r.config.binTransport { - trans, err := newTBinTransport(url, r.config.timeoutms, r.config) - if err != nil { - return errors.Wrap(err, "Error creating realis") - } - r.config.transport = trans - r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() - } - AddBasicAuth(r.config, r.config.username, r.config.password) - r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) - r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) - } else { - r.logger.Println(" Missing Data for ReestablishConn ") - r.logger.Println(" r.config.cluster: ", r.config.cluster) - r.logger.Println(" r.config.username: ", r.config.username) - r.logger.Println(" r.config.passwd: ", r.config.password) - r.logger.Println(" r.config.url: ", r.config.url) - return errors.New(" Missing Data for ReestablishConn ") + // Recreate connection from scratch using original options + newRealis, err := NewRealisClient(r.config.options...) + if err != nil { + return err } - r.logger.Printf(" config url before return: %+v\n", r.config.url) + + // If we are able to successfully re-connect, make receiver + // point to newly established connections. + if newClient, ok := newRealis.(*realisClient); ok { + r.config = newClient.config + r.client = newClient.client + r.readonlyClient = newClient.readonlyClient + r.adminClient = newClient.adminClient + r.logger = newClient.logger + } + return nil } @@ -503,6 +456,7 @@ func (r *realisClient) ReestablishConn() error { func (r *realisClient) Close() { r.client.Transport.Close() r.readonlyClient.Transport.Close() + r.adminClient.Transport.Close() } // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 735a8b6..64f480f 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -36,14 +36,14 @@ func TestMain(m *testing.M) { // New configuration to connect to Vagrant image r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.ThriftJSON(), - realis.TimeoutMS(20000), - realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1})) + realis.TimeoutMS(20000)) if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) } + defer r.Close() + // Create monitor monitor = &realis.Monitor{Client: r} @@ -64,6 +64,14 @@ func TestLeaderFromZK(t *testing.T) { assert.Equal(t, "http://aurora.local:8081", url) } +func TestRealisClient_ReestablishConn(t *testing.T) { + + // Test that we're able to tear down the old connection and create a new one. + err := r.ReestablishConn() + + assert.NoError(t, err) +} + func TestGetCacerts(t *testing.T) { certs, err := realis.Getcerts("./examples/certs") assert.NoError(t, err) diff --git a/retry.go b/retry.go index ae1da23..72c1cd1 100644 --- a/retry.go +++ b/retry.go @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Modified version of the Kubernetes exponential-backoff code - package realis import ( @@ -38,6 +36,7 @@ type ConditionFunc func() (done bool, err error) type AuroraThriftCall func() (resp *aurora.Response, err error) +// Modified version of the Kubernetes exponential-backoff code. // ExponentialBackoff repeats a condition check with exponential backoff. // // It checks the condition up to Steps times, increasing the wait by multiplying @@ -59,11 +58,23 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) } - if ok, err := condition(); err != nil || ok { - return err + + ok, err := condition() + + // If the function executed says it succeeded, stop retrying + if ok { + return nil } + + // Stop retrying if the error is NOT temporary. + if err != nil { + if !IsTemporary(err) { + return err + } + } + } - return errors.New("Timed out while retrying") + return NewTimeoutError(errors.New("Timed out while retrying")) } // CheckAndRetryConn function takes realis client and a trhift API function to call and returns response and error @@ -71,12 +82,14 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { // If Error is retyable return resp and RetryConnErr error. func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) { resp, cliErr := auroraCall() - if cliErr != nil /*&& (strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr))*/ { + + // TODO: Return different error type based on the error that was returned by the API call + if cliErr != nil { r.ReestablishConn() - return resp, RetryConnErr + return resp, NewPermamentError(RetryConnErr) } if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT { - return resp, RetryConnErr + return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable")) } - return resp, cliErr + return resp, nil } diff --git a/zk.go b/zk.go index a8a2fe6..195cb3a 100644 --- a/zk.go +++ b/zk.go @@ -48,7 +48,7 @@ func LeaderFromZK(cluster Cluster) (string, error) { //TODO (rdelvalle): When enabling debugging, change logger here c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) }) if err != nil { - return false, errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK) + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)) } defer c.Close() @@ -73,7 +73,7 @@ func LeaderFromZK(cluster Cluster) (string, error) { err = json.Unmarshal([]byte(data), serviceInst) if err != nil { - return false, errors.Wrap(err, "Unable to unmarshall contents of leader") + return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader")) } // Should only be one endpoint