diff --git a/examples/client.go b/examples/client.go index 5d7a01f..e33e4fc 100644 --- a/examples/client.go +++ b/examples/client.go @@ -28,7 +28,7 @@ import ( "strings" ) -var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, drainCandidates string +var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList string var CONNECTION_TIMEOUT = 20000 @@ -42,7 +42,7 @@ func init() { flag.StringVar(&username, "username", "aurora", "Username to use for authorization") flag.StringVar(&password, "password", "secret", "Password to use for authorization") flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url") - flag.StringVar(&drainCandidates, "drainCandidates", "", "Comma separated list of candidate hosts to drain") + flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on") flag.Parse() } @@ -501,16 +501,66 @@ func main() { case "drainHosts": fmt.Println("Setting hosts to DRAINING") - if drainCandidates == "" { + if hostList == "" { fmt.Println("No hosts specified to drain") os.Exit(1) } - hosts := strings.Split(drainCandidates, ",") + hosts := strings.Split(hostList, ",") _, result, err := r.DrainHosts(hosts...) if err != nil { fmt.Printf("error: %+v\n", err.Error()) os.Exit(1) } + + // Monitor change to DRAINING and DRAINED mode + hostResult, err := monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 5, + 10) + if err != nil { + for host, ok := range hostResult { + if !ok { + fmt.Printf("Host %s did not transtion into desired mode(s)\n", host) + } + } + + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + + fmt.Print(result.String()) + + case "endMaintenance": + fmt.Println("Setting hosts to ACTIVE") + if hostList == "" { + fmt.Println("No hosts specified to drain") + os.Exit(1) + } + hosts := strings.Split(hostList, ",") + _, result, err := r.EndMaintenance(hosts...) 
+ if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + + // Monitor change back to ACTIVE (NONE) mode + hostResult, err := monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5, + 10) + if err != nil { + for host, ok := range hostResult { + if !ok { + fmt.Printf("Host %s did not transition into desired mode(s)\n", host) + } + } + + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + fmt.Print(result.String()) default: diff --git a/monitors.go b/monitors.go index caf98d0..ffa524e 100644 --- a/monitors.go +++ b/monitors.go @@ -152,3 +152,59 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t fmt.Println("Timed out") return false, nil } + +// Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host +// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited. +func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) { + + // Transform modes to monitor for into a set for easy lookup + desiredMode := make(map[aurora.MaintenanceMode]struct{}) + for _, mode := range modes { + desiredMode[mode] = struct{}{} + } + + // Turn slice into a host set to eliminate duplicates. + // We also can't use a simple count because multiple modes means we can have multiple matches for a single host. + // I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored + remainingHosts := make(map[string]struct{}) + for _, host := range hosts { + remainingHosts[host] = struct{}{} + } + + hostResult := make(map[string]bool) + + for step := 0; step < steps; step++ { + if step != 0 { + time.Sleep(time.Duration(sleepTime) * time.Second) + } + + // Client call has multiple retries internally + _, result, err := m.Client.MaintenanceStatus(hosts...) 
+ if err != nil { + // Error is either a payload error or a severe connection error + for host := range remainingHosts { + hostResult[host] = false + } + return hostResult, errors.Wrap(err, "client error in monitor") + } + + for status := range result.GetStatuses() { + + if _, ok := desiredMode[status.GetMode()]; ok { + hostResult[status.GetHost()] = true + delete(remainingHosts, status.GetHost()) + + if len(remainingHosts) == 0 { + return hostResult, nil + } + } + } + + } + + for host := range remainingHosts { + hostResult[host] = false + } + + return hostResult, errors.New("Timed out") +} diff --git a/realis.go b/realis.go index 37fb65a..6dbd94d 100644 --- a/realis.go +++ b/realis.go @@ -59,6 +59,8 @@ type Realis interface { // Admin functions DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) + EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) + MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) } type realisClient struct { @@ -346,7 +348,9 @@ func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) { config.cluster = nil // Configured for vagrant AddBasicAuth(config, user, passwd) + config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1} r := newClient(config) + return r, nil } @@ -1164,7 +1168,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr var resp *aurora.Response var result *aurora.DrainHostsResult_ - var clientErr, payloadErr error + var returnErr, clientErr, payloadErr error if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to drain") @@ -1206,12 +1210,146 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr result = resp.GetResult_().GetDrainHostsResult_() } + // Prioritize returning a bad payload error over a client error as a bad payload error indicates + // a deeper issue + if payloadErr != nil { + 
returnErr = payloadErr + } else { + returnErr = clientErr + } // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload, // this will can become either a timeout error or a payload error if retryErr != nil { - return resp, result, errors.Wrap(clientErr, "Unable to recover connection") + return resp, result, errors.Wrap(returnErr, "Unable to recover connection") } return resp, result, nil } + +func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) { + + var resp *aurora.Response + var result *aurora.EndMaintenanceResult_ + var returnErr, clientErr, payloadErr error + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to end maintenance on") + } + + hostList := aurora.NewHosts() + hostList.HostNames = make(map[string]bool) + for _, host := range hosts { + hostList.HostNames[host] = true + } + + retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) { + + // Send thrift call, if we have a thrift send error, attempt to reconnect + // and continue trying to resend command + if resp, clientErr = r.adminClient.EndMaintenance(hostList); clientErr != nil { + // Experienced an connection error + err1 := r.ReestablishConn() + if err1 != nil { + fmt.Println("error in re-establishing connection: ", err1) + } + return false, nil + } + + // If error is NOT due to connection + if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil { + // TODO(rdelvalle): an leader election may cause the response to have + // failed when it should have succeeded. Retry everything for now until + // we figure out a more concrete fix. 
+ return false, nil + } + + // Successful call + return true, nil + + }) + + if resp != nil && resp.GetResult_() != nil { + result = resp.GetResult_().GetEndMaintenanceResult_() + } + + // Prioritize returning a bad payload error over a client error as a bad payload error indicates + // a deeper issue + if payloadErr != nil { + returnErr = payloadErr + } else { + returnErr = clientErr + } + + // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload, + // this will can become either a timeout error or a payload error + if retryErr != nil { + return resp, result, errors.Wrap(returnErr, "Unable to recover connection") + } + + return resp, result, nil +} + +func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) { + + var resp *aurora.Response + var result *aurora.MaintenanceStatusResult_ + var returnErr, clientErr, payloadErr error + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to get maintenance status from") + } + + hostList := aurora.NewHosts() + hostList.HostNames = make(map[string]bool) + for _, host := range hosts { + hostList.HostNames[host] = true + } + + retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) { + + // Send thrift call, if we have a thrift send error, attempt to reconnect + // and continue trying to resend command + if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil { + // Experienced an connection error + err1 := r.ReestablishConn() + if err1 != nil { + fmt.Println("error in re-establishing connection: ", err1) + } + return false, nil + } + + // If error is NOT due to connection + if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil { + // TODO(rdelvalle): an leader election may cause the response to have + // failed when it should have succeeded. Retry everything for now until + // we figure out a more concrete fix. 
+ return false, nil + } + + // Successful call + return true, nil + + }) + + if resp != nil && resp.GetResult_() != nil { + result = resp.GetResult_().GetMaintenanceStatusResult_() + } + + // Prioritize returning a bad payload error over a client error as a bad payload error indicates + // a deeper issue + if payloadErr != nil { + returnErr = payloadErr + } else { + returnErr = clientErr + } + + // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload, + // this will can become either a timeout error or a payload error + if retryErr != nil { + return resp, result, errors.Wrap(returnErr, "Unable to recover connection") + } + + return resp, result, nil + +} diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 465e085..c75f184 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -12,10 +12,11 @@ * limitations under the License. */ -package realis +package realis_test import ( "fmt" + "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/stretchr/testify/assert" "io/ioutil" @@ -24,19 +25,27 @@ import ( "time" ) -var r Realis +var r realis.Realis +var monitor *realis.Monitor var thermosPayload []byte func TestMain(m *testing.M) { var err error // New configuration to connect to Vagrant image - r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081","aurora", "secret") + r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), + realis.BasicAuth("aurora", "secret"), + realis.ThriftJSON(), + realis.TimeoutMS(20000), + realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1})) if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) } + // Create monitor + monitor = &realis.Monitor{r} + thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json") if err != nil { fmt.Println("Error reading thermos payload file: ", err) @@ -48,10 +57,10 @@ func TestMain(m *testing.M) { func 
TestRealisClient_CreateJob_Thermos(t *testing.T) { - job := NewJob(). + job := realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("create_job_test"). + Name("create_thermos_job_test"). ExecutorName(aurora.AURORA_EXECUTOR_NAME). ExecutorData(string(thermosPayload)). CPU(1). @@ -64,38 +73,29 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { start := time.Now() resp, err := r.CreateJob(job) end := time.Now() - if err != nil { - fmt.Println(err) - os.Exit(1) - } + assert.NoError(t, err) assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - fmt.Printf("Create call took %d ns\n", (end.UnixNano()- start.UnixNano())) + fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano())) // Tasks must exist for it to be killed t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { start := time.Now() resp, err := r.KillJob(job.JobKey()) end := time.Now() - if err != nil { - fmt.Println(err) - os.Exit(1) - } + assert.NoError(t, err) assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - fmt.Printf("Kill call took %d ns\n", (end.UnixNano()- start.UnixNano())) + fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano())) }) } func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { thermosCronPayload, err := ioutil.ReadFile("examples/thermos_cron_payload.json") - if err != nil { - fmt.Println("Error reading thermos payload file: ", err) - os.Exit(1) - } + assert.NoError(t, err) - job := NewJob(). + job := realis.NewJob(). Environment("prod"). Role("vagrant"). Name("cronsched_job_test"). 
@@ -122,24 +122,66 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { start := time.Now() resp, err := r.StartCronJob(job.JobKey()) end := time.Now() - if err != nil { - fmt.Println(err) - os.Exit(1) - } + + assert.NoError(t, err) assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano())) + fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano())) }) t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) { start := time.Now() resp, err := r.DescheduleCronJob(job.JobKey()) end := time.Now() + + assert.NoError(t, err) + assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) + fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano())) + }) +} +func TestRealisClient_DrainHosts(t *testing.T) { + hosts := []string{"192.168.33.7"} + _, _, err := r.DrainHosts(hosts...) + if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + + // Monitor change to DRAINING and DRAINED mode + hostResults, err := monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 5, + 10) + assert.Equal(t, map[string]bool{"192.168.33.7": true}, hostResults) + assert.NoError(t, err) + + t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) { + // Monitor change to DRAINING and DRAINED mode + hostResults, err := monitor.HostMaintenance( + append(hosts, "IMAGINARY_HOST"), + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1, + 1) + + // Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts + assert.Error(t, err) + assert.Equal(t, map[string]bool{"192.168.33.7": true, "IMAGINARY_HOST": false}, hostResults) + }) + + t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) { + _, _, err := r.EndMaintenance(hosts...) 
if err != nil { - fmt.Println(err) + fmt.Printf("error: %+v\n", err.Error()) os.Exit(1) } - assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano())) + // Monitor change back to ACTIVE (NONE) mode + _, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5, + 10) + assert.NoError(t, err) }) + }