From a4eb4bceed9f11bd98a10351a3a5028fde363481 Mon Sep 17 00:00:00 2001
From: Lawrence Wong <lawwong@paypal.com>
Date: Tue, 3 Jan 2023 11:44:29 -0800
Subject: [PATCH] merge retry mechanism change from gorealis v1 to gorealis v2
 Part 2
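
This part threads an optional verification callback through the retry layer so
that calls which time out on the client side can be re-checked against the
scheduler instead of being blindly retried:

* thriftCallWithRetries now takes a verifyOnTimeout function that is invoked
  after a client-side timeout to check whether the original payload reached
  the scheduler. CreateJob and StartJobUpdate supply such callbacks; all other
  call sites pass nil and keep their previous behavior.
* StartJobUpdate returns the raw retry error when the call timed out so that
  callers can detect it with IsTimeout and react accordingly.
* AbortJobUpdate is now synchronous and blocks until the update transitions to
  ABORTED (or the monitor gives up and returns an error).

As a rough caller-facing sketch (the client and update values below are
illustrative only, not part of this change), a timeout can now be handled
explicitly instead of being treated as a hard failure:

    _, err := client.StartJobUpdate(update, "")
    if err != nil && realis.IsTimeout(err) {
        // The update may still have been accepted by the scheduler; check
        // GetJobUpdateSummaries for an active update on this job key before
        // resubmitting.
    }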

---
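Reviewer note: most call sites simply pass nil for the new verification
argument and keep their previous retry behavior. Only CreateJob and
StartJobUpdate opt in; their verifiers follow the sketch below (adapted from
the CreateJob hunk, with jobExists defined in the new helpers.go), which
returns a mimicked OK response when the payload is found on the scheduler
after a client timeout:

    _, retryErr := c.thriftCallWithRetries(
        false,
        func() (*aurora.Response, error) {
            return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
        },
        func() (*aurora.Response, bool) {
            // Best effort: if the job's config summary is retrievable, assume
            // the create reached the scheduler before the connection timed out.
            exists, err := c.jobExists(auroraJob.JobKey())
            if err == nil && exists {
                return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
            }
            return nil, false
        },
    )
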
 go.mod               |   2 +-
 go.sum               |   4 +
 helpers.go           |  21 +++++
 realis.go            | 206 +++++++++++++++++++++++++++++++----------
 realis_admin.go      |  59 +++++++-----
 response/response.go |   4 +
 retry.go             | 216 +++++++++++++++++++++++++------------------
 util.go              |   2 +-
 8 files changed, 351 insertions(+), 163 deletions(-)
 create mode 100644 helpers.go

diff --git a/go.mod b/go.mod
index 3b3095f..c9c3dee 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ require (
 	github.com/apache/thrift v0.14.0
 	github.com/pkg/errors v0.9.1
 	github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
-	github.com/stretchr/testify v1.5.0
+	github.com/stretchr/testify v1.7.0
 )
 
 go 1.16
diff --git a/go.sum b/go.sum
index 9324c97..d65a779 100644
--- a/go.sum
+++ b/go.sum
@@ -12,7 +12,11 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw=
 github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/helpers.go b/helpers.go
new file mode 100644
index 0000000..adaa9a6
--- /dev/null
+++ b/helpers.go
@@ -0,0 +1,21 @@
+package realis
+
+import (
+	"context"
+
+	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
+)
+
+func (r *Client) jobExists(key aurora.JobKey) (bool, error) {
+	resp, err := r.client.GetConfigSummary(context.TODO(), &key)
+	if err != nil {
+		return false, err
+	}
+
+	return !(resp == nil ||
+			resp.GetResult_() == nil ||
+			resp.GetResult_().GetConfigSummaryResult_() == nil ||
+			resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
+			resp.GetResponseCode() != aurora.ResponseCode_OK),
+		nil
+}
diff --git a/realis.go b/realis.go
index 1219e92..3552f61 100644
--- a/realis.go
+++ b/realis.go
@@ -315,11 +315,13 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu
 		Statuses:    states,
 	}
 
-	c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
+	c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ)
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
-	})
+	},
+		nil,
+	)
 
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
 	if retryErr != nil {
@@ -341,8 +343,13 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
-	})
+	},
+		nil,
+	)
 
+	if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil, errors.New("unexpected response from scheduler")
+	}
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
 	}
@@ -354,8 +361,12 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) {
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.readonlyClient.GetJobSummary(context.TODO(), role)
-	})
-
+	},
+		nil,
+	)
+	if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil {
+		return nil, errors.New("unexpected response from scheduler")
+	}
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler")
 	}
@@ -369,13 +380,15 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.readonlyClient.GetJobs(context.TODO(), role)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
 	}
 
-	if resp.GetResult_() != nil {
+	if resp != nil && resp.GetResult_() != nil {
 		result = resp.GetResult_().GetJobsResult_
 	}
 
@@ -389,19 +402,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.KillTasks(context.TODO(), &key, instances, "")
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
 	}
 
-	if len(resp.GetDetails()) > 0 {
+	if resp == nil || len(resp.GetDetails()) > 0 {
 		c.logger.Println("KillTasks was called but no tasks killed as a result.")
 		return false, nil
-	} else {
-		return true, nil
 	}
-
+	return true, nil
 }
 
 func (c *Client) RealisConfig() *clientConfig {
@@ -416,7 +429,9 @@ func (c *Client) KillJob(key aurora.JobKey) error {
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 		return c.client.KillTasks(context.TODO(), &key, nil, "")
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@@ -438,9 +453,27 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
 		return errors.Wrap(err, "unable to create Thermos payload")
 	}
 
-	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-		return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
-	})
+	// Response is checked by the thrift retry code
+	_, retryErr := c.thriftCallWithRetries(
+		false,
+		func() (*aurora.Response, error) {
+			return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
+		},
+	// On a client timeout, attempt to verify that the payload made it to the Scheduler
+	// by trying to get the config summary for the job key
+		func() (*aurora.Response, bool) {
+			exists, err := c.jobExists(auroraJob.JobKey())
+			if err != nil {
+				c.logger.Print("verification failed ", err)
+			}
+
+			if exists {
+				return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
+			}
+
+			return nil, false
+		},
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
@@ -471,7 +504,9 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler")
@@ -485,7 +520,9 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.DescheduleCronJob(context.TODO(), &key)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler")
@@ -501,7 +538,9 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.StartCronJob(context.TODO(), &key)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Start Cron AuroraJob  message to Aurora Scheduler")
@@ -516,7 +555,9 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.RestartShards(context.TODO(), &key, instances)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@@ -537,16 +578,17 @@ func (c *Client) RestartJob(key aurora.JobKey) error {
 	if len(instanceIds) > 0 {
 		_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 			return c.client.RestartShards(context.TODO(), &key, instanceIds)
-		})
+		},
+			nil,
+		)
 
 		if retryErr != nil {
 			return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
 		}
 
 		return nil
-	} else {
-		return errors.New("no tasks in the Active state")
 	}
+	return errors.New("no tasks in the Active state")
 }
 
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
@@ -558,34 +600,82 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S
 
 	c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
 
-	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-		return c.client.StartJobUpdate(nil, updateJob.request, message)
-	})
+	resp, retryErr := c.thriftCallWithRetries(false,
+		func() (*aurora.Response, error) {
+			return c.client.StartJobUpdate(context.TODO(), updateJob.request, message)
+		},
+		func() (*aurora.Response, bool) {
+			key := updateJob.JobKey()
+			summariesResp, err := c.readonlyClient.GetJobUpdateSummaries(
+				context.TODO(),
+				&aurora.JobUpdateQuery{
+					JobKey:         &key,
+					UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+					Limit:          1,
+				})
+
+			if err != nil {
+				c.logger.Print("verification failed ", err)
+				return nil, false
+			}
+
+			summaries := response.JobUpdateSummaries(summariesResp)
+			if len(summaries) == 0 {
+				return nil, false
+			}
+
+			return &aurora.Response{
+				ResponseCode: aurora.ResponseCode_OK,
+				Result_: &aurora.Result_{
+					StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
+						UpdateSummary: summaries[0],
+						Key:           summaries[0].Key,
+					},
+				},
+			}, true
+		},
+	)
 
 	if retryErr != nil {
+		// A timeout took place when attempting this call; return the error unwrapped so the caller can detect it with IsTimeout
+		if IsTimeout(retryErr) {
+			return nil, retryErr
+		}
+
 		return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
 	}
 
-	if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil {
+	if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil {
 		return resp.GetResult_().GetStartJobUpdateResult_(), nil
 	}
 
 	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 }
 
-// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
+// AbortJobUpdate terminates a job update in the scheduler.
+// It requires the updateId which can be obtained on the Aurora web UI.
+// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state.
+// However, if the job update does not transition to the ABORTED state, an error will be returned.
 func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error {
 
 	c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.AbortJobUpdate(context.TODO(), &updateKey, message)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
 	}
-	return nil
+	// Make this call synchronous by blocking until the job update has successfully transitioned to ABORTED
+	_, err := c.MonitorJobUpdateStatus(
+		updateKey,
+		[]aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
+		time.Second*5,
+		time.Minute)
+	return err
 }
 
 // Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
@@ -605,7 +695,9 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string)
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.PauseJobUpdate(nil, updateKeyLocal, message)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@@ -632,7 +724,9 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string)
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@@ -653,18 +747,18 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.PulseJobUpdate(context.TODO(), &updateKey)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
 	}
 
-	if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil {
+	if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil {
 		return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
-	} else {
-		return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly")
 	}
-
+	return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly")
 }
 
 // Scale up the number of instances under a job configuration using the configuration for specific
@@ -681,7 +775,9 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.AddInstances(context.TODO(), &instKey, count)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@@ -726,7 +822,9 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetTasksStatus(context.TODO(), query)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@@ -742,7 +840,9 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetPendingReason(context.TODO(), query)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@@ -750,21 +850,24 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea
 
 	var result []*aurora.PendingReason
 
-	if resp.GetResult_() != nil {
+	if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetPendingReasonResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
 
 	return result, nil
 }
 
-// Get information about task including without a task configuration object
+// GetTasksWithoutConfigs gets information about tasks without including a task configuration object.
+// This is a more lightweight version of GetTaskStatus but contains less information as a result.
 func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
 
 	c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetTasksWithoutConfigs(context.TODO(), query)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@@ -791,7 +894,9 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetTasksStatus(context.TODO(), taskQ)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@@ -817,17 +922,18 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to get job update details")
 	}
 
-	if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil {
+	if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil {
 		return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
-	} else {
-		return nil, errors.New("unknown Thrift error, field is nil.")
 	}
+	return nil, errors.New("unknown Thrift error, field is nil.")
 }
 
 func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
@@ -836,7 +942,9 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.client.RollbackJobUpdate(context.TODO(), &key, message)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to roll back job update")
diff --git a/realis_admin.go b/realis_admin.go
index f2759ea..a1fa50f 100644
--- a/realis_admin.go
+++ b/realis_admin.go
@@ -37,7 +37,9 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.DrainHosts(context.TODO(), drainList)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to recover connection")
@@ -45,9 +47,8 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
 		return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
-	} else {
-		return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 	}
+	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 }
 
 // Start SLA Aware Drain.
@@ -78,7 +79,9 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to recover connection")
@@ -86,9 +89,8 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
 
 	if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
 		return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
-	} else {
-		return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 	}
+	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 }
 
 func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@@ -104,7 +106,9 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.StartMaintenance(context.TODO(), hostList)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to recover connection")
@@ -112,9 +116,8 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)
 
 	if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil {
 		return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
-	} else {
-		return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 	}
+	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 }
 
 func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@@ -130,7 +133,9 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.EndMaintenance(context.TODO(), hostList)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to recover connection")
@@ -138,9 +143,8 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil {
 		return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
-	} else {
-		return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 	}
+	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 
 }
 
@@ -161,7 +165,9 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe
 	// and continue trying to resend command until we run out of retries.
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.MaintenanceStatus(context.TODO(), hostList)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return result, errors.Wrap(retryErr, "unable to recover connection")
@@ -189,7 +195,9 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.SetQuota(context.TODO(), role, quota)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to set role quota")
@@ -203,7 +211,9 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
 	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.GetQuota(context.TODO(), role)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "unable to get role quota")
@@ -211,9 +221,8 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
 	if resp.GetResult_() != nil {
 		return resp.GetResult_().GetGetQuotaResult_(), nil
-	} else {
-		return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 	}
+	return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
 }
 
 // Force Aurora Scheduler to perform a snapshot and write to Mesos log
@@ -221,7 +230,9 @@ func (c *Client) Snapshot() error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.Snapshot(context.TODO())
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to recover connection")
@@ -235,7 +246,9 @@ func (c *Client) PerformBackup() error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.PerformBackup(context.TODO())
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to recover connection")
@@ -249,7 +262,9 @@ func (c *Client) ForceImplicitTaskReconciliation() error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to recover connection")
@@ -270,7 +285,9 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error {
 
 	_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
 		return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
-	})
+	},
+		nil,
+	)
 
 	if retryErr != nil {
 		return errors.Wrap(retryErr, "unable to recover connection")
diff --git a/response/response.go b/response/response.go
index 1663b1b..15081ec 100644
--- a/response/response.go
+++ b/response/response.go
@@ -35,6 +35,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
 }
 
 func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
+	if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil
+	}
+
 	return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
 }
 
diff --git a/retry.go b/retry.go
index 9860e42..8a05cf5 100644
--- a/retry.go
+++ b/retry.go
@@ -17,10 +17,7 @@ package realis
 import (
 	"io"
 	"math/rand"
-	"net/http"
 	"net/url"
-	"strconv"
-	"strings"
 	"time"
 
 	"github.com/apache/thrift/lib/go/thrift"
@@ -29,9 +26,11 @@ import (
 	"github.com/pkg/errors"
 )
 
+// Backoff determines how the retry mechanism should react after each failure and how many failures it should
+// tolerate.
 type Backoff struct {
 	Duration time.Duration // the base duration
-	Factor   float64       // Duration is multipled by factor each iteration
+	Factor   float64       // Duration is multiplied by a factor each iteration
 	Jitter   float64       // The amount of jitter applied each iteration
 	Steps    int           // Exit with error after this many steps
 }
@@ -53,18 +52,15 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
 // if the loop should be aborted.
 type ConditionFunc func() (done bool, err error)
 
-// Modified version of the Kubernetes exponential-backoff code.
-// ExponentialBackoff repeats a condition check with exponential backoff.
-//
-// It checks the condition up to Steps times, increasing the wait by multiplying
-// the previous duration by Factor.
+// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code.
+// It repeats a condition check with exponential backoff and checks the condition up to
+// Steps times, increasing the wait by multiplying the previous duration by Factor.
 //
 // If Jitter is greater than zero, a random amount of each duration is added
 // (between duration and duration*(1+jitter)).
 //
 // If the condition never returns true, ErrWaitTimeout is returned. Errors
 // do not cause the function to return.
-
 func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
 	var err error
 	var ok bool
@@ -98,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 			// If the error is temporary, continue retrying.
 			if !IsTemporary(err) {
 				return err
-			} else {
-				// Print out the temporary error we experienced.
-				logger.Println(err)
 			}
+			// Print out the temporary error we experienced.
+			logger.Println(err)
 		}
 	}
 
@@ -112,19 +107,28 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 	// Provide more information to the user wherever possible
 	if err != nil {
 		return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
-	} else {
-		return newRetryError(errors.New("ran out of retries"), curStep)
 	}
+
+	return newRetryError(errors.New("ran out of retries"), curStep)
 }
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
 
+// verifyOnTimeout defines the type of function that will be used to verify whether a Thrift call to the Scheduler
+// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
+// through the very same Thrift API which previously encountered a timeout from the client.
+// This means that the functions themselves should be kept to a minimum number of Thrift calls.
+// It should also be noted that this is a best-effort mechanism and
+// is likely to fail for the same reasons that the original call failed.
+type verifyOnTimeout func() (*aurora.Response, bool)
+
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
-func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
+func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall,
+	verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {
 	var resp *aurora.Response
 	var clientErr error
 	var curStep int
-	var timeouts int
+	timeouts := 0
 
 	backoff := c.config.backoff
 	duration := backoff.Duration
@@ -138,7 +142,10 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh
 				adjusted = Jitter(duration, backoff.Jitter)
 			}
 
-			c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
+			c.logger.Printf(
+				"A retryable error occurred during thrift call, backing off for %v before retry %v",
+				adjusted,
+				curStep)
 
 			time.Sleep(adjusted)
 			duration = time.Duration(float64(duration) * backoff.Factor)
@@ -153,105 +160,132 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh
 
 			resp, clientErr = thriftCall()
 
-			c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
+			c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr)
 		}()
 
 		// Check if our thrift call is returning an error. This is a retryable event as we don't know
 		// if it was caused by network issues.
 		if clientErr != nil {
-
 			// Print out the error to the user
-			c.logger.Printf("Client Error: %v\n", clientErr)
+			c.logger.Printf("Client Error: %v", clientErr)
 
-			// Determine if error is a temporary URL error by going up the stack
-			e, ok := clientErr.(thrift.TTransportException)
-			if ok {
-				c.logger.DebugPrint("Encountered a transport exception")
+			temporary, timedout := isConnectionError(clientErr)
+			if !temporary && c.RealisConfig().failOnPermanentErrors {
+				return nil, errors.Wrap(clientErr, "permanent connection error")
+			}
 
-				// TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution
-				// 401 Unauthorized means the wrong username and password were provided
-				if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) {
-					return nil, errors.Wrap(clientErr, "wrong username or password provided")
-				}
-
-				e, ok := e.Err().(*url.Error)
-				if ok {
-					// EOF error occurs when the server closes the read buffer of the client. This is common
-					// when the server is overloaded and should be retried. All other errors that are permanent
-					// will not be retried.
-					if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors {
-						return nil, errors.Wrap(clientErr, "permanent connection error")
-					}
-					// Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
-					// able to reply. In this case we will return whatever response was received and a TimedOut behaving
-					// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
-					if e.Timeout() {
-						timeouts++
-						c.logger.DebugPrintf(
-							"Client closed connection (timedout) %d times before server responded,"+
-								" consider increasing connection timeout",
-							timeouts)
-						if returnOnTimeout {
-							return resp,
-								newTimedoutError(errors.New("client connection closed before server answer"))
-						}
-					}
-				}
+			// There exists a corner case where the Thrift payload was received by Aurora but
+			// the connection timed out before Aurora was able to reply.
+			// Users can take special action on a timeout by using IsTimeout and reacting accordingly
+			// if they have configured the client to return on a timeout.
+			if timedout && returnOnTimeout {
+				return resp, newTimedoutError(errors.New("client connection closed before server answer"))
 			}
 
 			// In the future, reestablish connection should be able to check if it is actually possible
 			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
 			// Ignoring error due to the fact that an error should be retried regardless
-			_ = c.ReestablishConn()
-
-		} else {
-
-			// If there was no client error, but the response is nil, something went wrong.
-			// Ideally, we'll never encounter this but we're placing a safeguard here.
-			if resp == nil {
-				return nil, errors.New("response from aurora is nil")
+			reestablishErr := c.ReestablishConn()
+			if reestablishErr != nil {
+				c.logger.DebugPrintf("error re-establishing connection ", reestablishErr)
 			}
 
-			// Check Response Code from thrift and make a decision to continue retrying or not.
-			switch responseCode := resp.GetResponseCode(); responseCode {
+			// If the user did not opt to return on timeout in order to react to the timeout themselves,
+			// attempt to verify that the call made it to the scheduler after the connection was re-established.
+			if timedout {
+				timeouts++
+				c.logger.DebugPrintf(
+					"Client closed connection %d times before server responded, "+
+						"consider increasing connection timeout",
+					timeouts)
 
-			// If the thrift call succeeded, stop retrying
-			case aurora.ResponseCode_OK:
-				return resp, nil
-
-			// If the response code is transient, continue retrying
-			case aurora.ResponseCode_ERROR_TRANSIENT:
-				c.logger.Println("Aurora replied with Transient error code, retrying")
-				continue
-
-			// Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying.
-			case aurora.ResponseCode_INVALID_REQUEST,
-				aurora.ResponseCode_ERROR,
-				aurora.ResponseCode_AUTH_FAILED,
-				aurora.ResponseCode_JOB_UPDATING_ERROR:
-				c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
-				return resp, errors.New(response.CombineMessage(resp))
-
-				// The only case that should fall down to here is a WARNING response code.
-				// It is currently not used as a response in the scheduler so it is unknown how to handle it.
-			default:
-				c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-				return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+				// Allow the caller to provide a function which checks whether the original call reached
+				// the scheduler before the client timed out.
+				if verifyOnTimeout != nil {
+					if verifyResp, ok := verifyOnTimeout(); ok {
+						c.logger.Print("verified that the call went through successfully after a client timeout")
+						// Response here might be different than the original as it is no longer constructed
+						// by the scheduler but mimicked.
+						// This is OK since the scheduler is very unlikely to change responses at this point in its
+						// development cycle but we must be careful to not return an incorrectly constructed response.
+						return verifyResp, nil
+					}
+				}
 			}
+
+			// Retry the thrift payload
+			continue
 		}
 
+		// If there was no client error, but the response is nil, something went wrong.
+		// Ideally, we'll never encounter this but we're placing a safeguard here.
+		if resp == nil {
+			return nil, errors.New("response from aurora is nil")
+		}
+
+		// Check Response Code from thrift and make a decision to continue retrying or not.
+		switch responseCode := resp.GetResponseCode(); responseCode {
+
+		// If the thrift call succeeded, stop retrying
+		case aurora.ResponseCode_OK:
+			return resp, nil
+
+		// If the response code is transient, continue retrying
+		case aurora.ResponseCode_ERROR_TRANSIENT:
+			c.logger.Println("Aurora replied with Transient error code, retrying")
+			continue
+
+		// Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying.
+		case aurora.ResponseCode_INVALID_REQUEST,
+			aurora.ResponseCode_ERROR,
+			aurora.ResponseCode_AUTH_FAILED,
+			aurora.ResponseCode_JOB_UPDATING_ERROR:
+			c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
+			return resp, errors.New(response.CombineMessage(resp))
+
+			// The only case that should fall down to here is a WARNING response code.
+			// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+		default:
+			c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+		}
 	}
 
-	c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
-
 	if curStep > 1 {
-		c.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+		c.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
 	}
 
 	// Provide more information to the user wherever possible.
 	if clientErr != nil {
 		return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
-	} else {
-		return nil, newRetryError(errors.New("ran out of retries"), curStep)
 	}
+
+	return nil, newRetryError(errors.New("ran out of retries"), curStep)
+}
+
+// isConnectionError inspects the error received by the client.
+// The first return value indicates whether the error was determined to be temporary;
+// the second indicates whether it was determined to be a timeout.
+func isConnectionError(err error) (bool, bool) {
+
+	// Determine if error is a temporary URL error by going up the stack
+	transportException, ok := err.(thrift.TTransportException)
+	if !ok {
+		return false, false
+	}
+
+	urlError, ok := transportException.Err().(*url.Error)
+	if !ok {
+		return false, false
+	}
+
+	// An EOF error occurs when the server closes the read buffer of the client. This is common
+	// when the server is overloaded and is considered temporary.
+	// All other errors that do not report themselves as temporary via the
+	// Temporary() member function are treated as permanent.
+	if urlError.Err != io.EOF && !urlError.Temporary() {
+		return false, false
+	}
+
+	return true, urlError.Timeout()
 }
diff --git a/util.go b/util.go
index f993aaa..a822b3f 100644
--- a/util.go
+++ b/util.go
@@ -40,7 +40,7 @@ func init() {
 	}
 }
 
-// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
+// TerminalUpdateStates returns a slice containing all the terminal states an update may be in.
 // This is a function in order to avoid having a slice that can be accidentally mutated.
 func TerminalUpdateStates() []aurora.JobUpdateStatus {
 	return []aurora.JobUpdateStatus{