From 8fe3780949254e337da25f2f13456304bee09310 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 27 Sep 2017 12:55:50 -0700
Subject: [PATCH 01/13] Added end maintenance API which allows DRAINED hosts to be transitioned to ACTIVE. Fixed bug where the payload error would never be returned if a call failed due to a bad payload.

---
 examples/client.go | 22 +++++++++++---
 realis.go          | 75 ++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 91 insertions(+), 6 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index 5d7a01f..86972b1 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -28,7 +28,7 @@ import (
 	"strings"
 )
 
-var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, drainCandidates string
+var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList string
 
 var CONNECTION_TIMEOUT = 20000
 
@@ -42,7 +42,7 @@ func init() {
 	flag.StringVar(&username, "username", "aurora", "Username to use for authorization")
 	flag.StringVar(&password, "password", "secret", "Password to use for authorization")
 	flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url")
-	flag.StringVar(&drainCandidates, "drainCandidates", "", "Comma separated list of candidate hosts to drain")
+	flag.StringVar(&hostList, "hostList", "", "Comma-separated list of hosts to operate on")
 
 	flag.Parse()
 }
@@ -501,11 +501,11 @@ func main() {
 
 	case "drainHosts":
 		fmt.Println("Setting hosts to DRAINING")
-		if drainCandidates == "" {
+		if hostList == "" {
 			fmt.Println("No hosts specified to drain")
 			os.Exit(1)
 		}
-		hosts := strings.Split(drainCandidates, ",")
+		hosts := strings.Split(hostList, ",")
 		_, result, err := r.DrainHosts(hosts...)
 		if err != nil {
 			fmt.Printf("error: %+v\n", err.Error())
@@ -513,6 +513,20 @@ func main() {
 		}
 		fmt.Print(result.String())
 
+	case "endMaintenance":
+		fmt.Println("Setting hosts to ACTIVE")
+		if hostList == "" {
+			fmt.Println("No hosts specified to end maintenance on")
+			os.Exit(1)
+		}
+		hosts := strings.Split(hostList, ",")
+		_, result, err := r.EndMaintenance(hosts...)
+		if err != nil {
+			fmt.Printf("error: %+v\n", err.Error())
+			os.Exit(1)
+		}
+		fmt.Print(result.String())
+
 	default:
 		fmt.Println("Command not supported")
 		os.Exit(1)
diff --git a/realis.go b/realis.go
index 37fb65a..d9a4c4d 100644
--- a/realis.go
+++ b/realis.go
@@ -59,6 +59,7 @@ type Realis interface {
 
 	// Admin functions
 	DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error)
+	EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error)
 }
 
 type realisClient struct {
@@ -1164,7 +1165,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 
 	var resp *aurora.Response
 	var result *aurora.DrainHostsResult_
-	var clientErr, payloadErr error
+	var returnErr, clientErr, payloadErr error
 
 	if len(hosts) == 0 {
 		return nil, nil, errors.New("no hosts provided to drain")
@@ -1206,11 +1207,81 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		result = resp.GetResult_().GetDrainHostsResult_()
 	}
 
+	// Prioritize returning a bad payload error over a client error, as a bad payload error indicates
+	// a deeper issue
+	if payloadErr != nil {
+		returnErr = payloadErr
+	} else {
+		returnErr = clientErr
+	}
+
 	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
 	// this can become either a timeout error or a payload error
 	if retryErr != nil {
-		return resp, result, errors.Wrap(clientErr, "Unable to recover connection")
+		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
 	}
 
 	return resp, result, nil
 }
+
+func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
+
+	var resp *aurora.Response
+	var result *aurora.EndMaintenanceResult_
+	var returnErr, clientErr, payloadErr error
+
+	if len(hosts) == 0 {
+		return nil, nil, errors.New("no hosts provided to drain")
+	}
+
+	hostList := aurora.NewHosts()
+	hostList.HostNames = make(map[string]bool)
+	for _, host := range hosts {
+		hostList.HostNames[host] = true
+	}
+
+	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
+
+		// Send thrift call; if we have a thrift send error, attempt to reconnect
+		// and continue trying to resend the command
+		if resp, clientErr = r.adminClient.EndMaintenance(hostList); clientErr != nil {
+			// Experienced a connection error
+			err1 := r.ReestablishConn()
+			if err1 != nil {
+				fmt.Println("error in re-establishing connection: ", err1)
+			}
+			return false, nil
+		}
+
+		// If error is NOT due to connection
+		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
+			// TODO(rdelvalle): a leader election may cause the response to have
+			// failed when it should have succeeded. Retry everything for now until
+			// we figure out a more concrete fix.
+			return false, nil
+		}
+
+		// Successful call
+		return true, nil
+
+	})
+
+	if resp != nil && resp.GetResult_() != nil {
+		result = resp.GetResult_().GetEndMaintenanceResult_()
+	}
+
+	// Prioritize returning a bad payload error over a client error, as a bad payload error indicates
+	// a deeper issue
+	if payloadErr != nil {
+		returnErr = payloadErr
+	} else {
+		returnErr = clientErr
+	}
+
+	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
+	// this can become either a timeout error or a payload error
+	if retryErr != nil {
+		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
+	}
+
+	return resp, result, nil
+}
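
Note: the payload-error fix in this patch comes down to which error is wrapped
on the failure path. In sketch form (variable names as in DrainHosts above):

    // Before: clientErr was wrapped unconditionally, so an error detected in
    // the payload was silently dropped.
    return resp, result, errors.Wrap(clientErr, "Unable to recover connection")

    // After: prefer the payload error, since a bad payload indicates a deeper
    // issue than a transient connection failure.
    if payloadErr != nil {
        returnErr = payloadErr
    } else {
        returnErr = clientErr
    }
    return resp, result, errors.Wrap(returnErr, "Unable to recover connection")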
From c03e8bf79cdd40d88fe07c851ee2b4a4b56e4507 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 16:32:17 -0700
Subject: [PATCH 02/13] Added Maintenance status API

---
 realis.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 66 insertions(+), 1 deletion(-)

diff --git a/realis.go b/realis.go
index d9a4c4d..88a695e 100644
--- a/realis.go
+++ b/realis.go
@@ -60,6 +60,7 @@ type Realis interface {
 	// Admin functions
 	DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error)
 	EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error)
+	MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error)
 }
 
 type realisClient struct {
@@ -1231,7 +1232,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 	var returnErr, clientErr, payloadErr error
 
 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to drain")
+		return nil, nil, errors.New("no hosts provided to end maintenance on")
 	}
 
 	hostList := aurora.NewHosts()
@@ -1286,3 +1287,67 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 
 	return resp, result, nil
 }
+
+func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
+
+	var resp *aurora.Response
+	var result *aurora.MaintenanceStatusResult_
+	var returnErr, clientErr, payloadErr error
+
+	if len(hosts) == 0 {
+		return nil, nil, errors.New("no hosts provided to get maintenance status from")
+	}
+
+	hostList := aurora.NewHosts()
+	hostList.HostNames = make(map[string]bool)
+	for _, host := range hosts {
+		hostList.HostNames[host] = true
+	}
+
+	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
+
+		// Send thrift call; if we have a thrift send error, attempt to reconnect
+		// and continue trying to resend the command
+		if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil {
+			// Experienced a connection error
+			err1 := r.ReestablishConn()
+			if err1 != nil {
+				fmt.Println("error in re-establishing connection: ", err1)
+			}
+			return false, nil
+		}
+
+		// If error is NOT due to connection
+		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
+			// TODO(rdelvalle): a leader election may cause the response to have
+			// failed when it should have succeeded. Retry everything for now until
+			// we figure out a more concrete fix.
+			return false, nil
+		}
+
+		// Successful call
+		return true, nil
+
+	})
+
+	if resp != nil && resp.GetResult_() != nil {
+		result = resp.GetResult_().GetMaintenanceStatusResult_()
+	}
+
+	// Prioritize returning a bad payload error over a client error, as a bad payload error indicates
+	// a deeper issue
+	if payloadErr != nil {
+		returnErr = payloadErr
+	} else {
+		returnErr = clientErr
+	}
+
+	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
+	// this can become either a timeout error or a payload error
+	if retryErr != nil {
+		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
+	}
+
+	return resp, result, nil
+
+}
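
Note: a minimal use of the new status API looks like the following sketch
(host address illustrative; GetStatuses() is iterated the same way the monitor
added in the next patch iterates it):

    hosts := []string{"192.168.33.7"}
    _, result, err := r.MaintenanceStatus(hosts...)
    if err != nil {
        fmt.Printf("error: %+v\n", err.Error())
        os.Exit(1)
    }
    for status := range result.GetStatuses() {
        fmt.Printf("host %s is in mode %s\n", status.GetHost(), status.GetMode())
    }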
From dc6848f8046358fde7d1ec6359a72f7abd0c1861 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 16:35:24 -0700
Subject: [PATCH 03/13] Host Maintenance monitor which allows blocking until all hosts in a list have entered one of the states in a provided set

---
 monitors.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/monitors.go b/monitors.go
index caf98d0..69285c0 100644
--- a/monitors.go
+++ b/monitors.go
@@ -152,3 +152,53 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
 	fmt.Println("Timed out")
 	return false, nil
 }
+
+// Monitor host status until all hosts match the status provided
+func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (bool, error) {
+
+	// Transform modes into a lookup table
+	desiredMode := make(map[aurora.MaintenanceMode]struct{})
+	for _,mode := range modes {
+		desiredMode[mode] = struct{}{}
+	}
+	hostMode := make(map[string]bool)
+
+	// Initial map has all hosts we're looking for.
+	// For each node we find in the correct mode, eliminate it from the map. If we reach 0 elements in the map,
+	// we found all hosts we're monitoring. This avoids having to go through and check the list one by one each cycle.
+	for _,host := range hosts {
+		hostMode[host] = true
+	}
+
+	fmt.Println("mode map and hosts have the same number of elements: ", len(hostMode) == len(hosts))
+
+	for step := 0; step < steps; step++ {
+
+		// Client may have multiple retries; handle retries
+		_, result, err := m.Client.MaintenanceStatus(hosts...)
+		if err != nil {
+			// Error is either a payload error or a severe connection error
+			return false, errors.Wrap(err,"client error")
+		}
+
+		for stat := range result.GetStatuses() {
+			if _, ok := desiredMode[stat.GetMode()]; ok {
+				fmt.Printf("host %s\n", stat.GetHost())
+				fmt.Println(hostMode)
+				delete(hostMode, stat.GetHost())
+			}
+		}
+
+		if len(hostMode) == 0 {
+			fmt.Println("Provided hosts have all entered the desired state(s)")
+			return true, nil
+		} else {
+			fmt.Printf("%d host(s) not in desired state\n", len(hostMode))
+		}
+
+		time.Sleep(time.Duration(sleepTime) * time.Second)
+	}
+
+	return false, errors.New("Timed out")
+
+}
From 8334dde12fbc12878f7cf463f13c6ef421c59f6a Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 16:50:46 -0700
Subject: [PATCH 04/13] Sample client now blocks until all hosts have entered the desired state. Cleaned up host maintenance monitor.
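
The pattern the sample client now follows, in sketch form (host list and
polling parameters as used in the client below):

    hosts := strings.Split(hostList, ",")
    _, result, err := r.DrainHosts(hosts...)
    if err != nil {
        fmt.Printf("error: %+v\n", err.Error())
        os.Exit(1)
    }

    // Block until every host reports DRAINING or DRAINED, polling every
    // 5 seconds for at most 10 steps.
    _, err = monitor.HostMaintenance(
        hosts,
        []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
        5,
        10)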
---
 examples/client.go | 23 +++++++++++++++++++++++
 monitors.go        | 11 ++++-------
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index 86972b1..8e173ca 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -511,6 +511,18 @@ func main() {
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
+
+		// Monitor change to DRAINING and DRAINED mode
+		_, err = monitor.HostMaintenance(
+			hosts,
+			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
+			5,
+			10)
+		if err != nil {
+			fmt.Printf("error: %+v\n", err.Error())
+			os.Exit(1)
+		}
+
 		fmt.Print(result.String())
 
 	case "endMaintenance":
@@ -525,6 +537,17 @@ func main() {
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
+
+		// Monitor change back to ACTIVE (NONE) mode
+		_, err = monitor.HostMaintenance(
+			hosts,
+			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
+			5,
+			10)
+		if err != nil {
+			fmt.Printf("error: %+v\n", err.Error())
+			os.Exit(1)
+		}
 		fmt.Print(result.String())
 
 	default:
diff --git a/monitors.go b/monitors.go
index 69285c0..36caf2e 100644
--- a/monitors.go
+++ b/monitors.go
@@ -161,17 +161,15 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 	for _,mode := range modes {
 		desiredMode[mode] = struct{}{}
 	}
-	hostMode := make(map[string]bool)
 
 	// Initial map has all hosts we're looking for.
 	// For each node we find in the correct mode, eliminate it from the map. If we reach 0 elements in the map,
 	// we found all hosts we're monitoring. This avoids having to go through and check the list one by one each cycle.
+	hostMode := make(map[string]struct{})
 	for _,host := range hosts {
-		hostMode[host] = true
+		hostMode[host] = struct{}{}
 	}
 
-	fmt.Println("mode map and hosts have the same number of elements: ", len(hostMode) == len(hosts))
-
 	for step := 0; step < steps; step++ {
 
 		// Client may have multiple retries; handle retries
 		_, result, err := m.Client.MaintenanceStatus(hosts...)
@@ -183,14 +181,13 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 
 		for stat := range result.GetStatuses() {
 			if _, ok := desiredMode[stat.GetMode()]; ok {
-				fmt.Printf("host %s\n", stat.GetHost())
-				fmt.Println(hostMode)
+				fmt.Printf("host %s entered %s state\n", stat.GetHost(), stat.GetMode())
 				delete(hostMode, stat.GetHost())
 			}
 		}
 
 		if len(hostMode) == 0 {
-			fmt.Println("Provided hosts have all entered the desired state(s)")
+			fmt.Println("Provided hosts have all entered desired state(s)")
 			return true, nil
 		} else {
 			fmt.Printf("%d host(s) not in desired state\n", len(hostMode))
From bf354bcc0aab819415704472d24e9f39a47a68d1 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 17:14:55 -0700
Subject: [PATCH 05/13] Fixed bug with NewDefaultClientUsingUrl where backoff was not initialized, leading to a nil pointer error.

---
 realis.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/realis.go b/realis.go
index 88a695e..6dbd94d 100644
--- a/realis.go
+++ b/realis.go
@@ -348,7 +348,9 @@ func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) {
 	config.cluster = nil
 	// Configured for vagrant
 	AddBasicAuth(config, user, passwd)
+	config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}
 	r := newClient(config)
+
 	return r, nil
 }
 
From 7db2395df104ae9c3c285ac491c570847e7b9e42 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 17:36:41 -0700
Subject: [PATCH 06/13] Changed from the old style of creating clients to the new closure (functional options) pattern.
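
In sketch form, the move is from positional construction to option closures
(values as used in the updated TestMain):

    // Old style:
    r, err := realis.NewDefaultClientUsingUrl("http://192.168.33.7:8081", "aurora", "secret")

    // New closure/options style:
    r, err := realis.NewRealisClient(
        realis.SchedulerUrl("http://192.168.33.7:8081"),
        realis.BasicAuth("aurora", "secret"),
        realis.ThriftJSON(),
        realis.TimeoutMS(20000),
        realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}))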
---
 realis_e2e_test.go | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 465e085..c2caaee 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -12,11 +12,12 @@
  * limitations under the License.
  */
 
-package realis
+package realis_test
 
 import (
 	"fmt"
 	"github.com/rdelval/gorealis/gen-go/apache/aurora"
+	"github.com/rdelval/gorealis"
 	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"os"
@@ -24,15 +25,19 @@ import (
 	"time"
 )
 
-var r Realis
+var r realis.Realis
 var thermosPayload []byte
 
 func TestMain(m *testing.M) {
 	var err error
 
 	// New configuration to connect to Vagrant image
-	r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081","aurora", "secret")
-	if err != nil {
+	r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
+		realis.BasicAuth("aurora", "secret"),
+		realis.ThriftJSON(),
+		realis.TimeoutMS(20000),
+		realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}))
+	if err != nil {
 		fmt.Println("Please run vagrant box before running test suite")
 		os.Exit(1)
 	}
@@ -48,10 +53,10 @@ func TestMain(m *testing.M) {
 
 func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 
-	job := NewJob().
+	job := realis.NewJob().
 		Environment("prod").
 		Role("vagrant").
-		Name("create_job_test").
+		Name("create_thermos_job_test").
 		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
 		ExecutorData(string(thermosPayload)).
 		CPU(1).
@@ -95,7 +100,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		os.Exit(1)
 	}
 
-	job := NewJob().
+	job := realis.NewJob().
 		Environment("prod").
 		Role("vagrant").
 		Name("cronsched_job_test").
From 430764f02598d1e3270bc730b782c20f7578e2a1 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 28 Sep 2017 17:49:15 -0700
Subject: [PATCH 07/13] Added tests for draining. Run go test with an Aurora vagrant image running to exercise them.
---
 monitors.go        |  1 -
 realis_e2e_test.go | 55 +++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/monitors.go b/monitors.go
index 36caf2e..73a4e48 100644
--- a/monitors.go
+++ b/monitors.go
@@ -187,7 +187,6 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 		}
 
 		if len(hostMode) == 0 {
-			fmt.Println("Provided hosts have all entered desired state(s)")
 			return true, nil
 		} else {
 			fmt.Printf("%d host(s) not in desired state\n", len(hostMode))
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index c2caaee..89470e9 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -16,8 +16,8 @@ package realis_test
 
 import (
 	"fmt"
-	"github.com/rdelval/gorealis/gen-go/apache/aurora"
 	"github.com/rdelval/gorealis"
+	"github.com/rdelval/gorealis/gen-go/apache/aurora"
 	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"os"
@@ -26,6 +26,7 @@ import (
 )
 
 var r realis.Realis
+var monitor *realis.Monitor
 var thermosPayload []byte
 
 func TestMain(m *testing.M) {
@@ -37,11 +38,14 @@ func TestMain(m *testing.M) {
 		realis.ThriftJSON(),
 		realis.TimeoutMS(20000),
 		realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}))
-	if err != nil {
+	if err != nil {
 		fmt.Println("Please run vagrant box before running test suite")
 		os.Exit(1)
 	}
 
+	// Create monitor
+	monitor = &realis.Monitor{r}
+
 	thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json")
 	if err != nil {
 		fmt.Println("Error reading thermos payload file: ", err)
@@ -75,7 +79,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 	}
 	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-	fmt.Printf("Create call took %d ns\n", (end.UnixNano()- start.UnixNano()))
+	fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 
 	// Tasks must exist for it to be killed
 	t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
@@ -88,7 +92,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		}
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Kill call took %d ns\n", (end.UnixNano()- start.UnixNano()))
+		fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
 
@@ -132,7 +136,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 			os.Exit(1)
 		}
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano()))
+		fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 
 	t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) {
@@ -145,6 +149,45 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		}
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano()))
+		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
+func TestRealisClient_DrainHosts(t *testing.T) {
+	hosts := []string{"192.168.33.7"}
+	_, _ , err := r.DrainHosts(hosts...)
+	if err != nil {
+		fmt.Printf("error: %+v\n", err.Error())
+		os.Exit(1)
+	}
+
+	// Monitor change to DRAINING and DRAINED mode
+	_, err = monitor.HostMaintenance(
+		hosts,
+		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
+		5,
+		10)
+	if err != nil {
+		fmt.Printf("error: %+v\n", err.Error())
+		os.Exit(1)
+	}
+
+	t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
+		_, _ , err := r.EndMaintenance(hosts...)
+		if err != nil {
+			fmt.Printf("error: %+v\n", err.Error())
+			os.Exit(1)
+		}
+
+		// Monitor change back to ACTIVE (NONE) mode
+		_, err = monitor.HostMaintenance(
+			hosts,
+			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
+			5,
+			10)
+		if err != nil {
+			fmt.Printf("error: %+v\n", err.Error())
+			os.Exit(1)
+		}
+	})
+
+}
From 3111b358fcb3cd975f8eac66beb690a6f2f57a56 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Fri, 29 Sep 2017 18:21:30 -0700
Subject: [PATCH 08/13] Host Maintenance monitor now returns a list of hosts that did not enter the desired mode(s) instead of a boolean. This means the monitor can detect partial success.

---
 examples/client.go | 22 ++++++++++++++++++++--
 monitors.go        | 37 ++++++++++++++++++-------------------
 realis_e2e_test.go | 25 ++++++++++++++++++-------
 3 files changed, 56 insertions(+), 28 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index 8e173ca..b6f9df5 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -513,12 +513,21 @@ func main() {
 		}
 
 		// Monitor change to DRAINING and DRAINED mode
-		_, err = monitor.HostMaintenance(
+		nontransitioned, err := monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5,
 			10)
 		if err != nil {
+
+			// Check whether the call was partially successful
+			if len(nontransitioned) != 0 {
+				fmt.Println("Partial success:")
+				for host, _ := range nontransitioned {
+					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
+				}
+			}
+
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
@@ -539,15 +548,24 @@ func main() {
 		}
 
 		// Monitor change back to ACTIVE (NONE) mode
-		_, err = monitor.HostMaintenance(
+		nontransitioned, err := monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5,
 			10)
 		if err != nil {
+			// Check whether the call was partially successful
+			if len(nontransitioned) != 0 {
+				fmt.Println("Partial success:")
+				for host, _ := range nontransitioned {
+					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
+				}
+			}
+
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
 
 		fmt.Print(result.String())
 
 	default:
diff --git a/monitors.go b/monitors.go
index 73a4e48..6aab49b 100644
--- a/monitors.go
+++ b/monitors.go
@@ -153,48 +153,47 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
 	return false, nil
 }
 
-// Monitor host status until all hosts match the status provided
-func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (bool, error) {
+// Monitor host status until all hosts match the status provided. May return an error along with a non-nil map which contains
+// the hosts that did not transition to the desired mode(s).
+func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]struct{}, error) {
 
-	// Transform modes into a lookup table
+	// Transform modes to monitor for into a set for easy lookup
 	desiredMode := make(map[aurora.MaintenanceMode]struct{})
-	for _,mode := range modes {
+	for _, mode := range modes {
 		desiredMode[mode] = struct{}{}
 	}
 
-	// Initial map has all hosts we're looking for.
-	// For each node we find in the correct mode, eliminate it from the map. If we reach 0 elements in the map,
-	// we found all hosts we're monitoring. This avoids having to go through and check the list one by one each cycle.
-	hostMode := make(map[string]struct{})
-	for _,host := range hosts {
-		hostMode[host] = struct{}{}
+	// Turn slice into a host set to eliminate duplicates. Delete hosts that have entered the desired mode from
+	// the observed list. We are done when the number of observed hosts reaches zero.
+	// This avoids having to go through and check the list one by one each cycle.
+	observedHosts := make(map[string]struct{})
+	for _,host := range hosts {
+		observedHosts[host] = struct{}{}
 	}
 
 	for step := 0; step < steps; step++ {
 
 		// Client may have multiple retries; handle retries
 		_, result, err := m.Client.MaintenanceStatus(hosts...)
 		if err != nil {
 			// Error is either a payload error or a severe connection error
-			return false, errors.Wrap(err,"client error")
+			return observedHosts, errors.Wrap(err, "client error")
 		}
 
-		for stat := range result.GetStatuses() {
-			if _, ok := desiredMode[stat.GetMode()]; ok {
-				fmt.Printf("host %s entered %s state\n", stat.GetHost(), stat.GetMode())
-				delete(hostMode, stat.GetHost())
+		for status := range result.GetStatuses() {
+			if _, ok := desiredMode[status.GetMode()]; ok {
+				fmt.Printf("host %s entered %s state\n", status.GetHost(), status.GetMode())
+				delete(observedHosts, status.GetHost())
 			}
 		}
 
-		if len(hostMode) == 0 {
-			return true, nil
+		if len(observedHosts) == 0 {
+			return observedHosts, nil
 		} else {
-			fmt.Printf("%d host(s) not in desired state\n", len(hostMode))
+			fmt.Printf("%d host(s) not in desired state\n", len(observedHosts))
 		}
 
 		time.Sleep(time.Duration(sleepTime) * time.Second)
 	}
 
-	return false, errors.New("Timed out")
-
+	return observedHosts, errors.New("Timed out")
 }
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 89470e9..8c73b67 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -154,25 +154,36 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 }
 func TestRealisClient_DrainHosts(t *testing.T) {
 	hosts := []string{"192.168.33.7"}
-	_, _ , err := r.DrainHosts(hosts...)
+	_, _, err := r.DrainHosts(hosts...)
 	if err != nil {
 		fmt.Printf("error: %+v\n", err.Error())
 		os.Exit(1)
 	}
 
 	// Monitor change to DRAINING and DRAINED mode
-	_, err = monitor.HostMaintenance(
+	nontransitioned, err := monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		5,
 		10)
-	if err != nil {
-		fmt.Printf("error: %+v\n", err.Error())
-		os.Exit(1)
-	}
+	assert.Equal(t, nontransitioned, map[string]struct{}{})
+	assert.NoError(t, err)
+
+	t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
+		// Monitor change to DRAINING and DRAINED mode
+		nontransitioned, err := monitor.HostMaintenance(
+			append(hosts, "IMAGINARY_HOST"),
+			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
+			1,
+			1)
+
+		// Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts
+		assert.Error(t, err)
+		assert.Equal(t, nontransitioned, map[string]struct{}{"IMAGINARY_HOST": {}})
+	})
 
 	t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
-		_, _ , err := r.EndMaintenance(hosts...)
+		_, _, err := r.EndMaintenance(hosts...)
 		if err != nil {
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
From 922e8d6b5a78ee92a213fecfba506c4fcda1622e Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Mon, 2 Oct 2017 17:24:01 -0700
Subject: [PATCH 09/13] Changing HostMaintenance to return a map[string]bool where true indicates success and false indicates failure to transition to the desired state.
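
With this change a caller can distinguish full success, partial success, and
failure; in sketch form (hosts and modes as elsewhere in the series, mirroring
how the sample client consumes the map in the following patches):

    hostResults, err := monitor.HostMaintenance(hosts, modes, 5, 10)
    if err != nil {
        // Timed out or hit a client error; hostResults still carries the
        // per-host outcome.
        for host, ok := range hostResults {
            if !ok {
                fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
            }
        }
    }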
---
 monitors.go        | 42 ++++++++++++++++++++++--------------------
 realis_e2e_test.go | 39 +++++++++++----------------------------
 2 files changed, 33 insertions(+), 48 deletions(-)

diff --git a/monitors.go b/monitors.go
index 6aab49b..92ea117 100644
--- a/monitors.go
+++ b/monitors.go
@@ -153,22 +153,20 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
 	return false, nil
 }
 
-// Monitor host status until all hosts match the status provided. May return an error along with a non-nil map which contains
-// the hosts that did not transition to the desired mode(s).
-func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]struct{}, error) {
+// Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
+// is in one of the desired mode(s) or false if it is not.
+func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) {
 
 	// Transform modes to monitor for into a set for easy lookup
 	desiredMode := make(map[aurora.MaintenanceMode]struct{})
-	for _,mode := range modes {
+	for _, mode := range modes {
 		desiredMode[mode] = struct{}{}
 	}
 
-	// Turn slice into a host set to eliminate duplicates. Delete hosts that have entered the desired mode from
-	// the observed list. We are done when the number of observed hosts reaches zero.
-	// This avoids having to go through and check the list one by one each cycle.
-	observedHosts := make(map[string]struct{})
-	for _,host := range hosts {
-		observedHosts[host] = struct{}{}
+	// Turn slice into a host set to eliminate duplicates.
+	observedHosts := make(map[string]bool)
+	for _, host := range hosts {
+		observedHosts[host] = false
 	}
 
 	for step := 0; step < steps; step++ {
-		// Client may have multiple retries; handle retries
 		_, result, err := m.Client.MaintenanceStatus(hosts...)
 		if err != nil {
 			// Error is either a payload error or a severe connection error
-			return observedHosts, errors.Wrap(err,"client error")
+			return observedHosts, errors.Wrap(err, "client error")
 		}
 
-		for status := range result.GetStatuses() {
-			if _, ok := desiredMode[status.GetMode()]; ok {
-				fmt.Printf("host %s entered %s state\n", status.GetHost(), status.GetMode())
-				delete(observedHosts, status.GetHost())
-			}
-		}
+		for status := range result.GetStatuses() {
+			if _, ok := desiredMode[status.GetMode()]; ok {
+				observedHosts[status.GetHost()] = true
 
-		if len(observedHosts) == 0 {
-			return observedHosts, nil
-		} else {
-			fmt.Printf("%d host(s) not in desired state\n", len(observedHosts))
+				transitionedHosts := 0
+				for _, val := range observedHosts {
+					if val {
+						transitionedHosts++
+					}
+				}
+
+				if len(observedHosts) == transitionedHosts {
+					return observedHosts, nil
+				}
+			}
 		}
 
 		time.Sleep(time.Duration(sleepTime) * time.Second)
 	}
 
 	return observedHosts, errors.New("Timed out")
 }
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 8c73b67..c75f184 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -73,10 +73,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 	start := time.Now()
 	resp, err := r.CreateJob(job)
 	end := time.Now()
-	if err != nil {
-		fmt.Println(err)
-		os.Exit(1)
-	}
+	assert.NoError(t, err)
 	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 	fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano()))
@@ -86,10 +83,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		start := time.Now()
 		resp, err := r.KillJob(job.JobKey())
 		end := time.Now()
-		if err != nil {
-			fmt.Println(err)
-			os.Exit(1)
-		}
+		assert.NoError(t, err)
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 		fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
 
 func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 	thermosCronPayload, err := ioutil.ReadFile("examples/thermos_cron_payload.json")
-	if err != nil {
-		fmt.Println("Error reading thermos payload file: ", err)
-		os.Exit(1)
-	}
+	assert.NoError(t, err)
 
 	job := realis.NewJob().
 		Environment("prod").
@@ -131,10 +122,8 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		start := time.Now()
 		resp, err := r.StartCronJob(job.JobKey())
 		end := time.Now()
-		if err != nil {
-			fmt.Println(err)
-			os.Exit(1)
-		}
+
+		assert.NoError(t, err)
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 		fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
@@ -143,11 +132,8 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		start := time.Now()
 		resp, err := r.DescheduleCronJob(job.JobKey())
 		end := time.Now()
-		if err != nil {
-			fmt.Println(err)
-			os.Exit(1)
-		}
 
+		assert.NoError(t, err)
 		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
 func TestRealisClient_DrainHosts(t *testing.T) {
 	hosts := []string{"192.168.33.7"}
 	_, _, err := r.DrainHosts(hosts...)
 	if err != nil {
 		fmt.Printf("error: %+v\n", err.Error())
 		os.Exit(1)
 	}
 
 	// Monitor change to DRAINING and DRAINED mode
-	nontransitioned, err := monitor.HostMaintenance(
+	hostResults, err := monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		5,
 		10)
-	assert.Equal(t, nontransitioned, map[string]struct{}{})
+	assert.Equal(t, map[string]bool{"192.168.33.7": true}, hostResults)
 	assert.NoError(t, err)
 
 	t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
 		// Monitor change to DRAINING and DRAINED mode
-		nontransitioned, err := monitor.HostMaintenance(
+		hostResults, err := monitor.HostMaintenance(
 			append(hosts, "IMAGINARY_HOST"),
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			1,
 			1)
 
 		// Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts
 		assert.Error(t, err)
-		assert.Equal(t, nontransitioned, map[string]struct{}{"IMAGINARY_HOST": {}})
+		assert.Equal(t, map[string]bool{"192.168.33.7": true, "IMAGINARY_HOST": false}, hostResults)
 	})
 
 	t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
 		_, _, err := r.EndMaintenance(hosts...)
 		if err != nil {
 			fmt.Printf("error: %+v\n", err.Error())
 			os.Exit(1)
 		}
 
 		// Monitor change back to ACTIVE (NONE) mode
 		_, err = monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5,
 			10)
-		if err != nil {
-			fmt.Printf("error: %+v\n", err.Error())
-			os.Exit(1)
-		}
+		assert.NoError(t, err)
 	})
 
 }
From fa7833a74969c60f223da1c07f71fd011ff81a96 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 4 Oct 2017 14:34:47 -0700
Subject: [PATCH 10/13] Updating client to reflect changes made on the Monitor's side

---
 examples/client.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index b6f9df5..5719178 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -548,16 +548,15 @@ func main() {
 		}
 
 		// Monitor change back to ACTIVE (NONE) mode
-		nontransitioned, err := monitor.HostMaintenance(
+		hostsResult, err := monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5,
 			10)
 		if err != nil {
-			// Check whether the call was partially successful
-			if len(nontransitioned) != 0 {
-				fmt.Println("Partial success:")
-				for host, _ := range nontransitioned {
+			fmt.Println("Partial success:")
+			for host, ok := range hostsResult {
+				if !ok {
 					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
 				}
 			}
From 1fd07b5007ee7f73233b049a68a9ad59ac5e987e Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 4 Oct 2017 15:56:59 -0700
Subject: [PATCH 11/13] Avoided going through the entire list of monitored hosts by keeping a set of hosts that have not yet transitioned to a desired mode.
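
The bookkeeping in sketch form: remainingHosts starts as the full host set and
shrinks as hosts reach a desired mode, so each poll only tracks stragglers and
the monitor can return as soon as the set is empty:

    remainingHosts := make(map[string]struct{})
    for _, host := range hosts {
        remainingHosts[host] = struct{}{}
    }

    hostResult := make(map[string]bool)
    // Inside the polling loop, for each reported status:
    if _, ok := desiredMode[status.GetMode()]; ok {
        hostResult[status.GetHost()] = true
        delete(remainingHosts, status.GetHost())
        if len(remainingHosts) == 0 {
            return hostResult, nil // every host transitioned
        }
    }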
---
 monitors.go | 43 ++++++++++++++++++++++++-------------------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/monitors.go b/monitors.go
index 92ea117..ffa524e 100644
--- a/monitors.go
+++ b/monitors.go
@@ -154,7 +154,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
 }
 
 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
-// is in one of the desired mode(s) or false if it is not.
+// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
 func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) {
 
 	// Transform modes to monitor for into a set for easy lookup
@@ -164,38 +164,47 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 	}
 
 	// Turn slice into a host set to eliminate duplicates.
-	observedHosts := make(map[string]bool)
+	// We also can't use a simple count because multiple modes mean we can have multiple matches for a single host.
+	// I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored
+	remainingHosts := make(map[string]struct{})
 	for _, host := range hosts {
-		observedHosts[host] = false
+		remainingHosts[host] = struct{}{}
 	}
 
+	hostResult := make(map[string]bool)
+
 	for step := 0; step < steps; step++ {
 
+		if step != 0 {
+			time.Sleep(time.Duration(sleepTime) * time.Second)
+		}
+
+		// Client call has multiple retries internally
 		_, result, err := m.Client.MaintenanceStatus(hosts...)
 		if err != nil {
 			// Error is either a payload error or a severe connection error
-			return observedHosts, errors.Wrap(err, "client error")
+			for host := range remainingHosts {
+				hostResult[host] = false
+			}
+			return hostResult, errors.Wrap(err, "client error in monitor")
 		}
 
 		for status := range result.GetStatuses() {
+
 			if _, ok := desiredMode[status.GetMode()]; ok {
-				observedHosts[status.GetHost()] = true
+				hostResult[status.GetHost()] = true
+				delete(remainingHosts, status.GetHost())
 
-				transitionedHosts := 0
-				for _, val := range observedHosts {
-					if val {
-						transitionedHosts++
-					}
-				}
-
-				if len(observedHosts) == transitionedHosts {
-					return observedHosts, nil
+				if len(remainingHosts) == 0 {
+					return hostResult, nil
 				}
 			}
 		}
-		time.Sleep(time.Duration(sleepTime) * time.Second)
 	}
 
-	return observedHosts, errors.New("Timed out")
+	for host := range remainingHosts {
+		hostResult[host] = false
+	}
+
+	return hostResult, errors.New("Timed out")
 }
From bd008dbb39456f8bb2ae66001784d9a3922a9e5b Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 4 Oct 2017 17:09:55 -0700
Subject: [PATCH 12/13] Changing client to reflect monitor changes

---
 examples/client.go | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index 5719178..69ab478 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -513,17 +513,15 @@ func main() {
 		}
 
 		// Monitor change to DRAINING and DRAINED mode
-		nontransitioned, err := monitor.HostMaintenance(
+		hostResult, err := monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			5,
 			10)
 		if err != nil {
-
-			// Check whether the call was partially successful
-			if len(nontransitioned) != 0 {
-				fmt.Println("Partial success:")
-				for host, _ := range nontransitioned {
+			fmt.Println("Partial success:")
+			for host, ok := range hostResult {
+				if !ok {
 					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
 				}
 			}
@@ -548,14 +546,14 @@ func main() {
 		}
 
 		// Monitor change back to ACTIVE (NONE) mode
-		hostsResult, err := monitor.HostMaintenance(
+		hostResult, err := monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
 			5,
 			10)
 		if err != nil {
 			fmt.Println("Partial success:")
-			for host, ok := range hostsResult {
+			for host, ok := range hostResult {
 				if !ok {
 					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
From 65398fdfd64f04defb2b2ee252254e6ade93c1c9 Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 4 Oct 2017 17:40:33 -0700
Subject: [PATCH 13/13] Removed print statement as it makes no sense after the monitor change

---
 examples/client.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index 69ab478..e33e4fc 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -519,7 +519,6 @@ func main() {
 			5,
 			10)
 		if err != nil {
-			fmt.Println("Partial success:")
 			for host, ok := range hostResult {
 				if !ok {
 					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
@@ -552,7 +551,6 @@ func main() {
 			5,
 			10)
 		if err != nil {
-			fmt.Println("Partial success:")
 			for host, ok := range hostResult {
 				if !ok {
 					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)