Introducing temporary errors. Refactored reestablish connection code … (#50)
* Introducing temporary errors.
* Refactored reestablish connection code to use NewClient.
* Added reestablish connection test to end to end tests.
This commit is contained in:
parent 1c426dd363
commit b2ffb73183
5 changed files with 126 additions and 82 deletions
69 errors.go (Normal file)

@@ -0,0 +1,69 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package realis
+
+// Using a pattern described by Dave Cheney to differentiate errors
+// https://dave.cheney.net/2016/04/27/dont-just-check-errors-handle-them-gracefully
+type timeout interface {
+	Timeout() bool
+}
+
+func IsTimeout(err error) bool {
+	temp, ok := err.(timeout)
+	return ok && temp.Timeout()
+}
+
+type TimeoutErr struct {
+	error
+	timeout bool
+}
+
+func (t *TimeoutErr) Timeout() bool {
+	return t.timeout
+}
+
+func NewTimeoutError(err error) *TimeoutErr {
+	return &TimeoutErr{error: err, timeout: true}
+}
+
+type temporary interface {
+	Temporary() bool
+}
+
+func IsTemporary(err error) bool {
+	temp, ok := err.(temporary)
+	return ok && temp.Temporary()
+}
+
+type TemporaryErr struct {
+	error
+	temporary bool
+}
+
+func (t *TemporaryErr) Temporary() bool {
+	return t.temporary
+}
+
+// Retrying after receiving this error is advised
+func NewTemporaryError(err error) *TemporaryErr {
+	return &TemporaryErr{error: err, temporary: true}
+}
+
+// Nothing can be done about this error
+func NewPermamentError(err error) TemporaryErr {
+	return TemporaryErr{error: err, temporary: false}
+}
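For context, a minimal, self-contained sketch of how calling code can branch on this classification. The interfaces and helpers below mirror the definitions in errors.go above; `fetchJobs` is an illustrative stand-in for a Thrift call made through realis, not part of the library.

```go
package main

import (
	"errors"
	"fmt"
)

// These mirror the unexported interfaces and helpers defined in errors.go above.
type temporary interface{ Temporary() bool }
type timeout interface{ Timeout() bool }

func IsTemporary(err error) bool { t, ok := err.(temporary); return ok && t.Temporary() }
func IsTimeout(err error) bool   { t, ok := err.(timeout); return ok && t.Timeout() }

// TemporaryErr mirrors the wrapper from errors.go: it embeds the original error
// (so the message is preserved) and tags it as retryable or not.
type TemporaryErr struct {
	error
	temporary bool
}

func (t *TemporaryErr) Temporary() bool { return t.temporary }

// fetchJobs is an illustrative stand-in for a scheduler call; not part of realis.
func fetchJobs() error {
	return &TemporaryErr{error: errors.New("scheduler temporarily unavailable"), temporary: true}
}

func main() {
	err := fetchJobs()
	switch {
	case err == nil:
		fmt.Println("success")
	case IsTimeout(err):
		fmt.Println("timed out, consider a longer deadline:", err)
	case IsTemporary(err):
		fmt.Println("temporary failure, safe to retry:", err)
	default:
		fmt.Println("permanent failure, giving up:", err)
	}
}
```

Because classification happens through small interfaces rather than concrete types, callers never need to import or inspect TemporaryErr or TimeoutErr directly.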
90 realis.go

@@ -87,6 +87,7 @@ type RealisConfig struct {
 	InsecureSkipVerify    bool
 	certspath             string
 	clientkey, clientcert string
+	options               []ClientOption
 }

 type Backoff struct {

@@ -218,6 +219,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	config.timeoutms = 10000
 	config.backoff = &defaultBackoff
 	config.logger = NoopLogger{}
+	config.options = options

 	// Override default configs where necessary
 	for _, opt := range options {

@@ -240,7 +242,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		url, err = LeaderFromZK(*config.cluster)
 		// If ZK is configured, throw an error if the leader is unable to be determined
 		if err != nil {
-			return nil, errors.Wrap(err, "LeaderFromZK error")
+			return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
 		}
 		config.logger.Println("Scheduler URL from ZK: ", url)
 	} else if config.url != "" {

@@ -253,7 +255,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	if config.jsonTransport {
 		trans, err := newTJSONTransport(url, config.timeoutms, config)
 		if err != nil {
-			return nil, errors.Wrap(err, "Error creating realis")
+			return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis"))
 		}
 		config.transport = trans
 		config.protoFactory = thrift.NewTJSONProtocolFactory()

@@ -261,7 +263,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	} else if config.binTransport {
 		trans, err := newTBinTransport(url, config.timeoutms, config)
 		if err != nil {
-			return nil, errors.Wrap(err, "Error creating realis")
+			return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis"))
 		}
 		config.transport = trans
 		config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()

@@ -427,75 +429,26 @@ func basicAuth(username, password string) string {
 }

 func (r *realisClient) ReestablishConn() error {
-	//close existing connection..
+	// Close existing connection
 	r.logger.Println("ReestablishConn begin ....")
 	r.Close()
-	//First check cluster object for re-establish; if not available then try with scheduler url.
-	//var config *RealisConfig
-	var err error
-	var url string
-
-	if r.config.cluster != nil && r.config.username != "" && r.config.password != "" {
-		//Re-establish using cluster object.
-		url, err = LeaderFromZK(*r.config.cluster)
-		if err != nil {
-			r.config.logger.Println("LeaderFromZK error: %+v\n ", err)
-		}
-		r.logger.Println("ReestablishConn url: ", url)
-		if r.config.jsonTransport {
-			trans, err := newTJSONTransport(url, r.config.timeoutms, r.config)
-			if err != nil {
-				return errors.Wrap(err, "Error creating realis")
-			}
-			r.config.transport = trans
-			r.config.protoFactory = thrift.NewTJSONProtocolFactory()
-		} else if r.config.binTransport {
-			trans, err := newTBinTransport(url, r.config.timeoutms, r.config)
-			if err != nil {
-				return errors.Wrap(err, "Error creating realis")
-			}
-			r.config.transport = trans
-			r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
-		}
-		if err != nil {
-			r.logger.Println("error creating config: ", err)
-		}
-		// Configured for basic-auth
-		AddBasicAuth(r.config, r.config.username, r.config.password)
-		r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
-		r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
-		r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
-	} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
-		//Re-establish using scheduler url.
-		r.logger.Println("ReestablishConn url: ", r.config.url)
-		if r.config.jsonTransport {
-			trans, err := newTJSONTransport(url, r.config.timeoutms, r.config)
-			if err != nil {
-				return errors.Wrap(err, "Error creating realis")
-			}
-			r.config.transport = trans
-			r.config.protoFactory = thrift.NewTJSONProtocolFactory()
-		} else if r.config.binTransport {
-			trans, err := newTBinTransport(url, r.config.timeoutms, r.config)
-			if err != nil {
-				return errors.Wrap(err, "Error creating realis")
-			}
-			r.config.transport = trans
-			r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
-		}
-		AddBasicAuth(r.config, r.config.username, r.config.password)
-		r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
-		r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
-		r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
-	} else {
-		r.logger.Println(" Missing Data for ReestablishConn ")
-		r.logger.Println(" r.config.cluster: ", r.config.cluster)
-		r.logger.Println(" r.config.username: ", r.config.username)
-		r.logger.Println(" r.config.passwd: ", r.config.password)
-		r.logger.Println(" r.config.url: ", r.config.url)
-		return errors.New(" Missing Data for ReestablishConn ")
+	// Recreate connection from scratch using original options
+	newRealis, err := NewRealisClient(r.config.options...)
+	if err != nil {
+		return err
 	}
-	r.logger.Printf(" config url before return: %+v\n", r.config.url)
+
+	// If we are able to successfully re-connect, make receiver
+	// point to newly established connections.
+	if newClient, ok := newRealis.(*realisClient); ok {
+		r.config = newClient.config
+		r.client = newClient.client
+		r.readonlyClient = newClient.readonlyClient
+		r.adminClient = newClient.adminClient
+		r.logger = newClient.logger
+	}
+
 	return nil
 }

@@ -503,6 +456,7 @@ func (r *realisClient) ReestablishConn() error {
 func (r *realisClient) Close() {
 	r.client.Transport.Close()
 	r.readonlyClient.Transport.Close()
+	r.adminClient.Transport.Close()
 }

 // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
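The refactor above replaces the duplicated transport-rebuilding branches with a single call to NewRealisClient using the options recorded at construction time. A minimal sketch of that rebuild-from-recorded-options pattern is below; the names Client, Option, WithURL, and Reconnect are illustrative and not part of realis.

```go
package main

import "fmt"

// Option mutates a Client during construction.
type Option func(*Client)

// Client records the options it was built with so it can rebuild itself later.
type Client struct {
	url     string
	options []Option
}

func WithURL(u string) Option { return func(c *Client) { c.url = u } }

func NewClient(opts ...Option) *Client {
	c := &Client{}
	c.options = opts // remember construction-time options, as realis now does with config.options
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// Reconnect discards current state and rebuilds the client exactly as it was
// originally configured, mirroring the spirit of the new ReestablishConn.
func (c *Client) Reconnect() {
	fresh := NewClient(c.options...)
	*c = *fresh
}

func main() {
	c := NewClient(WithURL("http://scheduler:8081"))
	c.Reconnect()
	fmt.Println(c.url) // still configured: http://scheduler:8081
}
```

The design choice is that the client no longer needs separate re-connection logic for the ZK and direct-URL cases; whatever options built the original client rebuild the new one.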
(end-to-end test file)

@@ -36,14 +36,14 @@ func TestMain(m *testing.M) {
 	// New configuration to connect to Vagrant image
 	r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
 		realis.BasicAuth("aurora", "secret"),
-		realis.ThriftJSON(),
-		realis.TimeoutMS(20000),
-		realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}))
+		realis.TimeoutMS(20000))
 	if err != nil {
 		fmt.Println("Please run vagrant box before running test suite")
 		os.Exit(1)
 	}

+	defer r.Close()
+
 	// Create monitor
 	monitor = &realis.Monitor{Client: r}

@@ -64,6 +64,14 @@ func TestLeaderFromZK(t *testing.T) {
 	assert.Equal(t, "http://aurora.local:8081", url)
 }

+func TestRealisClient_ReestablishConn(t *testing.T) {
+
+	// Test that we're able to tear down the old connection and create a new one.
+	err := r.ReestablishConn()
+
+	assert.NoError(t, err)
+}
+
 func TestGetCacerts(t *testing.T) {
 	certs, err := realis.Getcerts("./examples/certs")
 	assert.NoError(t, err)
31 retry.go

@@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-// Modified version of the Kubernetes exponential-backoff code
-
 package realis

 import (

@@ -38,6 +36,7 @@ type ConditionFunc func() (done bool, err error)

 type AuroraThriftCall func() (resp *aurora.Response, err error)

+// Modified version of the Kubernetes exponential-backoff code.
 // ExponentialBackoff repeats a condition check with exponential backoff.
 //
 // It checks the condition up to Steps times, increasing the wait by multiplying

@@ -59,11 +58,23 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 			time.Sleep(adjusted)
 			duration = time.Duration(float64(duration) * backoff.Factor)
 		}
-		if ok, err := condition(); err != nil || ok {
-			return err
+
+		ok, err := condition()
+
+		// If the function executed says it succeeded, stop retrying
+		if ok {
+			return nil
 		}
+
+		// Stop retrying if the error is NOT temporary.
+		if err != nil {
+			if !IsTemporary(err) {
+				return err
+			}
+		}
+
 	}
-	return errors.New("Timed out while retrying")
+	return NewTimeoutError(errors.New("Timed out while retrying"))
 }

 // CheckAndRetryConn function takes realis client and a thrift API function to call and returns response and error

@@ -71,12 +82,14 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 // If Error is retryable return resp and RetryConnErr error.
 func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) {
 	resp, cliErr := auroraCall()
-	if cliErr != nil /*&& (strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr))*/ {
+
+	// TODO: Return different error type based on the error that was returned by the API call
+	if cliErr != nil {
 		r.ReestablishConn()
-		return resp, RetryConnErr
+		return resp, NewPermamentError(RetryConnErr)
 	}
 	if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
-		return resp, RetryConnErr
+		return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
 	}
-	return resp, cliErr
+	return resp, nil
 }
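To make the new retry semantics concrete, here is a small, self-contained sketch of the policy the reworked loop follows: success stops the loop, a non-temporary error stops it immediately, and a temporary error keeps it retrying until Steps is exhausted. The Backoff fields and IsTemporary helper mirror the code above; the failing condition is illustrative and Jitter is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Backoff mirrors the fields used by realis (Jitter omitted in this sketch).
type Backoff struct {
	Duration time.Duration
	Factor   float64
	Steps    int
}

type temporary interface{ Temporary() bool }

// IsTemporary mirrors the helper introduced in errors.go.
func IsTemporary(err error) bool {
	t, ok := err.(temporary)
	return ok && t.Temporary()
}

type temporaryErr struct {
	error
	temp bool
}

func (t *temporaryErr) Temporary() bool { return t.temp }

// ExponentialBackoff sketches the reworked loop: stop on success, stop on a
// non-temporary error, otherwise sleep with exponential growth and retry.
func ExponentialBackoff(backoff Backoff, condition func() (bool, error)) error {
	duration := backoff.Duration
	for i := 0; i < backoff.Steps; i++ {
		if i != 0 {
			time.Sleep(duration)
			duration = time.Duration(float64(duration) * backoff.Factor)
		}

		ok, err := condition()
		if ok {
			return nil
		}
		if err != nil && !IsTemporary(err) {
			return err // permanent error: give up immediately
		}
	}
	return errors.New("timed out while retrying")
}

func main() {
	attempts := 0
	err := ExponentialBackoff(
		Backoff{Duration: 10 * time.Millisecond, Factor: 2.0, Steps: 5},
		func() (bool, error) {
			attempts++
			if attempts < 3 {
				return false, &temporaryErr{error: errors.New("scheduler unavailable"), temp: true}
			}
			return true, nil
		})
	fmt.Println("attempts:", attempts, "err:", err) // attempts: 3 err: <nil>
}
```

This is also why the zk.go changes below wrap their failures with NewTemporaryError: connection and unmarshalling hiccups against ZooKeeper become retryable instead of terminating the caller's retry loop.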
4 zk.go

@@ -48,7 +48,7 @@ func LeaderFromZK(cluster Cluster) (string, error) {
 	//TODO (rdelvalle): When enabling debugging, change logger here
 	c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
 	if err != nil {
-		return false, errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
+		return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK))
 	}

 	defer c.Close()

@@ -73,7 +73,7 @@ func LeaderFromZK(cluster Cluster) (string, error) {

 	err = json.Unmarshal([]byte(data), serviceInst)
 	if err != nil {
-		return false, errors.Wrap(err, "Unable to unmarshall contents of leader")
+		return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader"))
 	}

 	// Should only be one endpoint