From 9a7071153708f7b51490fceff008ff8ad7c31cc4 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 10 Sep 2019 18:25:59 -0700 Subject: [PATCH] Making JobUpdate synchronous. MonitorJobUpdateStatus creates a local copy of job key in order to guard against side effects cuased by mutations to the JobKey being performed externally. --- monitors.go | 14 ++++++++++++-- realis.go | 22 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/monitors.go b/monitors.go index fe5b0d0..b6a932a 100644 --- a/monitors.go +++ b/monitors.go @@ -70,8 +70,18 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey, desiredStatuses map[aurora.JobUpdateStatus]bool, interval, timeout time.Duration) (aurora.JobUpdateStatus, error) { + // Make deep local copy to avoid side effects from job key being manipulated externally. + updateKeyLocal := &aurora.JobUpdateKey{ + Job: &aurora.JobKey{ + Role: updateKey.Job.GetRole(), + Environment: updateKey.Job.GetEnvironment(), + Name: updateKey.Job.GetName(), + }, + ID: updateKey.GetID(), + } + updateQ := aurora.JobUpdateQuery{ - Key: &updateKey, + Key: updateKeyLocal, Limit: 1, } ticker := time.NewTicker(interval) @@ -89,7 +99,7 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey, if len(updateDetails) == 0 { c.RealisConfig().logger.Println("No update found") - return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKey.String()) + return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKeyLocal.String()) } status := updateDetails[0].Update.Summary.State.Status diff --git a/realis.go b/realis.go index c7d0a89..48d2aa1 100644 --- a/realis.go +++ b/realis.go @@ -738,16 +738,34 @@ func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) e func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) error { c.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + // Thrift uses pointers for optional fields when generating Go code. To guarantee + // immutability of the JobUpdateKey, perform a deep copy and store it locally. + updateKeyLocal := &aurora.JobUpdateKey{ + Job: &aurora.JobKey{ + Role: updateKey.Job.GetRole(), + Environment: updateKey.Job.GetEnvironment(), + Name: updateKey.Job.GetName(), + }, + ID: updateKey.GetID(), + } _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.PauseJobUpdate(nil, updateKey, message) + return c.client.PauseJobUpdate(nil, updateKeyLocal, message) }) if retryErr != nil { return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") } - return nil + // Make this call synchronous by blocking until it job has successfully transitioned to aborted + _, err := c.MonitorJobUpdateStatus(*updateKeyLocal, + map[aurora.JobUpdateStatus]bool{ + aurora.JobUpdateStatus_ABORTED: true, + }, + time.Second*5, + time.Minute) + + return err } // Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.