Changed the drain monitor to match the rest of the monitors by using a timer and ticker. Added a generic schedule status monitor that can be used with any of the default status sets provided. (#49)

Renan DelValle 2018-01-07 13:30:02 -08:00 committed by GitHub
parent 8d445c1c77
commit 1c426dd363
2 changed files with 57 additions and 48 deletions
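
For context, the pattern this commit standardizes on is a polling loop driven by a time.Ticker for the poll interval and a time.Timer for the overall deadline, both handled in a single for/select loop. The sketch below is illustration only, not code from this repository; pollUntil and errTimeout are made-up names standing in for the monitors and their Timeout error.

package monitorsketch

import (
    "errors"
    "time"
)

// errTimeout stands in for the Timeout error value the monitors return on expiry.
var errTimeout = errors.New("timed out waiting for condition")

// pollUntil calls poll every interval seconds until it reports success,
// returns an error, or timeout seconds elapse.
func pollUntil(poll func() (bool, error), interval, timeout int) (bool, error) {
    ticker := time.NewTicker(time.Second * time.Duration(interval))
    defer ticker.Stop()
    timer := time.NewTimer(time.Second * time.Duration(timeout))
    defer timer.Stop()

    for {
        select {
        case <-ticker.C:
            ok, err := poll()
            if err != nil {
                // Hard error from the poll: give up immediately, as the monitors do.
                return false, err
            }
            if ok {
                return true, nil
            }
        case <-timer.C:
            // Deadline reached before the condition was met.
            return false, errTimeout
        }
    }
}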


@@ -46,7 +46,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
     defer timer.Stop()
     var cliErr error
     var respDetail *aurora.Response
-    timedout := false
     for {
         select {
         case <-ticker.C:
@@ -82,49 +82,49 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
                 }
             }
         case <-timer.C:
-            timedout = true
-        }
-        if timedout {
-            break
+            return false, errors.New(Timeout)
         }
     }
-    return false, errors.New(Timeout)
 }
-func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) {
+// Monitor a Job until all instances enter one of the LIVE_STATES
+func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
+    return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout)
+}
+// Monitor a Job until all instances enter a desired status.
+// Default sets of desired statuses provided by the thrift API include:
+// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
+func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) {
-    var cliErr error
-    var live map[int32]bool
     ticker := time.NewTicker(time.Second * time.Duration(interval))
     defer ticker.Stop()
     timer := time.NewTimer(time.Second * time.Duration(timeout))
     defer timer.Stop()
-    timedout := false
     for {
         select {
         case <-ticker.C:
-            live, cliErr = m.Client.GetInstanceIds(key, aurora.LIVE_STATES)
+            // Query Aurora for the state of the job key every interval
+            instCount, cliErr := m.Client.GetInstanceIds(key, desiredStatuses)
             if cliErr != nil {
                 return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
             }
-            if len(live) == int(instances) {
+            if len(instCount) == int(instanceCount) {
                 return true, nil
             }
         case <-timer.C:
-            timedout = true
-        }
-        if timedout {
-            break
+            // If the timer runs out, return a timeout error to user
+            return false, errors.New(Timeout)
         }
     }
-    return false, errors.New(Timeout)
 }
 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
-func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) {
+func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, interval, timeout int) (map[string]bool, error) {
     // Transform modes to monitor for into a set for easy lookup
     desiredMode := make(map[aurora.MaintenanceMode]struct{})
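
With the hunk above, Instances becomes a thin wrapper over the new generic ScheduleStatus monitor, which accepts any of the default status sets exposed by the Aurora thrift bindings (ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, TERMINAL_STATES). A usage sketch follows; the waitForJob helper, the interval/timeout values, and the import paths are illustrative assumptions, not part of this commit.

package example

import (
    realis "github.com/paypal/gorealis"
    "github.com/paypal/gorealis/gen-go/apache/aurora"
)

// waitForJob is an illustrative helper showing both entry points into the monitor.
func waitForJob(monitor *realis.Monitor, key *aurora.JobKey, count int32) error {
    // Poll every 5 seconds, giving up after 120 seconds, until all instances
    // reach one of the LIVE_STATES.
    if ok, err := monitor.Instances(key, count, 5, 120); !ok {
        return err
    }

    // The same machinery with an explicit status set; TERMINAL_STATES instead
    // waits for every instance to finish or fail.
    if ok, err := monitor.ScheduleStatus(key, count, aurora.TERMINAL_STATES, 5, 120); !ok {
        return err
    }

    return nil
}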
@@ -142,38 +142,42 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
     hostResult := make(map[string]bool)
-    for step := 0; step < steps; step++ {
-        if step != 0 {
-            time.Sleep(time.Duration(sleepTime) * time.Second)
-        }
+    ticker := time.NewTicker(time.Second * time.Duration(interval))
+    defer ticker.Stop()
+    timer := time.NewTimer(time.Second * time.Duration(timeout))
+    defer timer.Stop()
-        // Client call has multiple retries internally
-        _, result, err := m.Client.MaintenanceStatus(hosts...)
-        if err != nil {
-            // Error is either a payload error or a severe connection error
+    for {
+        select {
+        case <-ticker.C:
+            // Client call has multiple retries internally
+            _, result, err := m.Client.MaintenanceStatus(hosts...)
+            if err != nil {
+                // Error is either a payload error or a severe connection error
+                for host := range remainingHosts {
+                    hostResult[host] = false
+                }
+                return hostResult, errors.Wrap(err, "client error in monitor")
+            }
+            for status := range result.GetStatuses() {
+                if _, ok := desiredMode[status.GetMode()]; ok {
+                    hostResult[status.GetHost()] = true
+                    delete(remainingHosts, status.GetHost())
+                    if len(remainingHosts) == 0 {
+                        return hostResult, nil
+                    }
+                }
+            }
+        case <-timer.C:
             for host := range remainingHosts {
                 hostResult[host] = false
             }
-            return hostResult, errors.Wrap(err, "client error in monitor")
+            return hostResult, errors.New(Timeout)
         }
-        for status := range result.GetStatuses() {
-            if _, ok := desiredMode[status.GetMode()]; ok {
-                hostResult[status.GetHost()] = true
-                delete(remainingHosts, status.GetHost())
-                if len(remainingHosts) == 0 {
-                    return hostResult, nil
-                }
-            }
-        }
     }
-    for host := range remainingHosts {
-        hostResult[host] = false
-    }
-    return hostResult, errors.New(Timeout)
 }
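
One behavioural note on the hunk above: the meaning of the last two HostMaintenance arguments changes. The old code slept sleepTime seconds between a fixed number of steps; the new code polls every interval seconds until timeout seconds have elapsed, matching the other monitors. A caller keeps roughly the same wall-clock budget by passing the old sleep as the interval and sleep multiplied by steps as the timeout. The drainAndWait helper below is illustrative only (same assumed imports as the sketch above); its values mirror the e2e test change further down.

// drainAndWait is an illustrative helper, not library code.
func drainAndWait(monitor *realis.Monitor, hosts []string) (map[string]bool, error) {
    drainModes := []aurora.MaintenanceMode{
        aurora.MaintenanceMode_DRAINED,
        aurora.MaintenanceMode_DRAINING,
    }

    // Before this commit the call would have been HostMaintenance(hosts, drainModes, 5, 10):
    // ten polls with a five-second sleep between attempts.
    // After this commit the same budget is expressed as a poll interval and a timeout:
    // poll every 5 seconds until 50 seconds have elapsed.
    return monitor.HostMaintenance(hosts, drainModes, 5, 50)
}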


@@ -94,6 +94,11 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
     assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
     fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano()))
+    // Test Instances Monitor
+    success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1, 50)
+    assert.True(t, success)
+    assert.NoError(t, err)
     // Tasks must exist for it to be killed
     t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
         start := time.Now()
@@ -166,8 +171,8 @@ func TestRealisClient_DrainHosts(t *testing.T) {
     hostResults, err := monitor.HostMaintenance(
         hosts,
         []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
-        5,
-        10)
+        1,
+        50)
     assert.Equal(t, map[string]bool{"192.168.33.7": true}, hostResults)
     assert.NoError(t, err)