From 992e52eba2eea512d5b88d680d620dab740dc1b5 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Wed, 12 Dec 2018 14:13:45 -0800 Subject: [PATCH] Changing realis API to use new JobUpdate struct and to use concrete JobKey types. --- monitors.go | 4 ++-- realis.go | 34 +++++++++++++++------------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/monitors.go b/monitors.go index 50f9c33..0c024ad 100644 --- a/monitors.go +++ b/monitors.go @@ -88,14 +88,14 @@ func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeo } // Monitor a AuroraJob until all instances enter one of the LiveStates -func (c *Client) InstancesMonitor(key *aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) { +func (c *Client) InstancesMonitor(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) { return c.ScheduleStatusMonitor(key, instances, aurora.LIVE_STATES, interval, timeout) } // Monitor a AuroraJob until all instances enter a desired status. // Defaults sets of desired statuses provided by the thrift API include: // ActiveStates, SlaveAssignedStates, LiveStates, and TerminalStates -func (c *Client) ScheduleStatusMonitor(key *aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) { +func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) { if interval < 1*time.Second { interval = interval * time.Second } diff --git a/realis.go b/realis.go index 75c66da..c505ac5 100644 --- a/realis.go +++ b/realis.go @@ -452,7 +452,7 @@ func (c *Client) Close() { } // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. -func (c *Client) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { +func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { taskQ := &aurora.TaskQuery{ Role: &key.Role, Environment: &key.Environment, @@ -516,11 +516,11 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. // Returns false, nil if no tasks were killed as a result of this call but there was no error making the call. -func (c *Client) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) { +func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, error) { c.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.KillTasks(nil, key, instances, "") + return c.client.KillTasks(nil, &key, instances, "") }) if retryErr != nil { @@ -541,13 +541,13 @@ func (c *Client) RealisConfig() *clientConfig { } // Sends a kill message to the scheduler for all active tasks under a job. -func (c *Client) KillJob(key *aurora.JobKey) error { +func (c *Client) KillJob(key aurora.JobKey) error { c.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards - return c.client.KillTasks(nil, key, nil, "") + return c.client.KillTasks(nil, &key, nil, "") }) if retryErr != nil { @@ -576,11 +576,7 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { } // This API uses an update thrift call to create the services giving a few more robust features. -func (c *Client) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) { - // Create a new job update object and ship it to the StartJobUpdate api - update := NewUpdateJob(auroraJob.TaskConfig(), settings) - update.InstanceCount(auroraJob.GetInstanceCount()) - +func (c *Client) CreateService(update *JobUpdate) (*aurora.StartJobUpdateResult_, error) { updateResult, err := c.StartJobUpdate(update, "") if err != nil { return nil, errors.Wrap(err, "unable to create service") @@ -606,12 +602,12 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { return nil } -func (c *Client) DescheduleCronJob(key *aurora.JobKey) error { +func (c *Client) DescheduleCronJob(key aurora.JobKey) error { c.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.DescheduleCronJob(nil, key) + return c.client.DescheduleCronJob(nil, &key) }) if retryErr != nil { @@ -622,12 +618,12 @@ func (c *Client) DescheduleCronJob(key *aurora.JobKey) error { } -func (c *Client) StartCronJob(key *aurora.JobKey) error { +func (c *Client) StartCronJob(key aurora.JobKey) error { c.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.StartCronJob(nil, key) + return c.client.StartCronJob(nil, &key) }) if retryErr != nil { @@ -638,11 +634,11 @@ func (c *Client) StartCronJob(key *aurora.JobKey) error { } // Restarts specific instances specified -func (c *Client) RestartInstances(key *aurora.JobKey, instances ...int32) error { +func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { c.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.RestartShards(nil, key, instances) + return c.client.RestartShards(nil, &key, instances) }) if retryErr != nil { @@ -652,7 +648,7 @@ func (c *Client) RestartInstances(key *aurora.JobKey, instances ...int32) error } // Restarts all active tasks under a job configuration. -func (c *Client) RestartJob(key *aurora.JobKey) error { +func (c *Client) RestartJob(key aurora.JobKey) error { instanceIds, err1 := c.GetInstanceIds(key, aurora.ACTIVE_STATES) if err1 != nil { @@ -663,7 +659,7 @@ func (c *Client) RestartJob(key *aurora.JobKey) error { if len(instanceIds) > 0 { _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.RestartShards(nil, key, instanceIds) + return c.client.RestartShards(nil, &key, instanceIds) }) if retryErr != nil { @@ -779,7 +775,7 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error { // Scale down the number of instances under a job configuration using the configuration of a specific instance // Instances with a higher instance ID will be removed first. For example, if our instance ID list is [0,1,2,3] // and we want to remove 2 instances, 2 and 3 will always be picked. -func (c *Client) RemoveInstances(key *aurora.JobKey, count int) error { +func (c *Client) RemoveInstances(key aurora.JobKey, count int) error { instanceIds, err := c.GetInstanceIds(key, aurora.ACTIVE_STATES) if err != nil { return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")