Added a low-cost thrift call (GetTasksWithoutConfigs) to NewRealisClient. If this call still fails after all retries are exhausted, the given configuration is treated as invalid and client creation returns an error. This makes a bad configuration fail at client creation rather than on the first thrift call the user makes.
This commit is contained in:
parent
8bd3957247
commit
6ee8f1454e
1 changed files with 55 additions and 28 deletions
83
realis.go
83
realis.go
|
@ -34,7 +34,7 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const VERSION = "1.1.0"
|
||||
const VERSION = "1.2.1"
|
||||
|
||||
type Realis interface {
|
||||
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
|
||||
|
@ -280,13 +280,35 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
AddBasicAuth(config, config.username, config.password)
|
||||
}
|
||||
|
||||
return &realisClient{
|
||||
client := &realisClient{
|
||||
config: config,
|
||||
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
|
||||
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
|
||||
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
|
||||
logger: config.logger}, nil
|
||||
logger: config.logger}
|
||||
|
||||
// Verify that the connection is set up correctly and that the endpoint exists
|
||||
// by making a low-cost thrift call using the newly created client. If the call fails, return a nil client and an error.
|
||||
taskQ := &aurora.TaskQuery{
|
||||
Role: "test",
|
||||
Environment: "test",
|
||||
JobName: "test",
|
||||
}
|
||||
|
||||
retryErr := ExponentialBackoff(*config.backoff, func() (bool, error) {
|
||||
resp, err := client.readonlyClient.GetTasksWithoutConfigs(taskQ)
|
||||
if resp == nil || err != nil {
|
||||
return false, NewTemporaryError(err)
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
// Unable to successfully make a call even after retrying, return an error
|
||||
if retryErr != nil {
|
||||
return nil, errors.Errorf("Unable to reach scheduler at %v", url)
|
||||
}
|
||||
|
||||
return client, err
|
||||
}
|
||||
|
||||
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
||||
|
@ -436,7 +458,15 @@ func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Re
|
|||
// as we can always retry to connect to the scheduler.
|
||||
retryConnErr := r.ReestablishConn()
|
||||
|
||||
return resp, retryConnErr
|
||||
// If we had a connection error, return that as the temporary error
|
||||
// otherwise if we were able to recreate our connection objects without issue
|
||||
// return a temporary error with the client error inside.
|
||||
if retryConnErr != nil {
|
||||
return nil, retryConnErr
|
||||
} else {
|
||||
return nil, NewTemporaryError(cliErr)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
|
@ -549,7 +579,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
|
||||
func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
|
||||
var resp *aurora.Response
|
||||
var result *aurora.GetJobsResult_
|
||||
var clientErr error
|
||||
|
@ -600,7 +630,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -611,7 +641,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
|
|||
|
||||
// Sends a kill message to the scheduler for all active tasks under a job.
|
||||
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
var clientErr, err error
|
||||
var clientErr error
|
||||
var resp *aurora.Response
|
||||
|
||||
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
|
||||
|
@ -628,7 +658,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -654,7 +684,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -695,7 +725,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -718,15 +748,12 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
var resp *aurora.Response
|
||||
var clientErr error
|
||||
|
@ -744,7 +771,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
|
@ -773,7 +800,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -801,7 +828,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
|
@ -829,7 +856,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -854,7 +881,7 @@ func (r *realisClient) AbortJobUpdate(
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -879,7 +906,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
|
@ -926,7 +953,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status")
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status")
|
||||
}
|
||||
|
||||
return response.ScheduleStatusResult(resp).GetTasks(), nil
|
||||
|
@ -950,7 +977,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs")
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs")
|
||||
}
|
||||
|
||||
return response.ScheduleStatusResult(resp).GetTasks(), nil
|
||||
|
@ -987,7 +1014,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration")
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration")
|
||||
}
|
||||
|
||||
tasks := response.ScheduleStatusResult(resp).GetTasks()
|
||||
|
@ -1022,7 +1049,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details")
|
||||
return nil, errors.Wrap(retryErr, "Unable to get job update details")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
|
@ -1045,7 +1072,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
|
|||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update")
|
||||
return nil, errors.Wrap(retryErr, "Unable to roll back job update")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -1086,7 +1113,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
|
|||
}
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
|
@ -1125,7 +1152,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
|
|||
}
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
|
@ -1169,7 +1196,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
|
|||
}
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue