Adding MonitorJobUpdateQuery which serves as the basis for other monitors.

This commit is contained in:
Renan DelValle 2019-09-10 18:47:08 -07:00 committed by Renan DelValle
parent 9a70711537
commit 5d75dcc15e
2 changed files with 41 additions and 25 deletions

View file

@ -32,13 +32,17 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo
if timeout < 1*time.Second { if timeout < 1*time.Second {
timeout = timeout * time.Second timeout = timeout * time.Second
} }
status, err := c.MonitorJobUpdateStatus(updateKey, updateSummaries, err := c.MonitorJobUpdateQuery(
map[aurora.JobUpdateStatus]bool{ aurora.JobUpdateQuery{
aurora.JobUpdateStatus_ROLLED_FORWARD: true, Key: &updateKey,
aurora.JobUpdateStatus_ROLLED_BACK: true, Limit: 1,
aurora.JobUpdateStatus_ABORTED: true, UpdateStatuses: []aurora.JobUpdateStatus{
aurora.JobUpdateStatus_ERROR: true, aurora.JobUpdateStatus_ROLLED_FORWARD,
aurora.JobUpdateStatus_FAILED: true, aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED,
},
}, },
interval, interval,
timeout) timeout)
@ -47,6 +51,8 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo
return false, err return false, err
} }
status := updateSummaries[0].State.Status
c.RealisConfig().logger.Printf("job update status: %v\n", status) c.RealisConfig().logger.Printf("job update status: %v\n", status)
// Rolled forward is the only state in which an update has been successfully updated // Rolled forward is the only state in which an update has been successfully updated
@ -67,9 +73,13 @@ func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeo
// MonitorJobUpdateStatus polls the scheduler for information about an update until the update enters one of the // MonitorJobUpdateStatus polls the scheduler for information about an update until the update enters one of the
// desired states or until the function times out. // desired states or until the function times out.
func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey, func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
desiredStatuses map[aurora.JobUpdateStatus]bool, desiredStatuses []aurora.JobUpdateStatus,
interval, timeout time.Duration) (aurora.JobUpdateStatus, error) { interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
if len(desiredStatuses) == 0 {
return aurora.JobUpdateStatus(-1), errors.New("no desired statuses provided")
}
// Make deep local copy to avoid side effects from job key being manipulated externally. // Make deep local copy to avoid side effects from job key being manipulated externally.
updateKeyLocal := &aurora.JobUpdateKey{ updateKeyLocal := &aurora.JobUpdateKey{
Job: &aurora.JobKey{ Job: &aurora.JobKey{
@ -81,34 +91,42 @@ func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
} }
updateQ := aurora.JobUpdateQuery{ updateQ := aurora.JobUpdateQuery{
Key: updateKeyLocal, Key: updateKeyLocal,
Limit: 1, Limit: 1,
UpdateStatuses: desiredStatuses,
} }
summary, err := c.MonitorJobUpdateQuery(updateQ, interval, timeout)
if len(summary) > 0 {
return summary[0].State.Status, err
}
return aurora.JobUpdateStatus(-1), err
}
func (c *Client) MonitorJobUpdateQuery(
updateQuery aurora.JobUpdateQuery,
interval time.Duration,
timeout time.Duration) ([]*aurora.JobUpdateSummary, error) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
defer timer.Stop() defer timer.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
updateDetails, cliErr := c.JobUpdateDetails(updateQ) updateSummaryResults, cliErr := c.GetJobUpdateSummaries(&updateQuery)
if cliErr != nil { if cliErr != nil {
return aurora.JobUpdateStatus(-1), cliErr return nil, cliErr
} }
if len(updateDetails) == 0 { if len(updateSummaryResults.GetUpdateSummaries()) >= 1 {
c.RealisConfig().logger.Println("No update found") return updateSummaryResults.GetUpdateSummaries(), nil
return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKeyLocal.String())
}
status := updateDetails[0].Update.Summary.State.Status
if _, ok := desiredStatuses[status]; ok {
return status, nil
} }
case <-timer.C: case <-timer.C:
return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out")) return nil, newTimedoutError(errors.New("job update monitor timed out"))
} }
} }
} }

View file

@ -759,9 +759,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string)
// Make this call synchronous by blocking until it job has successfully transitioned to aborted // Make this call synchronous by blocking until it job has successfully transitioned to aborted
_, err := c.MonitorJobUpdateStatus(*updateKeyLocal, _, err := c.MonitorJobUpdateStatus(*updateKeyLocal,
map[aurora.JobUpdateStatus]bool{ []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
aurora.JobUpdateStatus_ABORTED: true,
},
time.Second*5, time.Second*5,
time.Minute) time.Minute)