diff --git a/monitors.go b/monitors.go index b6a932a..79ed0f7 100644 --- a/monitors.go +++ b/monitors.go @@ -32,13 +32,17 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo if timeout < 1*time.Second { timeout = timeout * time.Second } - status, err := c.MonitorJobUpdateStatus(updateKey, - map[aurora.JobUpdateStatus]bool{ - aurora.JobUpdateStatus_ROLLED_FORWARD: true, - aurora.JobUpdateStatus_ROLLED_BACK: true, - aurora.JobUpdateStatus_ABORTED: true, - aurora.JobUpdateStatus_ERROR: true, - aurora.JobUpdateStatus_FAILED: true, + updateSummaries, err := c.MonitorJobUpdateQuery( + aurora.JobUpdateQuery{ + Key: &updateKey, + Limit: 1, + UpdateStatuses: []aurora.JobUpdateStatus{ + aurora.JobUpdateStatus_ROLLED_FORWARD, + aurora.JobUpdateStatus_ROLLED_BACK, + aurora.JobUpdateStatus_ABORTED, + aurora.JobUpdateStatus_ERROR, + aurora.JobUpdateStatus_FAILED, + }, }, interval, timeout) @@ -47,6 +51,8 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo return false, err } + status := updateSummaries[0].State.Status + c.RealisConfig().logger.Printf("job update status: %v\n", status) // Rolled forward is the only state in which an update has been successfully updated @@ -67,9 +73,13 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo // MonitorJobUpdateStatus polls the scheduler for information about an update until the update enters one of the // desired states or until the function times out. func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey, - desiredStatuses map[aurora.JobUpdateStatus]bool, + desiredStatuses []aurora.JobUpdateStatus, interval, timeout time.Duration) (aurora.JobUpdateStatus, error) { + if len(desiredStatuses) == 0 { + return aurora.JobUpdateStatus(-1), errors.New("no desired statuses provided") + } + // Make deep local copy to avoid side effects from job key being manipulated externally. updateKeyLocal := &aurora.JobUpdateKey{ Job: &aurora.JobKey{ @@ -81,34 +91,42 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey, } updateQ := aurora.JobUpdateQuery{ - Key: updateKeyLocal, - Limit: 1, + Key: updateKeyLocal, + Limit: 1, + UpdateStatuses: desiredStatuses, } + + summary, err := c.MonitorJobUpdateQuery(updateQ, interval, timeout) + if len(summary) > 0 { + return summary[0].State.Status, err + } + + return aurora.JobUpdateStatus(-1), err +} + +func (c *Client) MonitorJobUpdateQuery( + updateQuery aurora.JobUpdateQuery, + interval time.Duration, + timeout time.Duration) ([]*aurora.JobUpdateSummary, error) { + ticker := time.NewTicker(interval) defer ticker.Stop() timer := time.NewTimer(timeout) defer timer.Stop() - for { select { case <-ticker.C: - updateDetails, cliErr := c.JobUpdateDetails(updateQ) + updateSummaryResults, cliErr := c.GetJobUpdateSummaries(&updateQuery) if cliErr != nil { - return aurora.JobUpdateStatus(-1), cliErr + return nil, cliErr } - if len(updateDetails) == 0 { - c.RealisConfig().logger.Println("No update found") - return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKeyLocal.String()) - } - status := updateDetails[0].Update.Summary.State.Status - - if _, ok := desiredStatuses[status]; ok { - return status, nil + if len(updateSummaryResults.GetUpdateSummaries()) >= 1 { + return updateSummaryResults.GetUpdateSummaries(), nil } case <-timer.C: - return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out")) + return nil, newTimedoutError(errors.New("job update monitor timed out")) } } } diff --git a/realis.go b/realis.go index 48d2aa1..32c4d5f 100644 --- a/realis.go +++ b/realis.go @@ -759,9 +759,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) // Make this call synchronous by blocking until it job has successfully transitioned to aborted _, err := c.MonitorJobUpdateStatus(*updateKeyLocal, - map[aurora.JobUpdateStatus]bool{ - aurora.JobUpdateStatus_ABORTED: true, - }, + []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED}, time.Second*5, time.Minute)