Backoff mechanism fix (#54)
* Fixing logic that could lead to a nil error being returned, causing retries to stop early.
* Fixing a possible code path that could lead to an incorrect nil error.
parent a6b077d1fd
commit 64948c3712
3 changed files with 71 additions and 31 deletions
realis.go (61)
@@ -34,7 +34,7 @@ import (
     "github.com/pkg/errors"
 )
 
-const VERSION = "1.1.0"
+const VERSION = "1.2.1"
 
 type Realis interface {
     AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
@@ -290,7 +290,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
         readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
         adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
         logger: config.logger}, nil
-
 }
 
 func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
@@ -440,7 +439,15 @@ func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Re
         // as we can always retry to connect to the scheduler.
         retryConnErr := r.ReestablishConn()
 
-        return resp, retryConnErr
+        // If we had a connection error, return that as the temporary error
+        // otherwise if we were able to recreate our connection objects without issue
+        // return a temporary error with the client error inside.
+        if retryConnErr != nil {
+            return nil, retryConnErr
+        } else {
+            return nil, NewTemporaryError(cliErr)
+        }
+
     }
 
     if resp == nil {
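Note on the thriftCallHelper hunk above: the old return resp, retryConnErr could hand a nil error back to the retry loop whenever ReestablishConn succeeded, which made a failed call look like a success and stopped the retry early. The new branch always surfaces a non-nil error: either the reconnect failure itself, or the original client error wrapped as a temporary (retryable) error. A minimal sketch of that pattern, using a hypothetical temporaryErr wrapper rather than gorealis's actual NewTemporaryError type:

    package main

    import (
        "errors"
        "fmt"
    )

    // temporaryErr is a stand-in for a "retry me" error wrapper.
    type temporaryErr struct{ wrapped error }

    func (t temporaryErr) Error() string   { return "temporary: " + t.wrapped.Error() }
    func (t temporaryErr) Temporary() bool { return true }

    // doCall simulates a thrift call that failed but whose connection was rebuilt.
    func doCall() (string, error) {
        cliErr := errors.New("scheduler call failed")
        // Old behavior: return "", nil  <- a nil error made the caller think the call succeeded.
        // New behavior: always return a non-nil, retryable error.
        return "", temporaryErr{wrapped: cliErr}
    }

    func main() {
        _, err := doCall()
        if tmp, ok := err.(interface{ Temporary() bool }); ok && tmp.Temporary() {
            fmt.Println("retryable:", err) // retryable: temporary: scheduler call failed
        }
    }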
@@ -553,8 +560,8 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
     return resp, nil
 }
 
-
 func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
+
     var resp *aurora.Response
     var result *aurora.GetJobsResult_
     var clientErr error
@@ -605,7 +612,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
     }
     return resp, nil
 }
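The same one-line change repeats through the rest of realis.go: errors.Wrap(clientErr, retryErr.Error()+...) becomes errors.Wrap(retryErr, ...). The reason is that github.com/pkg/errors.Wrap returns nil when the error it is given is nil, and clientErr can legitimately be nil after a successful reconnect, so the old form could return a nil error even though the retry had exhausted its attempts. retryErr is always non-nil inside that if block, so wrapping it guarantees the caller sees a real error. A small, self-contained illustration:

    package main

    import (
        "fmt"

        "github.com/pkg/errors"
    )

    func main() {
        var clientErr error // nil: the last attempt reconnected cleanly, so no client error was kept
        retryErr := errors.New("Timed out while retrying")

        // Old pattern: wrapping a nil clientErr yields nil, silently swallowing the failure.
        old := errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
        fmt.Println(old == nil) // true

        // New pattern: retryErr is always non-nil here, so the caller always gets an error.
        fmt.Println(errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler"))
        // Error sending Kill command to Aurora Scheduler: Timed out while retrying
    }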
@@ -616,7 +623,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
 
 // Sends a kill message to the scheduler for all active tasks under a job.
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
-    var clientErr, err error
+    var clientErr error
     var resp *aurora.Response
 
     retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
@@ -633,7 +640,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -659,7 +666,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -700,7 +707,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -723,15 +730,13 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler")
 
     }
     return resp, nil
 
 }
 
-
-
 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
     var resp *aurora.Response
     var clientErr error
@@ -749,7 +754,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler")
     }
     return resp, nil
 
@@ -778,7 +783,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -806,7 +811,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
     }
 
     return resp, nil
@@ -834,7 +839,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -859,7 +864,7 @@ func (r *realisClient) AbortJobUpdate(
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
     }
     return resp, nil
 }
@@ -951,7 +956,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler")
+        return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
     }
     return resp, nil
 
@@ -998,7 +1003,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status")
+        return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status")
     }
 
     return response.ScheduleStatusResult(resp).GetTasks(), nil
@@ -1022,7 +1027,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs")
+        return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs")
     }
 
     return response.ScheduleStatusResult(resp).GetTasks(), nil
@@ -1059,7 +1064,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration")
+        return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration")
     }
 
     tasks := response.ScheduleStatusResult(resp).GetTasks()
@@ -1094,7 +1099,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details")
+        return nil, errors.Wrap(retryErr, "Unable to get job update details")
     }
     return resp, nil
 
@@ -1117,7 +1122,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
     })
 
     if retryErr != nil {
-        return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update")
+        return nil, errors.Wrap(retryErr, "Unable to roll back job update")
     }
     return resp, nil
 }
@@ -1158,7 +1163,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
     }
 
     if retryErr != nil {
-        return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
+        return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
     }
 
     return resp, result, nil
@@ -1197,7 +1202,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
     }
 
     if retryErr != nil {
-        return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
+        return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
     }
 
     return resp, result, nil
@@ -1241,7 +1246,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
     }
 
     if retryErr != nil {
-        return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
+        return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
     }
 
     return resp, result, nil
@@ -59,6 +59,32 @@ func TestMain(m *testing.M) {
     os.Exit(m.Run())
 }
 
+func TestBadEndpoint(t *testing.T) {
+
+    // Attempt to connect to a bad endpoint
+    r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/scheduler/"),
+        realis.TimeoutMS(200),
+        realis.BackOff(&realis.Backoff{ // Reduce penalties for this test to make it quick
+            Steps: 5,
+            Duration: 1 * time.Second,
+            Factor: 1.0,
+            Jitter: 0.1}),
+    )
+    defer r.Close()
+
+    taskQ := &aurora.TaskQuery{
+        Role: "no",
+        Environment: "task",
+        JobName: "here",
+    }
+
+    _, err = r.GetTasksWithoutConfigs(taskQ)
+
+    // Check that we do error out of retrying
+    assert.Error(t, err)
+
+}
+
 func TestLeaderFromZK(t *testing.T) {
     cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.7:2181")
     url, err := realis.LeaderFromZK(*cluster)
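For a rough sense of how long TestBadEndpoint can take: with Steps: 5, Duration: 1s, Factor: 1.0 and Jitter: 0.1, ExponentialBackoff sleeps before every attempt after the first, so the worst case is four jittered one-second waits plus, assuming TimeoutMS(200) bounds each failed attempt, five 200 ms timeouts, roughly 5.4 seconds. A back-of-the-envelope sketch that mirrors the loop in retry.go below (not an exported gorealis helper):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        steps := 5
        duration := 1 * time.Second
        factor := 1.0
        jitter := 0.1
        perAttempt := 200 * time.Millisecond // assumed per-call timeout from TimeoutMS(200)

        var worst time.Duration
        for i := 0; i < steps; i++ {
            if i != 0 {
                // Jitter produces a duration in [duration, duration+jitter*duration];
                // take the upper bound for the worst case.
                worst += time.Duration(float64(duration) * (1 + jitter))
                duration = time.Duration(float64(duration) * factor)
            }
            worst += perAttempt // each attempt can burn the full client timeout
        }
        fmt.Println("worst-case test runtime ~", worst) // ~5.4s
    }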
retry.go (15)
@@ -17,10 +17,11 @@ limitations under the License.
 package realis
 
 import (
-    "errors"
     "time"
 
     "math/rand"
+
+    "github.com/pkg/errors"
 )
 
 // Jitter returns a time.Duration between duration and duration + maxFactor *
@@ -52,6 +53,8 @@ type ConditionFunc func() (done bool, err error)
 // If the condition never returns true, ErrWaitTimeout is returned. All other
 // errors terminate immediately.
 func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
+    var err error
+    var ok bool
     duration := backoff.Duration
     for i := 0; i < backoff.Steps; i++ {
         if i != 0 {
@@ -63,7 +66,7 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
             duration = time.Duration(float64(duration) * backoff.Factor)
         }
 
-        ok, err := condition()
+        ok, err = condition()
 
         // If the function executed says it succeeded, stop retrying
         if ok {
@@ -78,5 +81,11 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
         }
 
     }
-    return NewTimeoutError(errors.New("Timed out while retrying"))
+
+    // Provide more information to the user wherever possible
+    if err != nil {
+        return NewTimeoutError(errors.Wrap(err, "Timed out while retrying"))
+    } else {
+        return NewTimeoutError(errors.New("Timed out while retrying"))
+    }
 }
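The retry.go changes work together: ok and err are now declared once before the loop and assigned with = instead of :=, so the loop no longer shadows err with a per-iteration variable, and the last error returned by the condition is still visible when the loop runs out of steps. That is what lets the new timeout path wrap the underlying error instead of reporting a bare "Timed out while retrying". A stripped-down illustration of the shadowing pitfall, using a hypothetical condition function rather than the gorealis code:

    package main

    import "fmt"

    func main() {
        attempt := 0
        condition := func() (bool, error) { // hypothetical stand-in for a ConditionFunc
            attempt++
            return false, fmt.Errorf("attempt %d failed", attempt)
        }

        // Before the fix: ':=' declares a fresh err inside each iteration,
        // so nothing outside the loop can report what actually went wrong.
        for i := 0; i < 3; i++ {
            ok, err := condition()
            _, _ = ok, err // err dies with the iteration
        }

        // After the fix: declare once, assign with '=', and the last error survives the loop.
        var err error
        var ok bool
        for i := 0; i < 3; i++ {
            if ok, err = condition(); ok {
                break
            }
        }
        fmt.Println("Timed out while retrying:", err) // Timed out while retrying: attempt 6 failed
    }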