From 1c426dd363ce7a018b55f90cdc1368acaaa6f52a Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Sun, 7 Jan 2018 13:30:02 -0800 Subject: [PATCH] Changing the drain monitor to match the rest of the monitors using timer and ticker. Made a generic schedule status monitor that can be used with any of the default sets provided. (#49) --- monitors.go | 96 ++++++++++++++++++++++++---------------------- realis_e2e_test.go | 9 ++++- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/monitors.go b/monitors.go index 75afff9..1799658 100644 --- a/monitors.go +++ b/monitors.go @@ -46,7 +46,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout defer timer.Stop() var cliErr error var respDetail *aurora.Response - timedout := false + for { select { case <-ticker.C: @@ -82,49 +82,49 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } } case <-timer.C: - timedout = true - } - if timedout { - break + return false, errors.New(Timeout) } } - return false, errors.New(Timeout) } -func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) { +// Monitor a Job until all instances enter one of the LIVE_STATES +func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { + return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout) +} + +// Monitor a Job until all instances enter a desired status. +// Defaults sets of desired statuses provided by the thrift API include: +// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES +func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) { - var cliErr error - var live map[int32]bool ticker := time.NewTicker(time.Second * time.Duration(interval)) defer ticker.Stop() timer := time.NewTimer(time.Second * time.Duration(timeout)) defer timer.Stop() - timedout := false for { select { case <-ticker.C: - live, cliErr = m.Client.GetInstanceIds(key, aurora.LIVE_STATES) + // Query Aurora for the state of the job key ever interval + instCount, cliErr := m.Client.GetInstanceIds(key, desiredStatuses) if cliErr != nil { return false, errors.Wrap(cliErr, "Unable to communicate with Aurora") } - if len(live) == int(instances) { + if len(instCount) == int(instanceCount) { return true, nil } case <-timer.C: - timedout = true - } - if timedout { - break + + // If the timer runs out, return a timeout error to user + return false, errors.New(Timeout) } } - return false, errors.New(Timeout) } // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited. -func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) { +func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, interval, timeout int) (map[string]bool, error) { // Transform modes to monitor for into a set for easy lookup desiredMode := make(map[aurora.MaintenanceMode]struct{}) @@ -142,38 +142,42 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode hostResult := make(map[string]bool) - for step := 0; step < steps; step++ { - if step != 0 { - time.Sleep(time.Duration(sleepTime) * time.Second) - } + ticker := time.NewTicker(time.Second * time.Duration(interval)) + defer ticker.Stop() + timer := time.NewTimer(time.Second * time.Duration(timeout)) + defer timer.Stop() - // Client call has multiple retries internally - _, result, err := m.Client.MaintenanceStatus(hosts...) - if err != nil { - // Error is either a payload error or a severe connection error + for { + select { + case <-ticker.C: + // Client call has multiple retries internally + _, result, err := m.Client.MaintenanceStatus(hosts...) + if err != nil { + // Error is either a payload error or a severe connection error + for host := range remainingHosts { + hostResult[host] = false + } + return hostResult, errors.Wrap(err, "client error in monitor") + } + + for status := range result.GetStatuses() { + + if _, ok := desiredMode[status.GetMode()]; ok { + hostResult[status.GetHost()] = true + delete(remainingHosts, status.GetHost()) + + if len(remainingHosts) == 0 { + return hostResult, nil + } + } + } + + case <-timer.C: for host := range remainingHosts { hostResult[host] = false } - return hostResult, errors.Wrap(err, "client error in monitor") + + return hostResult, errors.New(Timeout) } - - for status := range result.GetStatuses() { - - if _, ok := desiredMode[status.GetMode()]; ok { - hostResult[status.GetHost()] = true - delete(remainingHosts, status.GetHost()) - - if len(remainingHosts) == 0 { - return hostResult, nil - } - } - } - } - - for host := range remainingHosts { - hostResult[host] = false - } - - return hostResult, errors.New(Timeout) } diff --git a/realis_e2e_test.go b/realis_e2e_test.go index a5cda02..735a8b6 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -94,6 +94,11 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano())) + // Test Instances Monitor + success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1, 50) + assert.True(t, success) + assert.NoError(t, err) + // Tasks must exist for it to be killed t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { start := time.Now() @@ -166,8 +171,8 @@ func TestRealisClient_DrainHosts(t *testing.T) { hostResults, err := monitor.HostMaintenance( hosts, []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, - 5, - 10) + 1, + 50) assert.Equal(t, map[string]bool{"192.168.33.7": true}, hostResults) assert.NoError(t, err)