remove exponential backoffs from monitors

Mothiki 2017-11-02 20:33:49 -07:00
parent 7538f366ea
commit 0e12d273af
4 changed files with 229 additions and 251 deletions
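
The change replaces the monitors' ExponentialBackoff loops with a plain polling loop driven by a time.Ticker and a time.Timer. A minimal, self-contained sketch of that pattern (poll and checkOnce are illustrative names, not identifiers from this repository):

package main

import (
    "errors"
    "fmt"
    "time"
)

// poll runs checkOnce every interval seconds until it reports done or an
// error, or until timeout seconds have elapsed. checkOnce stands in for a
// call such as m.Client.JobUpdateDetails in the monitors below.
func poll(interval, timeout int, checkOnce func() (bool, error)) (bool, error) {
    ticker := time.NewTicker(time.Second * time.Duration(interval))
    defer ticker.Stop()
    timer := time.NewTimer(time.Second * time.Duration(timeout))
    defer timer.Stop()

    for {
        select {
        case <-ticker.C:
            done, err := checkOnce()
            if done || err != nil {
                return done, err
            }
        case <-timer.C:
            return false, errors.New("timeout")
        }
    }
}

func main() {
    ok, err := poll(1, 3, func() (bool, error) { return false, nil })
    fmt.Println(ok, err) // prints: false timeout
}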

.gitignore
@@ -6,6 +6,7 @@
 # Folders
 _obj
 _test
+.idea

 # Architecture specific extensions/prefixes
 *.[568vq]

@@ -17,7 +17,6 @@ package realis
 import (
     "fmt"
-    "strings"
     "time"

     "github.com/paypal/gorealis/gen-go/apache/aurora"
@@ -28,6 +27,7 @@ import (
 const (
     UpdateFailed = "update failed"
     RolledBack   = "update rolled back"
+    Timeout      = "timeout"
 )

 type Monitor struct {
@@ -41,84 +41,86 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
         Key:   &updateKey,
         Limit: 1,
     }

+    ticker := time.NewTicker(time.Second * time.Duration(interval))
+    defer ticker.Stop()
+    timer := time.NewTimer(time.Second * time.Duration(timeout))
+    defer timer.Stop()
+
     var cliErr error
     var respDetail *aurora.Response
+    timedout := false

-    retryErr := ExponentialBackoff(*m.Client.RealisConfig().backoff, func() (bool, error) {
-        respDetail, cliErr = CheckAndRetryConn(m.Client, func() (*aurora.Response, error) {
-            return m.Client.JobUpdateDetails(updateQ)
-        })
-        if cliErr == RetryConnErr {
-            return false, nil
-        } else {
-            return false, cliErr
-        }
-        updateDetail := response.JobUpdateDetails(respDetail)
-        if len(updateDetail) == 0 {
-            fmt.Println("No update found")
-            return false, errors.New("No update found for " + updateKey.String())
-        }
-        status := updateDetail[0].Update.Summary.State.Status
-        if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
-            // Rolled forward is the only state in which an update has been successfully updated
-            // if we encounter an inactive state and it is not at rolled forward, update failed
-            switch status {
-            case aurora.JobUpdateStatus_ROLLED_FORWARD:
-                fmt.Println("Update succeded")
-                return true, nil
-            case aurora.JobUpdateStatus_FAILED:
-                fmt.Println("Update failed")
-                return false, errors.New(UpdateFailed)
-            case aurora.JobUpdateStatus_ROLLED_BACK:
-                fmt.Println("rolled back")
-                return false, errors.New(RolledBack)
-            default:
-                return false, nil
-            }
-        }
-        return false, nil
-    })
-    if retryErr != nil {
-        return false, errors.Wrap(cliErr, retryErr.Error())
+    for {
+        select {
+        case <-ticker.C:
+            respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
+            if cliErr != nil {
+                return false, cliErr
+            }
+
+            updateDetail := response.JobUpdateDetails(respDetail)
+            if len(updateDetail) == 0 {
+                fmt.Println("No update found")
+                return false, errors.New("No update found for " + updateKey.String())
+            }
+
+            status := updateDetail[0].Update.Summary.State.Status
+            if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
+                // Rolled forward is the only state in which an update has been successfully updated
+                // if we encounter an inactive state and it is not at rolled forward, update failed
+                switch status {
+                case aurora.JobUpdateStatus_ROLLED_FORWARD:
+                    fmt.Println("Update succeded")
+                    return true, nil
+                case aurora.JobUpdateStatus_FAILED:
+                    fmt.Println("Update failed")
+                    return false, errors.New(UpdateFailed)
+                case aurora.JobUpdateStatus_ROLLED_BACK:
+                    fmt.Println("rolled back")
+                    return false, errors.New(RolledBack)
+                default:
+                    return false, nil
+                }
+            }
+        case <-timer.C:
+            timedout = true
+        }
+        if timedout {
+            break
+        }
     }
-    return true, nil
+    return false, errors.New(Timeout)
 }

 func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) {
     var cliErr error
     var live map[int32]bool

+    ticker := time.NewTicker(time.Second * time.Duration(interval))
+    defer ticker.Stop()
+    timer := time.NewTimer(time.Second * time.Duration(timeout))
+    defer timer.Stop()
+
+    timedout := false
-    retryErr := ExponentialBackoff(*m.Client.RealisConfig().backoff, func() (bool, error) {
-        live, cliErr = m.Client.GetInstanceIds(key, aurora.LIVE_STATES)
-        if strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr) {
-            // TODO try this condition only if the error is connection related
-            conErr := m.Client.ReestablishConn()
-            if conErr != nil {
-                // TODO: identify specific type of connection errors
-                return false, nil
-            }
-            return false, nil
-        }
-        if cliErr != nil {
-            return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
-        }
-        if len(live) == int(instances) {
-            return true, nil
-        }
-        return false, nil
-    })
-    if cliErr != nil {
-        return false, cliErr
-    }
-    if retryErr != nil {
-        return false, retryErr
+    for {
+        select {
+        case <-ticker.C:
+            live, cliErr = m.Client.GetInstanceIds(key, aurora.LIVE_STATES)
+
+            if cliErr != nil {
+                return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
+            }
+            if len(live) == int(instances) {
+                return true, nil
+            }
+        case <-timer.C:
+            timedout = true
+        }
+        if timedout {
+            break
+        }
     }
-    return true, nil
+    return false, errors.New(Timeout)
 }

 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
@@ -174,5 +176,5 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
         hostResult[host] = false
     }

-    return hostResult, errors.New("Timed out")
+    return hostResult, errors.New(Timeout)
 }
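
With the backoff gone, callers of the monitors supply an explicit poll interval and timeout, both in seconds. A hedged usage sketch based on the signatures above; the ZooKeeper URL, credentials, and job key values are placeholders, and error handling is abbreviated:

package main

import (
    "fmt"

    realis "github.com/paypal/gorealis"
    "github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
    // NewDefaultClientUsingZKUrl appears in the realis.go changes below;
    // the URL and credentials here are placeholders.
    r, err := realis.NewDefaultClientUsingZKUrl("192.168.33.7:2181", "aurora", "secret")
    if err != nil {
        fmt.Println(err)
        return
    }

    monitor := &realis.Monitor{Client: r}
    key := &aurora.JobKey{Environment: "prod", Role: "vagrant", Name: "hello_world"}

    // Poll every 5 seconds, for at most 50 seconds, until 2 instances are live.
    ok, err := monitor.Instances(key, 2, 5, 50)
    fmt.Println(ok, err)
}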

realis.go

@@ -16,13 +16,13 @@
 package realis

 import (
+    "crypto/tls"
     "encoding/base64"
+    "fmt"
     "net/http"
     "net/http/cookiejar"
     "time"

-    "fmt"
     "math/rand"

     "git.apache.org/thrift.git/lib/go/thrift"
@@ -307,7 +307,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 //This api would create default cluster object..
 func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) {
-    fmt.Println(" zkUrl: %s", zkUrl)
+    fmt.Printf(" zkUrl: %s\n", zkUrl)
     cluster := GetDefaultClusterFromZKUrl(zkUrl)
     url, err := LeaderFromZK(*cluster)
@@ -369,9 +369,11 @@ func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error)
     if err != nil {
         return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
     }

+    transport := &http.Transport{
+        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+    }
     trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
-        thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Jar: jar}})
+        thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Transport: transport, Jar: jar}})

     if err != nil {
         return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport")
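
The transport now carries a TLS config with InsecureSkipVerify set, so the client accepts whatever certificate the scheduler presents (convenient for self-signed test clusters, unsafe on untrusted networks). A standalone sketch of the same construction, using only the calls visible in the hunk above; newTransport and the URL are placeholders:

package main

import (
    "crypto/tls"
    "fmt"
    "net/http"
    "net/http/cookiejar"
    "time"

    "git.apache.org/thrift.git/lib/go/thrift"
)

// newTransport builds the thrift HTTP transport the way defaultTTransport
// now does: cookie jar, request timeout, and a TLS config that skips
// certificate verification.
func newTransport(urlstr string, timeoutms int) (thrift.TTransport, error) {
    jar, err := cookiejar.New(nil)
    if err != nil {
        return nil, err
    }

    transport := &http.Transport{
        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
    }

    return thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
        thrift.THttpClientOptions{Client: &http.Client{
            Timeout:   time.Millisecond * time.Duration(timeoutms),
            Transport: transport,
            Jar:       jar,
        }})
}

func main() {
    trans, err := newTransport("https://192.168.33.7:8081", 10000)
    fmt.Println(trans != nil, err)
}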
@@ -527,9 +529,10 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.GetTasksWithoutConfigs(taskQ)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -559,9 +562,10 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -589,9 +593,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.KillTasks(key, instanceIds)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -623,9 +628,10 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.KillTasks(key, instanceIds)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -647,9 +653,10 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.CreateJob(auroraJob.JobConfig())
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -670,9 +677,10 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.ScheduleCronJob(auroraJob.JobConfig())
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -693,9 +701,10 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.DescheduleCronJob(key)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -716,9 +725,10 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.StartCronJob(key)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -745,9 +755,10 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.RestartShards(key, instanceIds)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -773,9 +784,10 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.RestartShards(key, instanceIds)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
@@ -800,9 +812,10 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
         resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
             return r.client.StartJobUpdate(updateJob.req, message)
         })
-        if clientErr == RetryConnErr {
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
             return false, nil
-        } else {
+        }
+        if clientErr != nil {
             return false, clientErr
         }
         return true, nil
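
Every method in the hunks above now follows one shape: wrap the raw thrift call in CheckAndRetryConn, keep retrying under ExponentialBackoff while the error is the RetryConnErr sentinel, and abort on any other error. A condensed sketch of that shape; retryThriftCall is an illustrative helper, not a function in this repository:

package realisexample

import (
    "github.com/paypal/gorealis/gen-go/apache/aurora"
    "github.com/pkg/errors"

    realis "github.com/paypal/gorealis"
)

// retryThriftCall shows the retry shape shared by the client methods above.
func retryThriftCall(r realis.Realis, backoff realis.Backoff,
    call func() (*aurora.Response, error)) (*aurora.Response, error) {

    var resp *aurora.Response
    var clientErr error

    retryErr := realis.ExponentialBackoff(backoff, func() (bool, error) {
        resp, clientErr = realis.CheckAndRetryConn(r, call)
        if clientErr != nil && clientErr.Error() == realis.RetryConnErr.Error() {
            // Connection-level failure: CheckAndRetryConn has already tried to
            // reconnect, so loop again after the next backoff delay.
            return false, nil
        }
        if clientErr != nil {
            // Anything else is surfaced immediately.
            return false, clientErr
        }
        return true, nil
    })

    if clientErr != nil {
        return nil, errors.Wrap(clientErr, retryErr.Error()+" thrift call failed")
    }
    return resp, nil
}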
@@ -819,20 +832,23 @@ func (r *realisClient) AbortJobUpdate(
     updateKey aurora.JobUpdateKey,
     message string) (*aurora.Response, error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-        resp, err = r.client.AbortJobUpdate(&updateKey, message)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.AbortJobUpdate(&updateKey, message)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Error sending AbortJobUpdate command to Aurora Scheduler")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AbortJobUpdate command to Aurora Scheduler")
     }
     return response.ResponseCodeCheck(resp)
 }
@@ -842,21 +858,23 @@ func (r *realisClient) AbortJobUpdate(
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-        resp, err = r.client.AddInstances(&instKey, count)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.AddInstances(&instKey, count)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Error sending AddInstances command to Aurora Scheduler")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AddInstances command to Aurora Scheduler")
     }

     return response.ResponseCodeCheck(resp)
@@ -888,23 +906,23 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-    start:
-        resp, err = r.client.GetTasksStatus(query)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
-            goto start
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.GetTasksStatus(query)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Error querying Aurora Scheduler for task status")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status")
     }

     //Check for response code..
     if resp.GetResponseCode() != aurora.ResponseCode_OK {
@@ -917,23 +935,23 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-    start:
-        resp, err = r.client.GetTasksWithoutConfigs(query)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
-            goto start
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.GetTasksWithoutConfigs(query)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Error querying Aurora Scheduler for task status without configs")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status without configs")
     }

     //Check for response code..
     if resp.GetResponseCode() != aurora.ResponseCode_OK {
@@ -956,21 +974,23 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
         Statuses: aurora.ACTIVE_STATES}

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-        resp, err = r.client.GetTasksStatus(taskQ)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.GetTasksStatus(taskQ)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Error querying Aurora Scheduler for task configuration")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task configuration")
     }

     //Check for response code..
     if resp.GetResponseCode() != aurora.ResponseCode_OK {
@@ -994,21 +1014,23 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-        resp, err = r.client.GetJobUpdateDetails(&updateQuery)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.GetJobUpdateDetails(&updateQuery)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Unable to get job update details")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to get job update details")
     }

     return response.ResponseCodeCheck(resp)
@@ -1016,21 +1038,23 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {

     var resp *aurora.Response
-    var err error
-    defaultBackoff := r.config.backoff
-    xerr := ExponentialBackoff(*defaultBackoff, func() (bool, error) {
-        resp, err = r.client.RollbackJobUpdate(&key, message)
-        if err != nil {
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                return false, nil
-            }
+    var clientErr error
+
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.client.RollbackJobUpdate(&key, message)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
         }
         return true, nil
     })
-    if err != nil {
-        return nil, errors.Wrap(err, xerr.Error()+"Unable to roll back job update")
+
+    if clientErr != nil {
+        return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to roll back job update")
     }

     return response.ResponseCodeCheck(resp)
 }
@@ -1042,7 +1066,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
     var resp *aurora.Response
     var result *aurora.DrainHostsResult_
-    var returnErr, clientErr, payloadErr error
+    var clientErr error

     if len(hosts) == 0 {
         return nil, nil, errors.New("no hosts provided to drain")
@@ -1054,50 +1078,27 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
         drainList.HostNames[host] = true
     }

-    retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-
-        // Send thrift call, if we have a thrift send error, attempt to reconnect
-        // and continue trying to resend command
-        if resp, clientErr = r.adminClient.DrainHosts(drainList); clientErr != nil {
-
-            // Experienced an connection error
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                fmt.Println("error in re-establishing connection: ", err1)
-            }
-            return false, nil
-        }
-
-        // If error is NOT due to connection
-        if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
-            // TODO(rdelvalle): an leader election may cause the response to have
-            // failed when it should have succeeded. Retry everything for now until
-            // we figure out a more concrete fix.
-            return false, nil
-        }
-
-        // Successful call
-        return true, nil
-    })
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.adminClient.DrainHosts(drainList)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
+        }
+        return true, nil
+    })
+
+    if clientErr != nil {
+        return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
+    }

     if resp != nil && resp.GetResult_() != nil {
         result = resp.GetResult_().GetDrainHostsResult_()
     }

-    // Prioritize returning a bad payload error over a client error as a bad payload error indicates
-    // a deeper issue
-    if payloadErr != nil {
-        returnErr = payloadErr
-    } else {
-        returnErr = clientErr
-    }
-
-    // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
-    // this will can become either a timeout error or a payload error
-    if retryErr != nil {
-        return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
-    }
-
     return resp, result, nil
 }
@@ -1105,7 +1106,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
     var resp *aurora.Response
     var result *aurora.EndMaintenanceResult_
-    var returnErr, clientErr, payloadErr error
+    var clientErr error

     if len(hosts) == 0 {
         return nil, nil, errors.New("no hosts provided to end maintenance on")
@@ -1117,50 +1118,27 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
         hostList.HostNames[host] = true
     }

-    retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
-
-        // Send thrift call, if we have a thrift send error, attempt to reconnect
-        // and continue trying to resend command
-        if resp, clientErr = r.adminClient.EndMaintenance(hostList); clientErr != nil {
-
-            // Experienced an connection error
-            err1 := r.ReestablishConn()
-            if err1 != nil {
-                fmt.Println("error in re-establishing connection: ", err1)
-            }
-            return false, nil
-        }
-
-        // If error is NOT due to connection
-        if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
-            // TODO(rdelvalle): an leader election may cause the response to have
-            // failed when it should have succeeded. Retry everything for now until
-            // we figure out a more concrete fix.
-            return false, nil
-        }
-
-        // Successful call
-        return true, nil
-    })
+    retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
+        resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+            return r.adminClient.EndMaintenance(hostList)
+        })
+        if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
+            return false, nil
+        }
+        if clientErr != nil {
+            return false, clientErr
+        }
+        return true, nil
+    })
+
+    if clientErr != nil {
+        return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
+    }

     if resp != nil && resp.GetResult_() != nil {
         result = resp.GetResult_().GetEndMaintenanceResult_()
     }

-    // Prioritize returning a bad payload error over a client error as a bad payload error indicates
-    // a deeper issue
-    if payloadErr != nil {
-        returnErr = payloadErr
-    } else {
-        returnErr = clientErr
-    }
-
-    // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
-    // this will can become either a timeout error or a payload error
-    if retryErr != nil {
-        return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
-    }
-
     return resp, result, nil
 }


@@ -37,7 +37,7 @@ var RetryConnErr = errors.New("error occured during with aurora retrying")
 // if the loop should be aborted.
 type ConditionFunc func() (done bool, err error)

-type AuroraFunc func() (resp *aurora.Response, err error)
+type AuroraThriftCall func() (resp *aurora.Response, err error)

 // ExponentialBackoff repeats a condition check with exponential backoff.
 //
@@ -70,16 +70,13 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 // CheckAndRetryConn function takes realis client and a trhift API function to call and returns response and error
 // If Error from the APi call is Retry able . THe functions re establishes the connection with aurora by getting the latest aurora master from zookeeper.
 // If Error is retyable return resp and RetryConnErr error.
-func CheckAndRetryConn(r Realis, aurorajob AuroraFunc) (*aurora.Response, error) {
-    resp, cliErr := aurorajob()
-    if strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr) {
-        conErr := r.ReestablishConn()
-        if conErr != nil {
-            return resp, RetryConnErr
-        }
+func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) {
+    resp, cliErr := auroraCall()
+    if cliErr != nil && (strings.Contains(cliErr.Error(), ConnRefusedErr) || strings.Contains(cliErr.Error(), NoLeaderFoundErr)) {
+        r.ReestablishConn()
         return resp, RetryConnErr
     }
-    if resp != nil && resp.GetResponseCode() != aurora.ResponseCode_ERROR_TRANSIENT {
+    if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
         return resp, RetryConnErr
     }
     return resp, cliErr
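
After this change CheckAndRetryConn guards against a nil error before inspecting its text, and it maps ERROR_TRANSIENT responses (rather than every non-transient one) to the RetryConnErr sentinel. A condensed, package-internal sketch of a call site using the renamed AuroraThriftCall shape; getTasksOnce is an illustrative name, and the snippet assumes it lives inside package realis alongside the code above:

// Wrap one raw thrift call; connection-level failures come back as the
// RetryConnErr sentinel after CheckAndRetryConn has already tried to
// re-resolve the leader and reconnect.
func getTasksOnce(r *realisClient, query *aurora.TaskQuery) (*aurora.Response, error) {
    return CheckAndRetryConn(r, func() (*aurora.Response, error) {
        return r.client.GetTasksStatus(query)
    })
}

Callers such as GetTaskStatus above compare the returned error's text against RetryConnErr to decide whether to keep looping under ExponentialBackoff.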