Making JobUpdate synchronous. MonitorJobUpdateStatus creates a local copy of the job key in order to guard against side effects caused by mutations to the JobKey being performed externally.

This commit is contained in:
Renan DelValle 2019-09-10 18:25:59 -07:00 committed by Renan DelValle
parent 203f178d68
commit 9a70711537
2 changed files with 32 additions and 4 deletions

View file

@ -70,8 +70,18 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
desiredStatuses map[aurora.JobUpdateStatus]bool,
interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
// Make deep local copy to avoid side effects from job key being manipulated externally.
updateKeyLocal := &aurora.JobUpdateKey{
Job: &aurora.JobKey{
Role: updateKey.Job.GetRole(),
Environment: updateKey.Job.GetEnvironment(),
Name: updateKey.Job.GetName(),
},
ID: updateKey.GetID(),
}
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Key: updateKeyLocal,
Limit: 1,
}
ticker := time.NewTicker(interval)
@ -89,7 +99,7 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
if len(updateDetails) == 0 {
c.RealisConfig().logger.Println("No update found")
return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKey.String())
return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKeyLocal.String())
}
status := updateDetails[0].Update.Summary.State.Status

View file

@ -738,16 +738,34 @@ func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) e
// PauseJobUpdate sends a pause request for the job update identified by
// updateKey to the scheduler, passing message along with the request, and is
// followed by a MonitorJobUpdateStatus call that polls the update's status.
//
// The key is copied field by field (role, environment, name, ID) into a local
// JobUpdateKey so the values used below are independent of the caller's
// struct.
// NOTE(review): the copy goes through updateKey.Job's and updateKey's getters;
// presumably both must be non-nil — confirm against the generated Thrift code.
//
// A failure of the retried Thrift call is returned wrapped with a description
// of the command that was being sent.
//
// NOTE(review): this listing looks like a rendered diff in which the removed
// and added versions of a statement sit next to each other without +/-
// markers (two consecutive returns inside the retry closure; `return nil`
// directly ahead of the monitoring call). Read literally, the first statement
// of each pair wins and the later one is unreachable — verify against the
// committed file before relying on either reading.
func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) error {
// Debug-log the caller-supplied key and message before anything is sent.
c.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
// Thrift uses pointers for optional fields when generating Go code. To guarantee
// immutability of the JobUpdateKey, perform a deep copy and store it locally.
updateKeyLocal := &aurora.JobUpdateKey{
Job: &aurora.JobKey{
Role: updateKey.Job.GetRole(),
Environment: updateKey.Job.GetEnvironment(),
Name: updateKey.Job.GetName(),
},
ID: updateKey.GetID(),
}
// Issue the pause through the client's retry wrapper; the response payload is
// discarded and only the error is inspected.
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
// NOTE(review): the next two returns appear to be the pre-change (caller's
// key) and post-change (local copy) versions of the same statement.
return c.client.PauseJobUpdate(nil, updateKey, message)
return c.client.PauseJobUpdate(nil, updateKeyLocal, message)
})
if retryErr != nil {
return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
}
// NOTE(review): this `return nil` appears to be the pre-change ending of the
// function; everything after it is the added synchronous wait.
return nil
// Make this call synchronous by blocking until the job update reaches one of
// the desired statuses listed below, polling every 5 seconds for up to a minute.
// NOTE(review): the only desired status is ABORTED even though this function
// pauses the update; a paused update presumably ends in a paused status (e.g.
// ROLL_FORWARD_PAUSED / ROLL_BACK_PAUSED), in which case this wait would run
// until the timeout — confirm against the Aurora JobUpdateStatus enum.
_, err := c.MonitorJobUpdateStatus(*updateKeyLocal,
map[aurora.JobUpdateStatus]bool{
aurora.JobUpdateStatus_ABORTED: true,
},
time.Second*5,
time.Minute)
return err
}
// Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.