From d2fd7b9ba9061fb601c0352e8a7feede090098bd Mon Sep 17 00:00:00 2001 From: lawwong1 <105893156+lawwong1@users.noreply.github.com> Date: Thu, 26 Jan 2023 13:36:40 -0800 Subject: [PATCH] merge retry mechanism change from gorealis v1 to gorealis v2 (#21) --- .github/workflows/main.yml | 2 +- go.mod | 2 +- go.sum | 4 + helpers.go | 23 ++++ realis.go | 236 +++++++++++++++++++++++++++---------- realis_admin.go | 98 ++++++++------- realis_e2e_test.go | 68 ++++++++++- response/response.go | 4 + retry.go | 216 +++++++++++++++++++-------------- util.go | 2 +- 10 files changed, 449 insertions(+), 206 deletions(-) create mode 100644 helpers.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 485b6e4..eb459c8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - uses: actions/checkout@v2 diff --git a/go.mod b/go.mod index 3b3095f..c9c3dee 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/apache/thrift v0.14.0 github.com/pkg/errors v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.5.0 + github.com/stretchr/testify v1.7.0 ) go 1.16 diff --git a/go.sum b/go.sum index 9324c97..d65a779 100644 --- a/go.sum +++ b/go.sum @@ -12,7 +12,11 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw= github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..f1983db --- /dev/null +++ b/helpers.go @@ -0,0 +1,23 @@ +package realis + +import ( + "context" + + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" +) + +func (r *Client) JobExists(key aurora.JobKey) (bool, error) { + resp, err := r.client.GetConfigSummary(context.TODO(), &key) + if err != nil { + return false, err + } + + return resp != nil && + resp.GetResult_() != nil && + resp.GetResult_().GetConfigSummaryResult_() != nil && + resp.GetResult_().GetConfigSummaryResult_().GetSummary() != nil && + resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() != nil && + len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) > 0 && + resp.GetResponseCode() == aurora.ResponseCode_OK, + nil +} diff --git a/realis.go b/realis.go index 1219e92..c331918 100644 --- a/realis.go +++ b/realis.go @@ -315,11 +315,13 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu Statuses: states, } - c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) + c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }) + }, + nil, + ) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -341,8 +343,13 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (* resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }) + }, + nil, + ) + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil, errors.New("unexpected response from scheduler") + } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } @@ -354,8 +361,12 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobSummary(context.TODO(), role) - }) - + }, + nil, + ) + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil { + return nil, errors.New("unexpected response from scheduler") + } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler") } @@ -365,21 +376,20 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { - var result *aurora.GetJobsResult_ - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobs(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { - return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp.GetResult_() != nil { - result = resp.GetResult_().GetJobsResult_ - } - - return result, nil + return resp.GetResult_().GetJobsResult_, nil } // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. @@ -389,19 +399,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.KillTasks(context.TODO(), &key, instances, "") - }) + }, + nil, + ) if retryErr != nil { return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } - if len(resp.GetDetails()) > 0 { + if resp == nil || len(resp.GetDetails()) > 0 { c.logger.Println("KillTasks was called but no tasks killed as a result.") return false, nil - } else { - return true, nil } - + return true, nil } func (c *Client) RealisConfig() *clientConfig { @@ -416,7 +426,9 @@ func (c *Client) KillJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return c.client.KillTasks(context.TODO(), &key, nil, "") - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -438,9 +450,27 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { return errors.Wrap(err, "unable to create Thermos payload") } - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }) + // Response is checked by the thrift retry code + _, retryErr := c.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) + }, + // On a client timeout, attempt to verify that payload made to the Scheduler by + // trying to get the config summary for the job key + func() (*aurora.Response, bool) { + exists, err := c.JobExists(auroraJob.JobKey()) + if err != nil { + c.logger.Print("verification failed ", err) + } + + if exists { + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true + } + + return nil, false + }, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") @@ -471,7 +501,9 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler") @@ -485,7 +517,9 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.DescheduleCronJob(context.TODO(), &key) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler") @@ -501,7 +535,9 @@ func (c *Client) StartCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.StartCronJob(context.TODO(), &key) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Start Cron AuroraJob message to Aurora Scheduler") @@ -516,7 +552,9 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instances) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -537,16 +575,17 @@ func (c *Client) RestartJob(key aurora.JobKey) error { if len(instanceIds) > 0 { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instanceIds) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return nil - } else { - return errors.New("no tasks in the Active state") } + return errors.New("no tasks in the Active state") } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. @@ -558,34 +597,80 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.client.StartJobUpdate(nil, updateJob.request, message) - }) + resp, retryErr := c.thriftCallWithRetries(false, + func() (*aurora.Response, error) { + return c.client.StartJobUpdate(context.TODO(), updateJob.request, message) + }, + func() (*aurora.Response, bool) { + key := updateJob.JobKey() + summariesResp, err := c.readonlyClient.GetJobUpdateSummaries( + context.TODO(), + &aurora.JobUpdateQuery{ + JobKey: &key, + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1, + }) + + if err != nil { + c.logger.Print("verification failed ", err) + return nil, false + } + + summaries := response.JobUpdateSummaries(summariesResp) + if len(summaries) == 0 { + return nil, false + } + + return &aurora.Response{ + ResponseCode: aurora.ResponseCode_OK, + Result_: &aurora.Result_{ + StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ + UpdateSummary: summaries[0], + Key: summaries[0].Key, + }, + }, + }, true + }, + ) if retryErr != nil { + // A timeout took place when attempting this call, attempt to recover + if IsTimeout(retryErr) { + return nil, retryErr + } + return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - - if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { - return resp.GetResult_().GetStartJobUpdateResult_(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + return resp.GetResult_().GetStartJobUpdateResult_(), nil } -// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. +// AbortJobUpdate terminates a job update in the scheduler. +// It requires the updateId which can be obtained on the Aurora web UI. +// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. +// However, if the job update does not transition to the ABORT state an error will be returned. func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error { c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") } - return nil + // Make this call synchronous by blocking until it job has successfully transitioned to aborted + _, err := c.MonitorJobUpdateStatus( + updateKey, + []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED}, + time.Second*5, + time.Minute) + return err } // Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. @@ -605,7 +690,9 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PauseJobUpdate(nil, updateKeyLocal, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -632,7 +719,9 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -653,18 +742,19 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PulseJobUpdate(context.TODO(), &updateKey) - }) + }, + nil, + ) if retryErr != nil { return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } - if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil { - return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil - } else { - return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil { + return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler") } + return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -681,7 +771,9 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AddInstances(context.TODO(), &instKey, count) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -726,11 +818,16 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } + if resp == nil { + return nil, errors.New("unexpected response from scheduler") + } return response.ScheduleStatusResult(resp).GetTasks(), nil } @@ -742,29 +839,32 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetPendingReason(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } - var result []*aurora.PendingReason - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return result, nil + return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil } -// Get information about task including without a task configuration object +// GetTasksWithoutConfigs gets information about task including without a task configuration object. +// This is a more lightweight version of GetTaskStatus but contains less information as a result. func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -791,7 +891,9 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), taskQ) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -817,17 +919,19 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") } - if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil { - return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil - } else { - return nil, errors.New("unknown Thrift error, field is nil.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil } func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error { @@ -836,7 +940,9 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RollbackJobUpdate(context.TODO(), &key, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index f2759ea..f100f46 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -37,17 +37,19 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.DrainHosts(context.TODO(), drainList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } // Start SLA Aware Drain. @@ -78,17 +80,19 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -104,17 +108,19 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.StartMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil } func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -130,24 +136,20 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.EndMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - - if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil } func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { - - var result *aurora.MaintenanceStatusResult_ - if len(hosts) == 0 { return nil, errors.New("no hosts provided to get maintenance status from") } @@ -161,17 +163,18 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe // and continue trying to resend command until we run out of retries. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.MaintenanceStatus(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { - return result, errors.Wrap(retryErr, "unable to recover connection") + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp.GetResult_() != nil { - result = resp.GetResult_().GetMaintenanceStatusResult_() - } - - return result, nil + return resp.GetResult_().GetMaintenanceStatusResult_(), nil } // SetQuota sets a quota aggregate for the given role @@ -189,7 +192,9 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64 _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SetQuota(context.TODO(), role, quota) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to set role quota") @@ -203,17 +208,18 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.GetQuota(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get role quota") } - if resp.GetResult_() != nil { - return resp.GetResult_().GetGetQuotaResult_(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + return resp.GetResult_().GetGetQuotaResult_(), nil } // Force Aurora Scheduler to perform a snapshot and write to Mesos log @@ -221,7 +227,9 @@ func (c *Client) Snapshot() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.Snapshot(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -235,7 +243,9 @@ func (c *Client) PerformBackup() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.PerformBackup(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -249,7 +259,9 @@ func (c *Client) ForceImplicitTaskReconciliation() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -270,7 +282,9 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 1f544d8..5893491 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -477,13 +477,15 @@ func TestRealisClient_CreateService(t *testing.T) { var ok bool var mErr error - if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { - // Update may already be in a terminal state so don't check for error - err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") + if result != nil { + if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { + // Update may already be in a terminal state so don't check for error + err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") - err = r.KillJob(job.JobKey()) + err = r.KillJob(job.JobKey()) - assert.NoError(t, err) + assert.NoError(t, err) + } } assert.True(t, ok) @@ -1445,3 +1447,59 @@ func TestRealisClient_FitTasks(t *testing.T) { } } } + +func TestRealisClient_JobExists(t *testing.T) { + role := "vagrant" + env := "prod" + name := "test_job_exists" + // Create a good single job + job := realis.NewJob(). + Environment(env). + Role(role). + Name(name). + ThermosExecutor(thermosExec). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(3). + IsService(true). + Production(false). + Tier("preemptible"). + Priority(0) + + // Check if job exists before creating + exists, err := r.JobExists(job.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) + + err = r.CreateJob(job) + assert.NoError(t, err) + + exists, err = r.JobExists(job.JobKey()) + assert.NoError(t, err) + assert.True(t, exists) + + // Create a single bad job + badJob := realis.NewJob(). + Environment("prod"). + Role("vagrant"). + Name("executordoesntexist"). + ExecutorName("idontexist"). + ExecutorData(""). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(1) + + // Check if job exists before creating + exists, err = r.JobExists(badJob.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) + + err = r.CreateJob(badJob) + assert.Error(t, err) + + exists, err = r.JobExists(badJob.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) +} diff --git a/response/response.go b/response/response.go index 1663b1b..15081ec 100644 --- a/response/response.go +++ b/response/response.go @@ -35,6 +35,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil + } + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index 9860e42..f5b0918 100644 --- a/retry.go +++ b/retry.go @@ -17,10 +17,7 @@ package realis import ( "io" "math/rand" - "net/http" "net/url" - "strconv" - "strings" "time" "github.com/apache/thrift/lib/go/thrift" @@ -29,9 +26,11 @@ import ( "github.com/pkg/errors" ) +// Backoff determines how the retry mechanism should react after each failure and how many failures it should +// tolerate. type Backoff struct { Duration time.Duration // the base duration - Factor float64 // Duration is multipled by factor each iteration + Factor float64 // Duration is multiplied by a factor each iteration Jitter float64 // The amount of jitter applied each iteration Steps int // Exit with error after this many steps } @@ -53,18 +52,15 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// Modified version of the Kubernetes exponential-backoff code. -// ExponentialBackoff repeats a condition check with exponential backoff. -// -// It checks the condition up to Steps times, increasing the wait by multiplying -// the previous duration by Factor. +// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code. +// It repeats a condition check with exponential backoff and checks the condition up to +// Steps times, increasing the wait by multiplying the previous duration by Factor. // // If Jitter is greater than zero, a random amount of each duration is added // (between duration and duration*(1+jitter)). // // If the condition never returns true, ErrWaitTimeout is returned. Errors // do not cause the function to return. - func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error { var err error var ok bool @@ -98,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // If the error is temporary, continue retrying. if !IsTemporary(err) { return err - } else { - // Print out the temporary error we experienced. - logger.Println(err) } + // Print out the temporary error we experienced. + logger.Println(err) } } @@ -112,19 +107,28 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // Provide more information to the user wherever possible if err != nil { return newRetryError(errors.Wrap(err, "ran out of retries"), curStep) - } else { - return newRetryError(errors.New("ran out of retries"), curStep) } + + return newRetryError(errors.New("ran out of retries"), curStep) } type auroraThriftCall func() (resp *aurora.Response, err error) +// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler +// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler +// through the very same Thrift API which previously encountered a time-out from the client. +// This means that the functions themselves should be kept to a minimum number of Thrift calls. +// It should also be noted that this is a best effort mechanism and +// is likely to fail for the same reasons that the original call failed. +type verifyOnTimeout func() (*aurora.Response, bool) + // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. -func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { +func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall, + verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int - var timeouts int + timeouts := 0 backoff := c.config.backoff duration := backoff.Duration @@ -138,7 +142,10 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh adjusted = Jitter(duration, backoff.Jitter) } - c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) + c.logger.Printf( + "A retryable error occurred during thrift call, backing off for %v before retry %v", + adjusted, + curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -153,105 +160,132 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh resp, clientErr = thriftCall() - c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) + c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. if clientErr != nil { - // Print out the error to the user - c.logger.Printf("Client Error: %v\n", clientErr) + c.logger.Printf("Client Error: %v", clientErr) - // Determine if error is a temporary URL error by going up the stack - e, ok := clientErr.(thrift.TTransportException) - if ok { - c.logger.DebugPrint("Encountered a transport exception") + temporary, timedout := isConnectionError(clientErr) + if !temporary && c.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } - // TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution - // 401 Unauthorized means the wrong username and password were provided - if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) { - return nil, errors.Wrap(clientErr, "wrong username or password provided") - } - - e, ok := e.Err().(*url.Error) - if ok { - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and should be retried. All other errors that are permanent - // will not be retried. - if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } - // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was - // able to reply. In this case we will return whatever response was received and a TimedOut behaving - // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. - if e.Timeout() { - timeouts++ - c.logger.DebugPrintf( - "Client closed connection (timedout) %d times before server responded,"+ - " consider increasing connection timeout", - timeouts) - if returnOnTimeout { - return resp, - newTimedoutError(errors.New("client connection closed before server answer")) - } - } - } + // There exists a corner case where thrift payload was received by Aurora but + // connection timed out before Aurora was able to reply. + // Users can take special action on a timeout by using IsTimedout and reacting accordingly + // if they have configured the client to return on a timeout. + if timedout && returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) } // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. // Ignoring error due to the fact that an error should be retried regardless - _ = c.ReestablishConn() - - } else { - - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") + reestablishErr := c.ReestablishConn() + if reestablishErr != nil { + c.logger.DebugPrintf("error re-establishing connection ", reestablishErr) } - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // If users did not opt for a return on timeout in order to react to a timedout error, + // attempt to verify that the call made it to the scheduler after the connection was re-established. + if timedout { + timeouts++ + c.logger.DebugPrintf( + "Client closed connection %d times before server responded, "+ + "consider increasing connection timeout", + timeouts) - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil - - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - c.logger.Println("Aurora replied with Transient error code, retrying") - continue - - // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) - - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // Allow caller to provide a function which checks if the original call was successful before + // it timed out. + if verifyOnTimeout != nil { + if verifyResp, ok := verifyOnTimeout(); ok { + c.logger.Print("verified that the call went through successfully after a client timeout") + // Response here might be different than the original as it is no longer constructed + // by the scheduler but mimicked. + // This is OK since the scheduler is very unlikely to change responses at this point in its + // development cycle but we must be careful to not return an incorrectly constructed response. + return verifyResp, nil + } + } } + + // Retry the thrift payload + continue } + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") + } + + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { + + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil + + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + c.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + } } - c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) - if curStep > 1 { - c.config.logger.Printf("retried this thrift call %d time(s)", curStep) + c.config.logger.Printf("this thrift call was retried %d time(s)", curStep) } // Provide more information to the user wherever possible. if clientErr != nil { return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep) - } else { - return nil, newRetryError(errors.New("ran out of retries"), curStep) } + + return nil, newRetryError(errors.New("ran out of retries"), curStep) +} + +// isConnectionError processes the error received by the client. +// The return values indicate whether this was determined to be a temporary error +// and whether it was determined to be a timeout error +func isConnectionError(err error) (bool, bool) { + + // Determine if error is a temporary URL error by going up the stack + transportException, ok := err.(thrift.TTransportException) + if !ok { + return false, false + } + + urlError, ok := transportException.Err().(*url.Error) + if !ok { + return false, false + } + + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and we consider it temporary. + // All other which are not temporary as per the member function Temporary(), + // are considered not temporary (permanent). + if urlError.Err != io.EOF && !urlError.Temporary() { + return false, false + } + + return true, urlError.Timeout() } diff --git a/util.go b/util.go index f993aaa..a822b3f 100644 --- a/util.go +++ b/util.go @@ -40,7 +40,7 @@ func init() { } } -// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. +// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{