diff --git a/monitors.go b/monitors.go index c0bc35d..72e7027 100644 --- a/monitors.go +++ b/monitors.go @@ -19,7 +19,6 @@ import ( "time" "github.com/paypal/gorealis/gen-go/apache/aurora" - "github.com/paypal/gorealis/response" "github.com/pkg/errors" ) @@ -30,16 +29,20 @@ type Monitor struct { // Polls the scheduler every certain amount of time to see if the update has succeeded func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) { - status, err := m.JobUpdateStatus(updateKey, - map[aurora.JobUpdateStatus]bool{ - aurora.JobUpdateStatus_ROLLED_FORWARD: true, - aurora.JobUpdateStatus_ROLLED_BACK: true, - aurora.JobUpdateStatus_ABORTED: true, - aurora.JobUpdateStatus_ERROR: true, - aurora.JobUpdateStatus_FAILED: true, + updateQ := aurora.JobUpdateQuery{ + Key: &updateKey, + Limit: 1, + UpdateStatuses: []aurora.JobUpdateStatus{ + aurora.JobUpdateStatus_ROLLED_FORWARD, + aurora.JobUpdateStatus_ROLLED_BACK, + aurora.JobUpdateStatus_ABORTED, + aurora.JobUpdateStatus_ERROR, + aurora.JobUpdateStatus_FAILED, }, - time.Duration(interval)*time.Second, - time.Duration(timeout)*time.Second) + } + updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second) + + status := updateSummaries[0].State.Status if err != nil { return false, err @@ -62,15 +65,33 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } } -func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey, +func (m *Monitor) JobUpdateStatus( + updateKey aurora.JobUpdateKey, desiredStatuses map[aurora.JobUpdateStatus]bool, interval time.Duration, timeout time.Duration) (aurora.JobUpdateStatus, error) { - updateQ := aurora.JobUpdateQuery{ - Key: &updateKey, - Limit: 1, + desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0) + + for k := range desiredStatuses { + desiredStatusesSlice = append(desiredStatusesSlice, k) } + + updateQ := aurora.JobUpdateQuery{ + Key: &updateKey, + Limit: 1, + UpdateStatuses: desiredStatusesSlice, + } + summary, err := m.JobUpdateQuery(updateQ, interval, timeout) + + return summary[0].State.Status, err +} + +func (m *Monitor) JobUpdateQuery( + updateQuery aurora.JobUpdateQuery, + interval time.Duration, + timeout time.Duration) ([]*aurora.JobUpdateSummary, error) { + ticker := time.NewTicker(interval) defer ticker.Stop() timer := time.NewTimer(timeout) @@ -81,25 +102,18 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey, for { select { case <-ticker.C: - respDetail, cliErr = m.Client.JobUpdateDetails(updateQ) + respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery) if cliErr != nil { - return aurora.JobUpdateStatus(-1), cliErr + return nil, cliErr } - updateDetail := response.JobUpdateDetails(respDetail) - - if len(updateDetail) == 0 { - m.Client.RealisConfig().logger.Println("No update found") - return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKey.String()) - } - status := updateDetail[0].Update.Summary.State.Status - - if _, ok := desiredStatuses[status]; ok { - return status, nil + updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries + if len(updateSummaries) >= 1 { + return updateSummaries, nil } case <-timer.C: - return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out")) + return nil, newTimedoutError(errors.New("job update monitor timed out")) } } }