Aurora jobupdate functionality -- pause/resume/pulse api (#55)

* Adding GetJobs api

* Adding Aurora pause/resume/pulse api
This commit is contained in:
kkrishna 2018-02-06 12:39:02 -08:00 committed by Renan DelValle
parent 8bd3957247
commit a6b077d1fd
4 changed files with 214 additions and 69 deletions

View file

@ -40,7 +40,7 @@ type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error)
CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
@ -57,6 +57,10 @@ type Realis interface {
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
// TODO: Remove this method and make it private to avoid race conditions
ReestablishConn() error
@ -549,6 +553,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
return resp, nil
}
func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
var resp *aurora.Response
var result *aurora.GetJobsResult_
@ -660,9 +665,9 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
}
// This API uses an update thrift call to create the services giving a few more robust features.
func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
// Create a new job update object and ship it to the StartJobUpdate api
update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings)
update := NewUpdateJob(auroraJob.TaskConfig(), settings)
update.InstanceCount(auroraJob.GetInstanceCount())
update.BatchSize(auroraJob.GetInstanceCount())
@ -695,7 +700,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
})
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler")
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler")
}
return resp, nil
}
@ -718,7 +723,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
})
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler")
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler")
}
return resp, nil
@ -859,6 +864,73 @@ func (r *realisClient) AbortJobUpdate(
return resp, nil
}
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(updateKey, message)
})
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending PauseJobUpdate command to Aurora Scheduler")
}
return resp, nil
}
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(updateKey, message)
})
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
}
return resp, nil
}
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(updateKey)
})
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
}
return resp, nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {