Changing realis API to use new JobUpdate struct and to use concrete JobKey types.

This commit is contained in:
Renan DelValle 2018-12-12 14:13:45 -08:00
parent 0c32a7e683
commit 992e52eba2
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
2 changed files with 17 additions and 21 deletions

View file

@ -88,14 +88,14 @@ func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeo
}
// Monitor a AuroraJob until all instances enter one of the LiveStates
func (c *Client) InstancesMonitor(key *aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
func (c *Client) InstancesMonitor(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
return c.ScheduleStatusMonitor(key, instances, aurora.LIVE_STATES, interval, timeout)
}
// Monitor a AuroraJob until all instances enter a desired status.
// Defaults sets of desired statuses provided by the thrift API include:
// ActiveStates, SlaveAssignedStates, LiveStates, and TerminalStates
func (c *Client) ScheduleStatusMonitor(key *aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
if interval < 1*time.Second {
interval = interval * time.Second
}

View file

@ -452,7 +452,7 @@ func (c *Client) Close() {
}
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
func (c *Client) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
taskQ := &aurora.TaskQuery{
Role: &key.Role,
Environment: &key.Environment,
@ -516,11 +516,11 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
// Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call.
// Returns false, nil if no tasks were killed as a result of this call but there was no error making the call.
func (c *Client) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) {
func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, error) {
c.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.KillTasks(nil, key, instances, "")
return c.client.KillTasks(nil, &key, instances, "")
})
if retryErr != nil {
@ -541,13 +541,13 @@ func (c *Client) RealisConfig() *clientConfig {
}
// Sends a kill message to the scheduler for all active tasks under a job.
func (c *Client) KillJob(key *aurora.JobKey) error {
func (c *Client) KillJob(key aurora.JobKey) error {
c.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return c.client.KillTasks(nil, key, nil, "")
return c.client.KillTasks(nil, &key, nil, "")
})
if retryErr != nil {
@ -576,11 +576,7 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
}
// This API uses an update thrift call to create the services giving a few more robust features.
func (c *Client) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) {
// Create a new job update object and ship it to the StartJobUpdate api
update := NewUpdateJob(auroraJob.TaskConfig(), settings)
update.InstanceCount(auroraJob.GetInstanceCount())
func (c *Client) CreateService(update *JobUpdate) (*aurora.StartJobUpdateResult_, error) {
updateResult, err := c.StartJobUpdate(update, "")
if err != nil {
return nil, errors.Wrap(err, "unable to create service")
@ -606,12 +602,12 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
return nil
}
func (c *Client) DescheduleCronJob(key *aurora.JobKey) error {
func (c *Client) DescheduleCronJob(key aurora.JobKey) error {
c.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.DescheduleCronJob(nil, key)
return c.client.DescheduleCronJob(nil, &key)
})
if retryErr != nil {
@ -622,12 +618,12 @@ func (c *Client) DescheduleCronJob(key *aurora.JobKey) error {
}
func (c *Client) StartCronJob(key *aurora.JobKey) error {
func (c *Client) StartCronJob(key aurora.JobKey) error {
c.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.StartCronJob(nil, key)
return c.client.StartCronJob(nil, &key)
})
if retryErr != nil {
@ -638,11 +634,11 @@ func (c *Client) StartCronJob(key *aurora.JobKey) error {
}
// Restarts specific instances specified
func (c *Client) RestartInstances(key *aurora.JobKey, instances ...int32) error {
func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.RestartShards(nil, key, instances)
return c.client.RestartShards(nil, &key, instances)
})
if retryErr != nil {
@ -652,7 +648,7 @@ func (c *Client) RestartInstances(key *aurora.JobKey, instances ...int32) error
}
// Restarts all active tasks under a job configuration.
func (c *Client) RestartJob(key *aurora.JobKey) error {
func (c *Client) RestartJob(key aurora.JobKey) error {
instanceIds, err1 := c.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err1 != nil {
@ -663,7 +659,7 @@ func (c *Client) RestartJob(key *aurora.JobKey) error {
if len(instanceIds) > 0 {
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.RestartShards(nil, key, instanceIds)
return c.client.RestartShards(nil, &key, instanceIds)
})
if retryErr != nil {
@ -779,7 +775,7 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error {
// Scale down the number of instances under a job configuration using the configuration of a specific instance
// Instances with a higher instance ID will be removed first. For example, if our instance ID list is [0,1,2,3]
// and we want to remove 2 instances, 2 and 3 will always be picked.
func (c *Client) RemoveInstances(key *aurora.JobKey, count int) error {
func (c *Client) RemoveInstances(key aurora.JobKey, count int) error {
instanceIds, err := c.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil {
return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")