diff --git a/examples/client.go b/examples/client.go
index 7c92ac5..b693eb7 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -166,7 +166,7 @@ func main() {
 			log.Fatalln(err)
 		}
 
-		if ok, mErr := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 5*time.Second, 50*time.Second); !ok || mErr != nil {
+		if ok, mErr := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 5*time.Second, 50*time.Second); !ok || mErr != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatalln(err)
@@ -185,7 +185,7 @@ func main() {
 		}
 		fmt.Println(result.String())
 
-		if ok, mErr := r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 180*time.Second); !ok || mErr != nil {
+		if ok, mErr := r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 180*time.Second); !ok || mErr != nil {
 			err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out")
 			err = r.KillJob(job.JobKey())
 			if err != nil {
@@ -203,7 +203,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
@@ -219,7 +219,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
@@ -258,7 +258,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := r.InstancesMonitor(job.JobKey(), 0, 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), 0, 5*time.Second, 50*time.Second); !ok || err != nil {
 			log.Fatal("Unable to kill all instances of job")
 		}
 
@@ -312,7 +312,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := r.InstancesMonitor(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
 			fmt.Println("Flexing up failed")
 		}
 
@@ -333,7 +333,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := r.InstancesMonitor(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
 			fmt.Println("flexDown failed")
 		}
 
@@ -360,7 +360,7 @@ func main() {
 		}
 
 		jobUpdateKey := result.GetKey()
-		_, err = r.JobUpdateMonitor(*jobUpdateKey, 5*time.Second, 6*time.Minute)
+		_, err = r.MonitorJobUpdate(*jobUpdateKey, 5*time.Second, 6*time.Minute)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -518,7 +518,7 @@ func main() {
 		}
 
 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5*time.Second,
@@ -547,7 +547,7 @@ func main() {
 		}
 
 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5*time.Second,
@@ -573,7 +573,7 @@ func main() {
 		}
 
 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5*time.Second,
diff --git a/examples/jsonClient.go b/examples/jsonClient.go
index 692a613..4696474 100644
--- a/examples/jsonClient.go
+++ b/examples/jsonClient.go
@@ -208,7 +208,7 @@ func main() {
 		fmt.Println("Error creating Aurora job: ", jobCreationErr)
 		os.Exit(1)
 	} else {
-		if ok, monitorErr := r.InstancesMonitor(auroraJob.JobKey(), auroraJob.GetInstanceCount(), 5, 50); !ok || monitorErr != nil {
+		if ok, monitorErr := r.MonitorInstances(auroraJob.JobKey(), auroraJob.GetInstanceCount(), 5, 50); !ok || monitorErr != nil {
 			if jobErr := r.KillJob(auroraJob.JobKey()); jobErr != nil {
 				fmt.Println(jobErr)
 
diff --git a/monitors.go b/monitors.go
index d45963e..e78a747 100644
--- a/monitors.go
+++ b/monitors.go
@@ -29,7 +29,7 @@ const (
 )
 
 // Polls the scheduler every certain amount of time to see if the update has succeeded
-func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
+func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
@@ -87,15 +87,55 @@ func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeo
 	}
 }
 
+func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
+	desiredStatuses map[aurora.JobUpdateStatus]bool,
+	interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
+
+	updateQ := aurora.JobUpdateQuery{
+		Key:   &updateKey,
+		Limit: 1,
+	}
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			updateDetails, cliErr := c.JobUpdateDetails(updateQ)
+			if cliErr != nil {
+				return aurora.JobUpdateStatus(-1), cliErr
+			}
+
+			if len(updateDetails) == 0 {
+				c.RealisConfig().logger.Println("No update found")
+				return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String())
+			}
+			status := updateDetails[0].Update.Summary.State.Status
+
+			if _, ok := desiredStatuses[status]; ok {
+				return status, nil
+			}
+
+		case <-timer.C:
+			return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out"))
+		}
+	}
+}
+
 // Monitor a AuroraJob until all instances enter one of the LiveStates
-func (c *Client) InstancesMonitor(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
-	return c.ScheduleStatusMonitor(key, instances, aurora.LIVE_STATES, interval, timeout)
+func (c *Client) MonitorInstances(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
+	return c.MonitorScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout)
 }
 
 // Monitor a AuroraJob until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
 // ActiveStates, SlaveAssignedStates, LiveStates, and TerminalStates
-func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
+func (c *Client) MonitorScheduleStatus(key aurora.JobKey,
+	instanceCount int32,
+	desiredStatuses []aurora.ScheduleStatus,
+	interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
@@ -131,7 +171,9 @@ func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, d
 
 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
-func (c *Client) HostMaintenanceMonitor(hosts []string, modes []aurora.MaintenanceMode, interval, timeout time.Duration) (map[string]bool, error) {
+func (c *Client) MonitorHostMaintenance(hosts []string,
+	modes []aurora.MaintenanceMode,
+	interval, timeout time.Duration) (map[string]bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 79fe25c..cef96fa 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -238,7 +238,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test Instances Monitor
-	success, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second)
+	success, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second)
 	assert.True(t, success)
 	assert.NoError(t, err)
 
@@ -265,7 +265,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		err := r.KillJob(job.JobKey())
 		assert.NoError(t, err)
 
-		success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 60*time.Second)
+		success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 60*time.Second)
 		assert.True(t, success)
 		assert.NoError(t, err)
 	})
@@ -429,7 +429,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	var ok bool
 	var mErr error
 
-	if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
+	if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
 		// Update may already be in a terminal state so don't check for error
 		err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
 
@@ -507,7 +507,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED},
 		1*time.Second,
@@ -519,7 +519,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
-	_, err = r.HostMaintenanceMonitor(
+	_, err = r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 		5*time.Second,
@@ -533,7 +533,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		1*time.Second,
@@ -543,7 +543,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 
 	t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
 		// Monitor change to DRAINING and DRAINED mode
-		hostResults, err := r.HostMaintenanceMonitor(
+		hostResults, err := r.MonitorHostMaintenance(
 			append(hosts, "IMAGINARY_HOST"),
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			1*time.Second,
@@ -559,7 +559,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 		assert.NoError(t, err)
 
 		// Monitor change to DRAINING and DRAINED mode
-		_, err = r.HostMaintenanceMonitor(
+		_, err = r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5*time.Second,
@@ -580,7 +580,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	}
 
 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		1*time.Second,
@@ -592,7 +592,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
-	_, err = r.HostMaintenanceMonitor(
+	_, err = r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 		5*time.Second,
@@ -627,7 +627,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
 			defer wg.Done()
 
 			// Test Schedule status monitor for terminal state and timing out after 30 seconds
-			success, err := r.ScheduleStatusMonitor(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
+			success, err := r.MonitorScheduleStatus(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
 			assert.False(t, success)
 			assert.Error(t, err)
 
@@ -710,7 +710,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
 	var ok bool
 	var mErr error
 
-	if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
+	if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
 		// Update may already be in a terminal state so don't check for error
 		err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
 		err = r.KillJob(job.JobKey())