Refactored JobUpdateStatus monitor to use a new monitor called JobUpdateQuery. Update monitor will now still continue if it does not find an update to monitor. Furthermore, it has been optimized to reduce returning payloads from the scheduler as much as possible. This is through using the GetJobUpdateSummaries API instead of JobUpdateDetails and by including a the statuses we're searching for as part of the query.

This commit is contained in:
Renan DelValle 2019-05-03 18:19:36 -07:00
parent f75a6bc3dd
commit 19aacbb586
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9

View file

@ -19,7 +19,6 @@ import (
"time"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
"github.com/pkg/errors"
)
@ -30,16 +29,20 @@ type Monitor struct {
// Polls the scheduler every certain amount of time to see if the update has succeeded
func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) {
status, err := m.JobUpdateStatus(updateKey,
map[aurora.JobUpdateStatus]bool{
aurora.JobUpdateStatus_ROLLED_FORWARD: true,
aurora.JobUpdateStatus_ROLLED_BACK: true,
aurora.JobUpdateStatus_ABORTED: true,
aurora.JobUpdateStatus_ERROR: true,
aurora.JobUpdateStatus_FAILED: true,
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Limit: 1,
UpdateStatuses: []aurora.JobUpdateStatus{
aurora.JobUpdateStatus_ROLLED_FORWARD,
aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED,
},
time.Duration(interval)*time.Second,
time.Duration(timeout)*time.Second)
}
updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second)
status := updateSummaries[0].State.Status
if err != nil {
return false, err
@ -62,15 +65,33 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
}
}
func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
func (m *Monitor) JobUpdateStatus(
updateKey aurora.JobUpdateKey,
desiredStatuses map[aurora.JobUpdateStatus]bool,
interval time.Duration,
timeout time.Duration) (aurora.JobUpdateStatus, error) {
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Limit: 1,
desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
for k := range desiredStatuses {
desiredStatusesSlice = append(desiredStatusesSlice, k)
}
updateQ := aurora.JobUpdateQuery{
Key: &updateKey,
Limit: 1,
UpdateStatuses: desiredStatusesSlice,
}
summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
return summary[0].State.Status, err
}
func (m *Monitor) JobUpdateQuery(
updateQuery aurora.JobUpdateQuery,
interval time.Duration,
timeout time.Duration) ([]*aurora.JobUpdateSummary, error) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
timer := time.NewTimer(timeout)
@ -81,25 +102,18 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
for {
select {
case <-ticker.C:
respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery)
if cliErr != nil {
return aurora.JobUpdateStatus(-1), cliErr
return nil, cliErr
}
updateDetail := response.JobUpdateDetails(respDetail)
if len(updateDetail) == 0 {
m.Client.RealisConfig().logger.Println("No update found")
return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKey.String())
}
status := updateDetail[0].Update.Summary.State.Status
if _, ok := desiredStatuses[status]; ok {
return status, nil
updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries
if len(updateSummaries) >= 1 {
return updateSummaries, nil
}
case <-timer.C:
return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out"))
return nil, newTimedoutError(errors.New("job update monitor timed out"))
}
}
}