diff --git a/errors.go b/errors.go
index 7007f83..7411a5e 100644
--- a/errors.go
+++ b/errors.go
@@ -28,6 +28,19 @@ func IsTimeout(err error) bool {
 	return ok && temp.Timedout()
 }
 
+type timeoutErr struct {
+	error
+	timedout bool
+}
+
+func (r *timeoutErr) Timedout() bool {
+	return r.timedout
+}
+
+func newTimedoutError(err error) *timeoutErr {
+	return &timeoutErr{error: err, timedout: true}
+}
+
 // retryErr is a superset of timeout which includes extra context
 // with regards to our retry mechanism. This is done in order to make sure
 // that our retry mechanism works as expected through our tests and should
diff --git a/monitors.go b/monitors.go
index ac5b3fa..3106cc6 100644
--- a/monitors.go
+++ b/monitors.go
@@ -23,12 +23,6 @@ import (
 	"github.com/pkg/errors"
 )
 
-const (
-	UpdateFailed = "update failed"
-	RolledBack   = "update rolled back"
-	Timeout      = "timeout"
-)
-
 type Monitor struct {
 	Client Realis
 }
@@ -36,53 +30,73 @@ type Monitor struct {
 // Polls the scheduler every certain amount of time to see if the update has succeeded
 func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) {
 
+	status, err := m.JobUpdateStatus(updateKey,
+		map[aurora.JobUpdateStatus]bool{
+			aurora.JobUpdateStatus_ROLLED_FORWARD: true,
+			aurora.JobUpdateStatus_ROLLED_BACK:    true,
+			aurora.JobUpdateStatus_ABORTED:        true,
+			aurora.JobUpdateStatus_ERROR:          true,
+			aurora.JobUpdateStatus_FAILED:         true,
+		},
+		time.Duration(interval)*time.Second,
+		time.Duration(timeout)*time.Second)
+
+	if err != nil {
+		return false, err
+	}
+
+	m.Client.RealisConfig().logger.Printf("job update status: %v\n", status)
+
+	// ROLLED_FORWARD is the only terminal state in which an update has been successfully applied;
+	// any other terminal state means the update failed.
+	switch status {
+	case aurora.JobUpdateStatus_ROLLED_FORWARD:
+		return true, nil
+	case aurora.JobUpdateStatus_ROLLED_BACK, aurora.JobUpdateStatus_ABORTED, aurora.JobUpdateStatus_ERROR, aurora.JobUpdateStatus_FAILED:
+		return false, errors.Errorf("bad terminal state for update: %v", status)
+	default:
+		return false, errors.Errorf("unexpected update state: %v", status)
+	}
+}
+
+func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
+	desiredStatuses map[aurora.JobUpdateStatus]bool,
+	interval time.Duration,
+	timeout time.Duration) (aurora.JobUpdateStatus, error) {
+
 	updateQ := aurora.JobUpdateQuery{
 		Key:   &updateKey,
 		Limit: 1,
 	}
 
-	ticker := time.NewTicker(time.Second * time.Duration(interval))
+	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
-	timer := time.NewTimer(time.Second * time.Duration(timeout))
+	timer := time.NewTimer(timeout)
 	defer timer.Stop()
+	var cliErr error
 	var respDetail *aurora.Response
-
 	for {
 		select {
 		case <-ticker.C:
 			respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
 			if cliErr != nil {
-				return false, cliErr
+				return aurora.JobUpdateStatus(-1), cliErr
			}
 
 			updateDetail := response.JobUpdateDetails(respDetail)
 
 			if len(updateDetail) == 0 {
 				m.Client.RealisConfig().logger.Println("No update found")
-				return false, errors.New("No update found for " + updateKey.String())
+				return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String())
 			}
 			status := updateDetail[0].Update.Summary.State.Status
 
-			if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
-
-				// Rolled forward is the only state in which an update has been successfully updated
-				// if we encounter an inactive state and it is not at rolled forward, update failed
-				switch status {
-				case aurora.JobUpdateStatus_ROLLED_FORWARD:
-					m.Client.RealisConfig().logger.Println("Update succeeded")
-					return true, nil
-				case aurora.JobUpdateStatus_FAILED:
-					m.Client.RealisConfig().logger.Println("Update failed")
-					return false, errors.New(UpdateFailed)
-				case aurora.JobUpdateStatus_ROLLED_BACK:
-					m.Client.RealisConfig().logger.Println("rolled back")
-					return false, errors.New(RolledBack)
-				default:
-					return false, nil
-				}
+			if _, ok := desiredStatuses[status]; ok {
+				return status, nil
 			}
+
 		case <-timer.C:
-			return false, errors.New(Timeout)
+			return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out"))
 		}
 	}
 }
@@ -117,7 +131,7 @@ func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desire
 
 		case <-timer.C:
 			// If the timer runs out, return a timeout error to user
-			return false, errors.New(Timeout)
+			return false, newTimedoutError(errors.New("schedule status monitor timed out"))
 		}
 	}
 }
@@ -177,7 +191,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 				hostResult[host] = false
 			}
 
-			return hostResult, errors.New(Timeout)
+			return hostResult, newTimedoutError(errors.New("host maintenance monitor timed out"))
 		}
 	}
 }
diff --git a/realis.go b/realis.go
index d274fd0..f12f922 100644
--- a/realis.go
+++ b/realis.go
@@ -775,6 +775,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 }
 
 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
+// This API is meant to be synchronous. It will attempt to wait until the update transitions to the ABORTED state.
+// However, if the job update does not reach the ABORTED state, an error will be returned.
 func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
 
 	r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@@ -786,7 +788,12 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+
+	// Make this call synchronous by blocking until the job update has successfully transitioned to ABORTED
+	m := Monitor{Client: r}
+	_, err := m.JobUpdateStatus(updateKey, map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true}, time.Second*5, time.Minute)
+
+	return resp, err
 }
 
 //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
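
Below is a minimal usage sketch of the new JobUpdateStatus monitor and the timeout-aware error it returns. It assumes the usual gorealis import paths (github.com/paypal/gorealis and its gen-go aurora bindings) and an already-constructed client; the helper function name, the update key, and the polling values are illustrative only and are not part of this change.

package example

import (
	"fmt"
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// waitForUpdate blocks until the given job update reaches one of the terminal
// states we care about, or until the monitor itself gives up.
func waitForUpdate(client realis.Realis, key aurora.JobUpdateKey) error {
	monitor := &realis.Monitor{Client: client}

	// Poll the scheduler every 5 seconds and give up after 10 minutes.
	status, err := monitor.JobUpdateStatus(key,
		map[aurora.JobUpdateStatus]bool{
			aurora.JobUpdateStatus_ROLLED_FORWARD: true,
			aurora.JobUpdateStatus_ROLLED_BACK:    true,
		},
		5*time.Second,
		10*time.Minute)

	if err != nil {
		// Monitor timeouts are now reported through the timeoutErr type, so callers
		// can distinguish "the monitor gave up" from other failures via IsTimeout.
		if realis.IsTimeout(err) {
			return fmt.Errorf("timed out waiting for update %s: %v", key.String(), err)
		}
		return err
	}

	fmt.Printf("update finished in state %v\n", status)
	return nil
}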