Add a JobUpdateStatus monitor and rename all monitor functions to Monitor + <subject>
parent 04471c6918
commit 6f20f5b62f
4 changed files with 71 additions and 29 deletions
@@ -166,7 +166,7 @@ func main() {
 			log.Fatalln(err)
 		}

-		if ok, mErr := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 5*time.Second, 50*time.Second); !ok || mErr != nil {
+		if ok, mErr := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 5*time.Second, 50*time.Second); !ok || mErr != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatalln(err)
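With the rename in place, a caller's create-then-monitor flow changes only in the method name. Below is a minimal sketch, assuming the gorealis v2 import path and an already-constructed client; the helper waitForLive is hypothetical, not part of the library:

```go
package monitorexamples

import (
	"fmt"
	"time"

	realis "github.com/aurora-scheduler/gorealis/v2" // import path assumed
)

// waitForLive blocks until every instance of the job reaches a LIVE state,
// polling every 5s and giving up after 50s. On failure it kills the job so
// nothing is left half-started.
func waitForLive(r *realis.Client, job *realis.AuroraJob) error {
	ok, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 5*time.Second, 50*time.Second)
	if ok && err == nil {
		return nil
	}
	// Monitor failed or timed out; tear the job down so it doesn't linger.
	if killErr := r.KillJob(job.JobKey()); killErr != nil {
		return killErr
	}
	return fmt.Errorf("instances never reached a live state: %v", err)
}
```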
@@ -185,7 +185,7 @@ func main() {
 		}
 		fmt.Println(result.String())

-		if ok, mErr := r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 180*time.Second); !ok || mErr != nil {
+		if ok, mErr := r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 180*time.Second); !ok || mErr != nil {
 			err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out")
 			err = r.KillJob(job.JobKey())
 			if err != nil {
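The update path keeps the same shape: start an update, watch it with MonitorJobUpdate, and abort if the monitor gives up. A sketch under the same assumptions (awaitUpdate is a hypothetical helper; both import paths are assumptions):

```go
package monitorexamples

import (
	"fmt"
	"time"

	realis "github.com/aurora-scheduler/gorealis/v2"               // import path assumed
	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" // import path assumed
)

// awaitUpdate waits up to 180s for a started job update to finish, checking
// every 5s, and asks the scheduler to abort the update if the monitor fails.
func awaitUpdate(r *realis.Client, key aurora.JobUpdateKey) error {
	ok, err := r.MonitorJobUpdate(key, 5*time.Second, 180*time.Second)
	if ok && err == nil {
		return nil
	}
	// Best effort: the update may already be in a terminal state.
	if abortErr := r.AbortJobUpdate(key, "Monitor timed out"); abortErr != nil {
		return abortErr
	}
	return fmt.Errorf("job update did not complete: %v", err)
}
```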
@@ -203,7 +203,7 @@ func main() {
 			log.Fatal(err)
 		}

-		if ok, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
@@ -219,7 +219,7 @@ func main() {
 			log.Fatal(err)
 		}

-		if ok, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 10*time.Second, 300*time.Second); !ok || err != nil {
 			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
@@ -258,7 +258,7 @@ func main() {
 			log.Fatal(err)
 		}

-		if ok, err := r.InstancesMonitor(job.JobKey(), 0, 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), 0, 5*time.Second, 50*time.Second); !ok || err != nil {
 			log.Fatal("Unable to kill all instances of job")
 		}

@@ -312,7 +312,7 @@ func main() {
 			log.Fatal(err)
 		}

-		if ok, err := r.InstancesMonitor(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
 			fmt.Println("Flexing up failed")
 		}

@@ -333,7 +333,7 @@ func main() {
 			log.Fatal(err)
 		}

-		if ok, err := r.InstancesMonitor(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
+		if ok, err := r.MonitorInstances(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
 			fmt.Println("flexDown failed")
 		}

@@ -360,7 +360,7 @@ func main() {
 		}

 		jobUpdateKey := result.GetKey()
-		_, err = r.JobUpdateMonitor(*jobUpdateKey, 5*time.Second, 6*time.Minute)
+		_, err = r.MonitorJobUpdate(*jobUpdateKey, 5*time.Second, 6*time.Minute)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -518,7 +518,7 @@ func main() {
 		}

 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5*time.Second,
@@ -547,7 +547,7 @@ func main() {
 		}

 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5*time.Second,
@@ -573,7 +573,7 @@ func main() {
 		}

 		// Monitor change to DRAINING and DRAINED mode
-		hostResult, err := r.HostMaintenanceMonitor(
+		hostResult, err := r.MonitorHostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5*time.Second,

@@ -208,7 +208,7 @@ func main() {
 		fmt.Println("Error creating Aurora job: ", jobCreationErr)
 		os.Exit(1)
 	} else {
-		if ok, monitorErr := r.InstancesMonitor(auroraJob.JobKey(), auroraJob.GetInstanceCount(), 5, 50); !ok || monitorErr != nil {
+		if ok, monitorErr := r.MonitorInstances(auroraJob.JobKey(), auroraJob.GetInstanceCount(), 5, 50); !ok || monitorErr != nil {
 			if jobErr := r.KillJob(auroraJob.JobKey()); jobErr !=
 				nil {
 				fmt.Println(jobErr)

monitors.go (52 changed lines)
@@ -29,7 +29,7 @@ const (
 )

 // Polls the scheduler every certain amount of time to see if the update has succeeded
-func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
+func (c *Client) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
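The `if interval < 1*time.Second` guard that opens each monitor coerces bare integers into seconds: a time.Duration is an int64 count of nanoseconds, so a caller passing 5 hands the monitor 5ns, which the guard scales up to 5s. That is why call sites above can pass `5, 50` as well as `5*time.Second`. A standalone illustration of the arithmetic (not library code); the trade-off is that genuine sub-second durations get mis-scaled:

```go
package main

import (
	"fmt"
	"time"
)

// normalize mirrors the guard used by the monitors: anything under one second
// is treated as a bare integer count of seconds and scaled up.
func normalize(interval time.Duration) time.Duration {
	if interval < 1*time.Second {
		interval = interval * time.Second
	}
	return interval
}

func main() {
	fmt.Println(normalize(5))                      // 5ns coerced to 5s
	fmt.Println(normalize(5 * time.Second))        // already 5s, left alone
	fmt.Println(normalize(500 * time.Millisecond)) // caveat: mis-scaled to a huge value
}
```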
@@ -87,15 +87,55 @@ func (c *Client) JobUpdateMonitor(updateKey aurora.JobUpdateKey, interval, timeo
 	}
 }

+func (c *Client) MonitorJobUpdateStatus(updateKey aurora.JobUpdateKey,
+	desiredStatuses map[aurora.JobUpdateStatus]bool,
+	interval, timeout time.Duration) (aurora.JobUpdateStatus, error) {
+
+	updateQ := aurora.JobUpdateQuery{
+		Key:   &updateKey,
+		Limit: 1,
+	}
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			updateDetails, cliErr := c.JobUpdateDetails(updateQ)
+			if cliErr != nil {
+				return aurora.JobUpdateStatus(-1), cliErr
+			}
+
+			if len(updateDetails) == 0 {
+				c.RealisConfig().logger.Println("No update found")
+				return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String())
+			}
+			status := updateDetails[0].Update.Summary.State.Status
+
+			if _, ok := desiredStatuses[status]; ok {
+				return status, nil
+			}
+
+		case <-timer.C:
+			return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out"))
+		}
+	}
+}
+
 // Monitor a AuroraJob until all instances enter one of the LiveStates
-func (c *Client) InstancesMonitor(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
-	return c.ScheduleStatusMonitor(key, instances, aurora.LIVE_STATES, interval, timeout)
+func (c *Client) MonitorInstances(key aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
+	return c.MonitorScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout)
 }

 // Monitor a AuroraJob until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
 // ActiveStates, SlaveAssignedStates, LiveStates, and TerminalStates
-func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
+func (c *Client) MonitorScheduleStatus(key aurora.JobKey,
+	instanceCount int32,
+	desiredStatuses []aurora.ScheduleStatus,
+	interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
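The new MonitorJobUpdateStatus generalizes MonitorJobUpdate: instead of a boolean it reports whichever of the caller-chosen statuses the update reached first, with the status set passed as a membership map. A usage sketch, assuming the import paths shown; awaitTerminal is a hypothetical helper:

```go
package monitorexamples

import (
	"time"

	realis "github.com/aurora-scheduler/gorealis/v2"               // import path assumed
	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" // import path assumed
)

// awaitTerminal polls every 5s until the update lands in one of the terminal
// statuses below, or the 10-minute timeout error fires.
func awaitTerminal(r *realis.Client, key aurora.JobUpdateKey) (aurora.JobUpdateStatus, error) {
	terminal := map[aurora.JobUpdateStatus]bool{
		aurora.JobUpdateStatus_ROLLED_FORWARD: true,
		aurora.JobUpdateStatus_ROLLED_BACK:    true,
		aurora.JobUpdateStatus_ABORTED:        true,
		aurora.JobUpdateStatus_ERROR:          true,
		aurora.JobUpdateStatus_FAILED:         true,
	}
	return r.MonitorJobUpdateStatus(key, terminal, 5*time.Second, 10*time.Minute)
}
```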
@@ -131,7 +171,9 @@ func (c *Client) ScheduleStatusMonitor(key aurora.JobKey, instanceCount int32, d

 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
-func (c *Client) HostMaintenanceMonitor(hosts []string, modes []aurora.MaintenanceMode, interval, timeout time.Duration) (map[string]bool, error) {
+func (c *Client) MonitorHostMaintenance(hosts []string,
+	modes []aurora.MaintenanceMode,
+	interval, timeout time.Duration) (map[string]bool, error) {
 	if interval < 1*time.Second {
 		interval = interval * time.Second
 	}
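Unlike the boolean monitors, MonitorHostMaintenance reports per host, so callers can tell exactly which hosts transitioned before the monitor exited. A sketch that collects the hosts that reached DRAINING or DRAINED (drainedHosts is a hypothetical helper; import paths assumed):

```go
package monitorexamples

import (
	"time"

	realis "github.com/aurora-scheduler/gorealis/v2"               // import path assumed
	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" // import path assumed
)

// drainedHosts returns the subset of hosts that reached DRAINING or DRAINED
// before the monitor exited, polling every 5s for up to one minute.
func drainedHosts(r *realis.Client, hosts []string) ([]string, error) {
	result, err := r.MonitorHostMaintenance(
		hosts,
		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
		5*time.Second,
		1*time.Minute)
	if err != nil {
		return nil, err
	}
	// The map value is true only for hosts that made the transition.
	var drained []string
	for host, ok := range result {
		if ok {
			drained = append(drained, host)
		}
	}
	return drained, nil
}
```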
@@ -238,7 +238,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 	assert.NoError(t, err)

 	// Test Instances Monitor
-	success, err := r.InstancesMonitor(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second)
+	success, err := r.MonitorInstances(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second)
 	assert.True(t, success)
 	assert.NoError(t, err)

@@ -265,7 +265,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		err := r.KillJob(job.JobKey())
 		assert.NoError(t, err)

-		success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 60*time.Second)
+		success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 60*time.Second)
 		assert.True(t, success)
 		assert.NoError(t, err)
 	})
@@ -429,7 +429,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	var ok bool
 	var mErr error

-	if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
+	if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
 		// Update may already be in a terminal state so don't check for error
 		err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")

@@ -507,7 +507,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
 	assert.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED},
 		1*time.Second,
@@ -519,7 +519,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
 	assert.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
-	_, err = r.HostMaintenanceMonitor(
+	_, err = r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 		5*time.Second,
@@ -533,7 +533,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 	assert.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		1*time.Second,
@@ -543,7 +543,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {

 	t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
 		// Monitor change to DRAINING and DRAINED mode
-		hostResults, err := r.HostMaintenanceMonitor(
+		hostResults, err := r.MonitorHostMaintenance(
 			append(hosts, "IMAGINARY_HOST"),
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			1*time.Second,
@@ -559,7 +559,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 	assert.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
-	_, err = r.HostMaintenanceMonitor(
+	_, err = r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 		5*time.Second,
@@ -580,7 +580,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	}

 	// Monitor change to DRAINING and DRAINED mode
-	hostResults, err := r.HostMaintenanceMonitor(
+	hostResults, err := r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		1*time.Second,
@@ -592,7 +592,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	assert.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
-	_, err = r.HostMaintenanceMonitor(
+	_, err = r.MonitorHostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 		5*time.Second,
@@ -627,7 +627,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
 			defer wg.Done()

 			// Test Schedule status monitor for terminal state and timing out after 30 seconds
-			success, err := r.ScheduleStatusMonitor(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
+			success, err := r.MonitorScheduleStatus(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
 			assert.False(t, success)
 			assert.Error(t, err)

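MonitorScheduleStatus is the general loop behind MonitorInstances: the same polling against any status set, where the thrift API ships defaults such as ActiveStates, LiveStates, and TerminalStates. A sketch that waits for full termination rather than liveness, assuming the generated constant aurora.TERMINAL_STATES exists alongside the aurora.LIVE_STATES used above (import paths assumed, awaitTermination hypothetical):

```go
package monitorexamples

import (
	"time"

	realis "github.com/aurora-scheduler/gorealis/v2"               // import path assumed
	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" // import path assumed
)

// awaitTermination waits for every instance of the job to land in one of the
// thrift-provided terminal states, checking every 5s for up to 2 minutes.
func awaitTermination(r *realis.Client, job *realis.AuroraJob) (bool, error) {
	return r.MonitorScheduleStatus(
		job.JobKey(),
		job.GetInstanceCount(),
		aurora.TERMINAL_STATES, // assumed generated constant
		5*time.Second,
		2*time.Minute)
}
```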
@@ -710,7 +710,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
 	var ok bool
 	var mErr error

-	if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
+	if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
 		// Update may already be in a terminal state so don't check for error
 		err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
 		err = r.KillJob(job.JobKey())