From 72b746e43129279425b90b0cd1f00a0637f99beb Mon Sep 17 00:00:00 2001
From: Sivaram Mothiki <sivaram.mothiki@gmail.com>
Date: Sat, 4 Nov 2017 15:06:26 -0700
Subject: [PATCH] use exponential back off func from realis lib (#39)

* use exponential back off func from realis lib

* remove exponential backoffs from monitors
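
A minimal, self-contained sketch of the polling shape the monitors switch to: a
ticker drives the periodic check while a timer bounds the total wait, replacing
the old per-call exponential backoff loops. The names pollUntil and checkOnce are
hypothetical and used only for illustration; checkOnce stands in for a call such
as Client.JobUpdateDetails or Client.GetInstanceIds.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // pollUntil invokes checkOnce every interval seconds until it reports done,
    // returns an error, or the overall timeout elapses.
    func pollUntil(interval, timeout int, checkOnce func() (bool, error)) (bool, error) {
        ticker := time.NewTicker(time.Second * time.Duration(interval))
        defer ticker.Stop()
        timer := time.NewTimer(time.Second * time.Duration(timeout))
        defer timer.Stop()

        for {
            select {
            case <-ticker.C:
                done, err := checkOnce()
                if err != nil {
                    return false, err
                }
                if done {
                    return true, nil
                }
            case <-timer.C:
                return false, errors.New("timeout")
            }
        }
    }

    func main() {
        polls := 0
        ok, err := pollUntil(1, 10, func() (bool, error) {
            polls++
            return polls >= 3, nil // pretend the job converges on the third poll
        })
        fmt.Println(ok, err)
    }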

* don't compare for retry errors
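
For the scheduler calls in realis.go, every call site now follows the same
contract when it hands a closure to the library's ExponentialBackoff: return
(false, nil) to keep retrying when CheckAndRetryConn reports a retryable
connection error, (false, err) to abort on any other error, and (true, nil) on
success. The sketch below mirrors that contract with simplified, hypothetical
stand-ins (backoff, exponentialBackoff, retryConnErr, flaky); it is not the
gorealis API itself, just an illustration of the shape of the loop.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // backoff is a pared-down stand-in for the library's Backoff settings.
    type backoff struct {
        duration time.Duration // initial wait between attempts
        factor   float64       // multiplier applied to the wait after each attempt
        steps    int           // maximum number of attempts
    }

    var retryConnErr = errors.New("retryable connection error")

    // exponentialBackoff evaluates condition up to steps times, growing the wait geometrically.
    func exponentialBackoff(b backoff, condition func() (bool, error)) error {
        wait := b.duration
        for i := 0; i < b.steps; i++ {
            if i != 0 {
                time.Sleep(wait)
                wait = time.Duration(float64(wait) * b.factor)
            }
            done, err := condition()
            if err != nil {
                return err // non-retryable failure: stop immediately
            }
            if done {
                return nil
            }
        }
        return errors.New("timed out while retrying")
    }

    func main() {
        // flaky stands in for a thrift call wrapped by CheckAndRetryConn:
        // it fails with a retryable error twice, then succeeds.
        attempts := 0
        flaky := func() (string, error) {
            attempts++
            if attempts < 3 {
                return "", retryConnErr
            }
            return "OK", nil
        }

        var resp string
        var clientErr error
        retryErr := exponentialBackoff(backoff{duration: 10 * time.Millisecond, factor: 2.0, steps: 5}, func() (bool, error) {
            resp, clientErr = flaky()
            if clientErr != nil && errors.Is(clientErr, retryConnErr) {
                return false, nil // retryable: let the backoff loop try again
            }
            if clientErr != nil {
                return false, clientErr // fatal: abort the loop
            }
            return true, nil
        })
        fmt.Println(resp, clientErr, retryErr)
    }
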
---
 .gitignore             |   1 +
 examples/client.go     |   3 +-
 examples/clusters.json |   2 +-
 monitors.go            | 160 ++++-----
 realis.go              | 770 +++++++++++++++++------------------------
 retry.go               |  28 +-
 6 files changed, 407 insertions(+), 557 deletions(-)

diff --git a/.gitignore b/.gitignore
index cd0d5d1..66a91e3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 # Folders
 _obj
 _test
+.idea
 
 # Architecture specific extensions/prefixes
 *.[568vq]
diff --git a/examples/client.go b/examples/client.go
index 4afd0e9..ac21a28 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -22,10 +22,11 @@ import (
 
 	"time"
 
+	"strings"
+
 	"github.com/paypal/gorealis"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/paypal/gorealis/response"
-	"strings"
 )
 
 var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList string
diff --git a/examples/clusters.json b/examples/clusters.json
index 287a618..c456bd8 100644
--- a/examples/clusters.json
+++ b/examples/clusters.json
@@ -5,4 +5,4 @@
   "auth_mechanism": "UNAUTHENTICATED",
   "slave_run_directory": "latest",
   "slave_root": "/var/lib/mesos"
-}]
\ No newline at end of file
+}]
diff --git a/monitors.go b/monitors.go
index aee8590..16ed7b2 100644
--- a/monitors.go
+++ b/monitors.go
@@ -19,14 +19,15 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/pkg/errors"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/paypal/gorealis/response"
+	"github.com/pkg/errors"
 )
 
 const (
 	UpdateFailed = "update failed"
 	RolledBack   = "update rolled back"
+	Timeout      = "timeout"
 )
 
 type Monitor struct {
@@ -40,117 +41,86 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 		Key:   &updateKey,
 		Limit: 1,
 	}
-
-	defaultBackoff := m.Client.RealisConfig().backoff
-	duration := defaultBackoff.Duration //defaultBackoff.Duration
-	var err error
+	ticker := time.NewTicker(time.Second * time.Duration(interval))
+	defer ticker.Stop()
+	timer := time.NewTimer(time.Second * time.Duration(timeout))
+	defer timer.Stop()
+	var cliErr error
 	var respDetail *aurora.Response
+	timedout := false
+	for {
+		select {
+		case <-ticker.C:
+			respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
+			if cliErr != nil {
+				return false, cliErr
+			}
 
-	for i := 0; i*interval <= timeout; i++ {
-		for step := 0; step < defaultBackoff.Steps; step++ {
-			if step != 0 {
-				adjusted := duration
-				if defaultBackoff.Jitter > 0.0 {
-					adjusted = Jitter(duration, defaultBackoff.Jitter)
+			updateDetail := response.JobUpdateDetails(respDetail)
+
+			if len(updateDetail) == 0 {
+				fmt.Println("No update found")
+				return false, errors.New("No update found for " + updateKey.String())
+			}
+			status := updateDetail[0].Update.Summary.State.Status
+
+			if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
+
+				// Rolled forward is the only state in which an update has been successfully updated
+				// if we encounter an inactive state and it is not at rolled forward, update failed
+				switch status {
+				case aurora.JobUpdateStatus_ROLLED_FORWARD:
+					fmt.Println("Update succeeded")
+					return true, nil
+				case aurora.JobUpdateStatus_FAILED:
+					fmt.Println("Update failed")
+					return false, errors.New(UpdateFailed)
+				case aurora.JobUpdateStatus_ROLLED_BACK:
+					fmt.Println("rolled back")
+					return false, errors.New(RolledBack)
+				default:
+					return false, nil
 				}
-				fmt.Println(" sleeping for: ", adjusted)
-				time.Sleep(adjusted)
-				duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-			}
-			if respDetail, err = m.Client.JobUpdateDetails(updateQ); err == nil {
-				break
-			}
-			err1 := m.Client.ReestablishConn()
-			if err1 != nil {
-				fmt.Println("error in ReestablishConn: ", err1)
 			}
+		case <-timer.C:
+			timedout = true
 		}
-		// if error remains then return (false, err).
-		if err != nil {
-			return false, err
+		if timedout {
+			break
 		}
-
-		updateDetail := response.JobUpdateDetails(respDetail)
-
-		if len(updateDetail) == 0 {
-			fmt.Println("No update found")
-			return false, errors.New("No update found for " + updateKey.String())
-		}
-		status := updateDetail[0].Update.Summary.State.Status
-
-		if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
-
-			// Rolled forward is the only state in which an update has been successfully updated
-			// if we encounter an inactive state and it is not at rolled forward, update failed
-			switch status {
-			case aurora.JobUpdateStatus_ROLLED_FORWARD:
-				fmt.Println("Update succeded")
-				return true, nil
-			case aurora.JobUpdateStatus_FAILED:
-				fmt.Println("Update failed")
-				return false, errors.New(UpdateFailed)
-			case aurora.JobUpdateStatus_ROLLED_BACK:
-				fmt.Println("rolled back")
-				return false, errors.New(RolledBack)
-			default:
-				return false, nil
-			}
-		}
-
-		fmt.Println("Polling, update still active...")
-		time.Sleep(time.Duration(interval) * time.Second)
 	}
-
-	fmt.Println("Timed out")
-	return false, nil
+	return false, errors.New(Timeout)
 }
 
 func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) {
 
-	defaultBackoff := m.Client.RealisConfig().backoff
-	duration := defaultBackoff.Duration
-	var err error
+	var cliErr error
 	var live map[int32]bool
+	ticker := time.NewTicker(time.Second * time.Duration(interval))
+	defer ticker.Stop()
+	timer := time.NewTimer(time.Second * time.Duration(timeout))
+	defer timer.Stop()
 
-	for i := 0; i*interval < timeout; i++ {
-		for step := 0; step < defaultBackoff.Steps; step++ {
-			if step != 0 {
-				adjusted := duration
-				if defaultBackoff.Jitter > 0.0 {
-					adjusted = Jitter(duration, defaultBackoff.Jitter)
-				}
-				fmt.Println(" sleeping for: ", adjusted)
-				time.Sleep(adjusted)
-				fmt.Println(" sleeping done")
-				duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	timedout := false
+	for {
+		select {
+		case <-ticker.C:
+			live, cliErr = m.Client.GetInstanceIds(key, aurora.LIVE_STATES)
+
+			if cliErr != nil {
+				return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
 			}
-			if live, err = m.Client.GetInstanceIds(key, aurora.LIVE_STATES); err == nil {
-				fmt.Println(" live: ", live)
-				break
+			if len(live) == int(instances) {
+				return true, nil
 			}
-
-			if err != nil {
-				err1 := m.Client.ReestablishConn()
-				if err1 != nil {
-					fmt.Println("error in ReestablishConn: ", err1)
-				}
-			}
-
+		case <-timer.C:
+			timedout = true
 		}
-
-		//live, err := m.Client.GetInstanceIds(key, aurora.LIVE_STATES)
-		if err != nil {
-			return false, errors.Wrap(err, "Unable to communicate with Aurora")
+		if timedout {
+			break
 		}
-		if len(live) == int(instances) {
-			return true, nil
-		}
-		fmt.Println("Polling, instances running: ", len(live))
-		time.Sleep(time.Duration(interval) * time.Second)
 	}
-
-	fmt.Println("Timed out")
-	return false, nil
+	return false, errors.New(Timeout)
 }
 
 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
@@ -206,5 +176,5 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 		hostResult[host] = false
 	}
 
-	return hostResult, errors.New("Timed out")
+	return hostResult, errors.New(Timeout)
 }
diff --git a/realis.go b/realis.go
index 01e3950..ac6c20d 100644
--- a/realis.go
+++ b/realis.go
@@ -16,19 +16,19 @@
 package realis
 
 import (
+	"crypto/tls"
 	"encoding/base64"
+	"fmt"
 	"net/http"
 	"net/http/cookiejar"
 	"time"
 
-	"fmt"
-
 	"math/rand"
 
 	"git.apache.org/thrift.git/lib/go/thrift"
-	"github.com/pkg/errors"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/paypal/gorealis/response"
+	"github.com/pkg/errors"
 )
 
 const VERSION = "1.0.4"
@@ -307,7 +307,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 //This api would create default cluster object..
 func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) {
 
-	fmt.Println(" zkUrl: %s", zkUrl)
+	fmt.Printf(" zkUrl: %s\n", zkUrl)
 	cluster := GetDefaultClusterFromZKUrl(zkUrl)
 
 	url, err := LeaderFromZK(*cluster)
@@ -369,9 +369,11 @@ func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error)
 	if err != nil {
 		return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
 	}
-
+	transport := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+	}
 	trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
-		thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Jar: jar}})
+		thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Transport: transport, Jar: jar}})
 
 	if err != nil {
 		return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport")
@@ -522,34 +524,26 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 		Statuses:    states}
 
 	var resp *aurora.Response
-	var err error
+	var clientErr error
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.GetTasksWithoutConfigs(taskQ)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
+		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-		}
-		if resp, err = r.client.GetTasksWithoutConfigs(taskQ); err == nil {
-			break
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
 	}
-	if err != nil {
-		return nil, errors.Wrap(err, "Error querying Aurora Scheduler for active IDs")
-	}
-	resp, err = response.ResponseCodeCheck(resp)
-	if err != nil {
-		return nil, err
+	resp, clientErr = response.ResponseCodeCheck(resp)
+	if clientErr != nil {
+		return nil, clientErr
 	}
 	tasks := response.ScheduleStatusResult(resp).GetTasks()
 	jobInstanceIds := make(map[int32]bool)
@@ -562,30 +556,26 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 
 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
 
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error getting job update summaries from Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
+
 }
 
 // Kill specific instances of a job.
@@ -593,35 +583,29 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 
 	instanceIds := make(map[int32]bool)
 	var resp *aurora.Response
-	var err error
-
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
+	var clientErr error
 
 	for _, instId := range instances {
 		instanceIds[instId] = true
 	}
 
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.KillTasks(key, instanceIds)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.client.KillTasks(key, instanceIds); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
 
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
 }
 
 func (r *realisClient) RealisConfig() *RealisConfig {
@@ -632,7 +616,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 
 	var instanceIds map[int32]bool
-	var err error
+	var clientErr, err error
 	var resp *aurora.Response
 	instanceIds, err = r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err != nil {
@@ -640,31 +624,22 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 	}
 
 	if len(instanceIds) > 0 {
-		defaultBackoff := r.config.backoff
-		duration := defaultBackoff.Duration
-		for i := 0; i < defaultBackoff.Steps; i++ {
-			if i != 0 {
-				adjusted := duration
-				if defaultBackoff.Jitter > 0.0 {
-					adjusted = Jitter(duration, defaultBackoff.Jitter)
-				}
-				fmt.Println(" sleeping for: ", adjusted)
-				time.Sleep(adjusted)
-				duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+		retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+			resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+				return r.client.KillTasks(key, instanceIds)
+			})
+			if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+				return false, nil
 			}
-
-			if resp, err = r.client.KillTasks(key, instanceIds); err == nil {
-				return response.ResponseCodeCheck(resp)
-			}
-
-			err1 := r.ReestablishConn()
-			if err1 != nil {
-				fmt.Println("error in ReestablishConn: ", err1)
+			if clientErr != nil {
+				return false, clientErr
 			}
+			return true, nil
+		})
+		if clientErr != nil {
+			return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
 		}
-		if err != nil {
-			return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler")
-		}
+		return response.ResponseCodeCheck(resp)
 	}
 	return nil, errors.New("No tasks in the Active state")
 }
@@ -672,119 +647,98 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 // Sends a create job message to the scheduler with a specific job configuration.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			fmt.Println(" STEPS: ", i)
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.CreateJob(auroraJob.JobConfig())
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		fmt.Println(" calling  CreateJob")
-		if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		fmt.Printf("CreateJob err: %+v\n", err)
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
+
 }
 
 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.ScheduleCronJob(auroraJob.JobConfig())
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
-		if resp, err = r.client.ScheduleCronJob(auroraJob.JobConfig()); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending Cron Job Schedule message to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
 }
 
 func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
 
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.DescheduleCronJob(key)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
-		if resp, err = r.client.DescheduleCronJob(key); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler")
 	}
+	return response.ResponseCodeCheck(resp)
 
-	return nil, errors.Wrap(err, "Error sending Cron Job De-schedule message to Aurora Scheduler")
 }
 
 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.StartCronJob(key)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
-		if resp, err = r.client.StartCronJob(key); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler")
 	}
+	return response.ResponseCodeCheck(resp)
 
-	return nil, errors.Wrap(err, "Error sending Start Cron Job  message to Aurora Scheduler")
 }
 
 // Restarts specific instances specified
@@ -795,31 +749,25 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 		instanceIds[instId] = true
 	}
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-		}
-
-		if resp, err = r.client.RestartShards(key, instanceIds); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.RestartShards(key, instanceIds)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
 }
 
 // Restarts all active tasks under a job configuration.
@@ -829,33 +777,25 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 	if err1 != nil {
 		return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
 	}
-
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 	if len(instanceIds) > 0 {
-		defaultBackoff := r.config.backoff
-		duration := defaultBackoff.Duration
-		for i := 0; i < defaultBackoff.Steps; i++ {
-			if i != 0 {
-				adjusted := duration
-				if defaultBackoff.Jitter > 0.0 {
-					adjusted = Jitter(duration, defaultBackoff.Jitter)
-				}
-				fmt.Println(" sleeping for: ", adjusted)
-				time.Sleep(adjusted)
-				duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+		retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+			resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+				return r.client.RestartShards(key, instanceIds)
+			})
+			if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+				return false, nil
 			}
-
-			if resp, err = r.client.RestartShards(key, instanceIds); err == nil {
-				return response.ResponseCodeCheck(resp)
+			if clientErr != nil {
+				return false, clientErr
 			}
-			err1 := r.ReestablishConn()
-			if err1 != nil {
-				fmt.Println("error in ReestablishConn: ", err1)
-			}
-
+			return true, nil
+		})
+		if clientErr != nil {
+			return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
 		}
-		return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler")
+		return response.ResponseCodeCheck(resp)
 
 	} else {
 		return nil, errors.New("No tasks in the Active state")
@@ -866,60 +806,51 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
 
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.StartJobUpdate(updateJob.req, message)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.client.StartJobUpdate(updateJob.req, message); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending StartJobUpdate command to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
 }
 
 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
 func (r *realisClient) AbortJobUpdate(
 	updateKey aurora.JobUpdateKey,
 	message string) (*aurora.Response, error) {
-
 	var resp *aurora.Response
-	var err error
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-		}
-		if resp, err = r.client.AbortJobUpdate(&updateKey, message); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
-	}
+	var clientErr error
 
-	return nil, errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler")
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.AbortJobUpdate(&updateKey, message)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
+		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler")
+	}
+	return response.ResponseCodeCheck(resp)
 }
 
 // Scale up the number of instances under a job configuration using the configuration for specific
@@ -927,28 +858,26 @@ func (r *realisClient) AbortJobUpdate(
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
 
 	var resp *aurora.Response
-	var err error
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	var clientErr error
+
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.AddInstances(&instKey, count)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.client.AddInstances(&instKey, count); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler")
 	}
-	return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler")
+	return response.ResponseCodeCheck(resp)
+
 }
 
 //Scale down the number of instances under a job configuration using the configuration of a specific instance
@@ -977,30 +906,23 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
 
 	var resp *aurora.Response
-	var err error
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-		}
-		if resp, err = r.client.GetTasksStatus(query); err == nil {
-			break
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
-	}
+	var clientErr error
 
-	if err != nil {
-		return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task status")
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.GetTasksStatus(query)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
+		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status")
 	}
 	//Check for response code..
 	if resp.GetResponseCode() != aurora.ResponseCode_OK {
@@ -1013,30 +935,23 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
 	var resp *aurora.Response
-	var err error
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
-		}
-		if resp, err = r.client.GetTasksWithoutConfigs(query); err == nil {
-			break
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
-	}
+	var clientErr error
 
-	if err != nil {
-		return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task status without configs")
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.GetTasksWithoutConfigs(query)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
+		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs")
 	}
 	//Check for response code..
 	if resp.GetResponseCode() != aurora.ResponseCode_OK {
@@ -1044,6 +959,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
 	}
 
 	return response.ScheduleStatusResult(resp).GetTasks(), nil
+
 }
 
 func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
@@ -1058,34 +974,24 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 		Statuses:    aurora.ACTIVE_STATES}
 
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.GetTasksStatus(taskQ)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
+		if clientErr != nil {
+			return false, clientErr
+		}
+		return true, nil
+	})
 
-		if resp, err = r.client.GetTasksStatus(taskQ); err == nil {
-			break
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
-		}
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration")
 	}
-
-	if err != nil {
-		return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration")
-	}
-
 	//Check for response code..
 	if resp.GetResponseCode() != aurora.ResponseCode_OK {
 		return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
@@ -1108,57 +1014,49 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
 
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.GetJobUpdateDetails(&updateQuery)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.client.GetJobUpdateDetails(&updateQuery); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details")
 	}
-	return nil, errors.Wrap(err, "Unable to get job update details")
+	return response.ResponseCodeCheck(resp)
+
 }
 
 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 	var resp *aurora.Response
-	var err error
+	var clientErr error
 
-	defaultBackoff := r.config.backoff
-	duration := defaultBackoff.Duration
-	for i := 0; i < defaultBackoff.Steps; i++ {
-		if i != 0 {
-			adjusted := duration
-			if defaultBackoff.Jitter > 0.0 {
-				adjusted = Jitter(duration, defaultBackoff.Jitter)
-			}
-			fmt.Println(" sleeping for: ", adjusted)
-			time.Sleep(adjusted)
-			duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.client.RollbackJobUpdate(&key, message)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+			return false, nil
 		}
-		if resp, err = r.client.RollbackJobUpdate(&key, message); err == nil {
-			return response.ResponseCodeCheck(resp)
-		}
-		err1 := r.ReestablishConn()
-		if err1 != nil {
-			fmt.Println("error in ReestablishConn: ", err1)
+		if clientErr != nil {
+			return false, clientErr
 		}
+		return true, nil
+	})
+
+	if clientErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update")
 	}
-
-	return nil, errors.Wrap(err, "Unable to roll back job update")
+	return response.ResponseCodeCheck(resp)
 }
 
 // Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
@@ -1168,7 +1066,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 
 	var resp *aurora.Response
 	var result *aurora.DrainHostsResult_
-	var returnErr, clientErr, payloadErr error
+	var clientErr error
 
 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to drain")
@@ -1180,50 +1078,27 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		drainList.HostNames[host] = true
 	}
 
-	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-
-		// Send thrift call, if we have a thrift send error, attempt to reconnect
-		// and continue trying to resend command
-		if resp, clientErr = r.adminClient.DrainHosts(drainList); clientErr != nil {
-			// Experienced an connection error
-			err1 := r.ReestablishConn()
-			if err1 != nil {
-				fmt.Println("error in re-establishing connection: ", err1)
-			}
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.adminClient.DrainHosts(drainList)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
 			return false, nil
 		}
-
-		// If error is NOT due to connection
-		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
-			// TODO(rdelvalle): an leader election may cause the response to have
-			// failed when it should have succeeded. Retry everything for now until
-			// we figure out a more concrete fix.
-			return false, nil
+		if clientErr != nil {
+			return false, clientErr
 		}
-
-		// Successful call
 		return true, nil
-
 	})
 
+	if clientErr != nil {
+		return nil, nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
+	}
+
 	if resp != nil && resp.GetResult_() != nil {
 		result = resp.GetResult_().GetDrainHostsResult_()
 	}
 
-	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
-	// a deeper issue
-	if payloadErr != nil {
-		returnErr = payloadErr
-	} else {
-		returnErr = clientErr
-	}
-
-	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
-	// this will can become either a timeout error or a payload error
-	if retryErr != nil {
-		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
-	}
-
 	return resp, result, nil
 }
 
@@ -1231,7 +1106,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 
 	var resp *aurora.Response
 	var result *aurora.EndMaintenanceResult_
-	var returnErr, clientErr, payloadErr error
+	var clientErr error
 
 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to end maintenance on")
@@ -1243,50 +1118,27 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 		hostList.HostNames[host] = true
 	}
 
-	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-
-		// Send thrift call, if we have a thrift send error, attempt to reconnect
-		// and continue trying to resend command
-		if resp, clientErr = r.adminClient.EndMaintenance(hostList); clientErr != nil {
-			// Experienced an connection error
-			err1 := r.ReestablishConn()
-			if err1 != nil {
-				fmt.Println("error in re-establishing connection: ", err1)
-			}
+	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			return r.adminClient.EndMaintenance(hostList)
+		})
+		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
 			return false, nil
 		}
-
-		// If error is NOT due to connection
-		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
-			// TODO(rdelvalle): an leader election may cause the response to have
-			// failed when it should have succeeded. Retry everything for now until
-			// we figure out a more concrete fix.
-			return false, nil
+		if clientErr != nil {
+			return false, clientErr
 		}
-
-		// Successful call
 		return true, nil
-
 	})
 
+	if clientErr != nil {
+		return nil, nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
+	}
+
 	if resp != nil && resp.GetResult_() != nil {
 		result = resp.GetResult_().GetEndMaintenanceResult_()
 	}
 
-	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
-	// a deeper issue
-	if payloadErr != nil {
-		returnErr = payloadErr
-	} else {
-		returnErr = clientErr
-	}
-
-	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
-	// this will can become either a timeout error or a payload error
-	if retryErr != nil {
-		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
-	}
-
 	return resp, result, nil
 }
 
diff --git a/retry.go b/retry.go
index fba37d4..ae1da23 100644
--- a/retry.go
+++ b/retry.go
@@ -19,14 +19,25 @@ limitations under the License.
 package realis
 
 import (
-	"time"
 	"errors"
+	"time"
+
+	"github.com/paypal/gorealis/gen-go/apache/aurora"
 )
 
+const (
+	ConnRefusedErr   = "connection refused"
+	NoLeaderFoundErr = "No leader found"
+)
+
+var RetryConnErr = errors.New("error occurred while communicating with aurora, retrying")
+
 // ConditionFunc returns true if the condition is satisfied, or an error
 // if the loop should be aborted.
 type ConditionFunc func() (done bool, err error)
 
+type AuroraThriftCall func() (resp *aurora.Response, err error)
+
 // ExponentialBackoff repeats a condition check with exponential backoff.
 //
 // It checks the condition up to Steps times, increasing the wait by multiplying
@@ -54,3 +65,18 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 	}
 	return errors.New("Timed out while retrying")
 }
+
+// CheckAndRetryConn takes a realis client and a thrift API call, invokes the call, and returns the response and error.
+// If the error from the API call is retryable, the function re-establishes the connection with aurora by fetching the
+// latest aurora master from zookeeper and returns the response along with RetryConnErr.
+func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) {
+	resp, cliErr := auroraCall()
+	if cliErr != nil /*&& (strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr))*/ {
+		r.ReestablishConn()
+		return resp, RetryConnErr
+	}
+	if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
+		return resp, RetryConnErr
+	}
+	return resp, cliErr
+}