diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index eb459c8..485b6e4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ on: jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/go.mod b/go.mod index c9c3dee..3b3095f 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/apache/thrift v0.14.0 github.com/pkg/errors v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.5.0 ) go 1.16 diff --git a/go.sum b/go.sum index d65a779..9324c97 100644 --- a/go.sum +++ b/go.sum @@ -12,11 +12,7 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw= github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go deleted file mode 100644 index f1983db..0000000 --- a/helpers.go +++ /dev/null @@ -1,23 +0,0 @@ -package realis - -import ( - "context" - - "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" -) - -func (r *Client) JobExists(key aurora.JobKey) (bool, error) { - resp, err := r.client.GetConfigSummary(context.TODO(), &key) - if err != nil { - return false, err - } - - return resp != nil && - resp.GetResult_() != nil && - resp.GetResult_().GetConfigSummaryResult_() != nil && - resp.GetResult_().GetConfigSummaryResult_().GetSummary() != nil && - resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() != nil && - len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) > 0 && - resp.GetResponseCode() == aurora.ResponseCode_OK, - nil -} diff --git a/realis.go b/realis.go index c331918..1219e92 100644 --- a/realis.go +++ b/realis.go @@ -315,13 +315,11 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu Statuses: states, } - c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ) + c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }, - nil, - ) + }) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -343,13 +341,8 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (* resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }, - nil, - ) + }) - 
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil, errors.New("unexpected response from scheduler") - } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } @@ -361,12 +354,8 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobSummary(context.TODO(), role) - }, - nil, - ) - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil { - return nil, errors.New("unexpected response from scheduler") - } + }) + if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler") } @@ -376,20 +365,21 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { + var result *aurora.GetJobsResult_ + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobs(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") - } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") } - return resp.GetResult_().GetJobsResult_, nil + if resp.GetResult_() != nil { + result = resp.GetResult_().GetJobsResult_ + } + + return result, nil } // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. @@ -399,19 +389,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.KillTasks(context.TODO(), &key, instances, "") - }, - nil, - ) + }) if retryErr != nil { return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } - if resp == nil || len(resp.GetDetails()) > 0 { + if len(resp.GetDetails()) > 0 { c.logger.Println("KillTasks was called but no tasks killed as a result.") return false, nil + } else { + return true, nil } - return true, nil + } func (c *Client) RealisConfig() *clientConfig { @@ -426,9 +416,7 @@ func (c *Client) KillJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return c.client.KillTasks(context.TODO(), &key, nil, "") - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -450,27 +438,9 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { return errors.Wrap(err, "unable to create Thermos payload") } - // Response is checked by the thrift retry code - _, retryErr := c.thriftCallWithRetries( - false, - func() (*aurora.Response, error) { - return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }, - // On a client timeout, attempt to verify that payload made to the Scheduler by - // trying to get the config summary for the job key - func() (*aurora.Response, bool) { - exists, err := c.JobExists(auroraJob.JobKey()) - if err != nil { - c.logger.Print("verification failed ", err) - } - - if exists { - return 
&aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true - } - - return nil, false - }, - ) + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") @@ -501,9 +471,7 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler") @@ -517,9 +485,7 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.DescheduleCronJob(context.TODO(), &key) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler") @@ -535,9 +501,7 @@ func (c *Client) StartCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.StartCronJob(context.TODO(), &key) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Start Cron AuroraJob message to Aurora Scheduler") @@ -552,9 +516,7 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instances) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -575,17 +537,16 @@ func (c *Client) RestartJob(key aurora.JobKey) error { if len(instanceIds) > 0 { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instanceIds) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return nil + } else { + return errors.New("no tasks in the Active state") } - return errors.New("no tasks in the Active state") } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. 
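Note on the pattern above: every call site in this patch drops the trailing nil verification argument, so thriftCallWithRetries is reduced to the retry flag plus the Thrift closure (the new two-argument signature appears in retry.go further down). A minimal sketch of an internal call site after this change, modeled on the GetTaskStatus hunk below; it assumes a *Client receiver c and a populated *aurora.TaskQuery named taskQ, as in the surrounding code:

	resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
		// Single closure argument: the verification callback parameter no longer exists.
		return c.client.GetTasksStatus(context.TODO(), taskQ)
	})
	if retryErr != nil {
		return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
	}
	return response.ScheduleStatusResult(resp).GetTasks(), nil
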
@@ -597,80 +558,34 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := c.thriftCallWithRetries(false, - func() (*aurora.Response, error) { - return c.client.StartJobUpdate(context.TODO(), updateJob.request, message) - }, - func() (*aurora.Response, bool) { - key := updateJob.JobKey() - summariesResp, err := c.readonlyClient.GetJobUpdateSummaries( - context.TODO(), - &aurora.JobUpdateQuery{ - JobKey: &key, - UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, - Limit: 1, - }) - - if err != nil { - c.logger.Print("verification failed ", err) - return nil, false - } - - summaries := response.JobUpdateSummaries(summariesResp) - if len(summaries) == 0 { - return nil, false - } - - return &aurora.Response{ - ResponseCode: aurora.ResponseCode_OK, - Result_: &aurora.Result_{ - StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ - UpdateSummary: summaries[0], - Key: summaries[0].Key, - }, - }, - }, true - }, - ) + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.client.StartJobUpdate(nil, updateJob.request, message) + }) if retryErr != nil { - // A timeout took place when attempting this call, attempt to recover - if IsTimeout(retryErr) { - return nil, retryErr - } - return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + + if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { + return resp.GetResult_().GetStartJobUpdateResult_(), nil } - return resp.GetResult_().GetStartJobUpdateResult_(), nil + + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } -// AbortJobUpdate terminates a job update in the scheduler. -// It requires the updateId which can be obtained on the Aurora web UI. -// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. -// However, if the job update does not transition to the ABORT state an error will be returned. +// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error { c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") } - // Make this call synchronous by blocking until it job has successfully transitioned to aborted - _, err := c.MonitorJobUpdateStatus( - updateKey, - []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED}, - time.Second*5, - time.Minute) - return err + return nil } // Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. 
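Note on the AbortJobUpdate hunk above: the synchronous wait on the ABORTED state is removed, so callers that depended on the old blocking behaviour must poll for the terminal state themselves. A hedged sketch, assuming the MonitorJobUpdateStatus helper invoked by the removed code remains exported with the same signature:

	if err := client.AbortJobUpdate(updateKey, "monitor timed out"); err != nil {
		return err
	}
	// Reproduce the blocking behaviour this patch removes: poll every five seconds,
	// for up to a minute, until the update reaches ABORTED.
	_, err := client.MonitorJobUpdateStatus(
		updateKey,
		[]aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
		5*time.Second,
		time.Minute)
	return err
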
@@ -690,9 +605,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PauseJobUpdate(nil, updateKeyLocal, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -719,9 +632,7 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -742,19 +653,18 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PulseJobUpdate(context.TODO(), &updateKey) - }, - nil, - ) + }) if retryErr != nil { return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil { - return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil { + return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil + } else { + return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly") } - return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -771,9 +681,7 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AddInstances(context.TODO(), &instKey, count) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -818,16 +726,11 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } - if resp == nil { - return nil, errors.New("unexpected response from scheduler") - } return response.ScheduleStatusResult(resp).GetTasks(), nil } @@ -839,32 +742,29 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetPendingReason(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + var result []*aurora.PendingReason + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() } - return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil + return result, nil } -// GetTasksWithoutConfigs gets information about task including without a task 
configuration object. -// This is a more lightweight version of GetTaskStatus but contains less information as a result. +// Get information about task including without a task configuration object func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -891,9 +791,7 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), taskQ) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -919,19 +817,17 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil { + return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil + } else { + return nil, errors.New("unknown Thrift error, field is nil.") } - - return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil } func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error { @@ -940,9 +836,7 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RollbackJobUpdate(context.TODO(), &key, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index f100f46..f2759ea 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -37,19 +37,17 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.DrainHosts(context.TODO(), drainList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } // Start SLA Aware Drain. @@ -80,19 +78,17 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. 
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -108,19 +104,17 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.StartMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil } func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -136,20 +130,24 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.EndMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + + if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + } func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { + + var result *aurora.MaintenanceStatusResult_ + if len(hosts) == 0 { return nil, errors.New("no hosts provided to get maintenance status from") } @@ -163,18 +161,17 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe // and continue trying to resend command until we run out of retries. 
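Note on the maintenance hunks in this file: after this patch MaintenanceStatus, like GetJobs in realis.go, can hand back a nil result together with a nil error when the scheduler response carries no Result_ field, so external callers should guard for that case. A minimal sketch under those assumptions; the host name and wrapper function are hypothetical, and the getters come from the generated Aurora Thrift types already used in this file:

	func hostsInMaintenance(client *realis.Client) ([]string, error) {
		status, err := client.MaintenanceStatus("agent-1.example.org") // hypothetical host
		if err != nil {
			return nil, err
		}
		if status == nil {
			// A nil result with a nil error is now possible; treat it explicitly.
			return nil, errors.New("scheduler returned no maintenance status result")
		}
		var hosts []string
		for _, hostStatus := range status.GetStatuses() {
			hosts = append(hosts, hostStatus.GetHost())
		}
		return hosts, nil
	}
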
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.MaintenanceStatus(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + return result, errors.Wrap(retryErr, "unable to recover connection") } - return resp.GetResult_().GetMaintenanceStatusResult_(), nil + if resp.GetResult_() != nil { + result = resp.GetResult_().GetMaintenanceStatusResult_() + } + + return result, nil } // SetQuota sets a quota aggregate for the given role @@ -192,9 +189,7 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64 _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SetQuota(context.TODO(), role, quota) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to set role quota") @@ -208,18 +203,17 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.GetQuota(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get role quota") } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil { + return resp.GetResult_().GetGetQuotaResult_(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - return resp.GetResult_().GetGetQuotaResult_(), nil } // Force Aurora Scheduler to perform a snapshot and write to Mesos log @@ -227,9 +221,7 @@ func (c *Client) Snapshot() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.Snapshot(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -243,9 +235,7 @@ func (c *Client) PerformBackup() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.PerformBackup(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -259,9 +249,7 @@ func (c *Client) ForceImplicitTaskReconciliation() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -282,9 +270,7 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 5893491..1f544d8 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -477,15 +477,13 @@ func TestRealisClient_CreateService(t *testing.T) { var ok bool var mErr error - if result != nil { - if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { - // Update may already be in a terminal state so don't check for error - err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") + if 
ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { + // Update may already be in a terminal state so don't check for error + err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") - err = r.KillJob(job.JobKey()) + err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - } + assert.NoError(t, err) } assert.True(t, ok) @@ -1447,59 +1445,3 @@ func TestRealisClient_FitTasks(t *testing.T) { } } } - -func TestRealisClient_JobExists(t *testing.T) { - role := "vagrant" - env := "prod" - name := "test_job_exists" - // Create a good single job - job := realis.NewJob(). - Environment(env). - Role(role). - Name(name). - ThermosExecutor(thermosExec). - CPU(.25). - RAM(4). - Disk(10). - InstanceCount(3). - IsService(true). - Production(false). - Tier("preemptible"). - Priority(0) - - // Check if job exists before creating - exists, err := r.JobExists(job.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) - - err = r.CreateJob(job) - assert.NoError(t, err) - - exists, err = r.JobExists(job.JobKey()) - assert.NoError(t, err) - assert.True(t, exists) - - // Create a single bad job - badJob := realis.NewJob(). - Environment("prod"). - Role("vagrant"). - Name("executordoesntexist"). - ExecutorName("idontexist"). - ExecutorData(""). - CPU(.25). - RAM(4). - Disk(10). - InstanceCount(1) - - // Check if job exists before creating - exists, err = r.JobExists(badJob.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) - - err = r.CreateJob(badJob) - assert.Error(t, err) - - exists, err = r.JobExists(badJob.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) -} diff --git a/response/response.go b/response/response.go index 15081ec..1663b1b 100644 --- a/response/response.go +++ b/response/response.go @@ -35,10 +35,6 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil - } - return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index f5b0918..9860e42 100644 --- a/retry.go +++ b/retry.go @@ -17,7 +17,10 @@ package realis import ( "io" "math/rand" + "net/http" "net/url" + "strconv" + "strings" "time" "github.com/apache/thrift/lib/go/thrift" @@ -26,11 +29,9 @@ import ( "github.com/pkg/errors" ) -// Backoff determines how the retry mechanism should react after each failure and how many failures it should -// tolerate. type Backoff struct { Duration time.Duration // the base duration - Factor float64 // Duration is multiplied by a factor each iteration + Factor float64 // Duration is multipled by factor each iteration Jitter float64 // The amount of jitter applied each iteration Steps int // Exit with error after this many steps } @@ -52,15 +53,18 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code. -// It repeats a condition check with exponential backoff and checks the condition up to -// Steps times, increasing the wait by multiplying the previous duration by Factor. +// Modified version of the Kubernetes exponential-backoff code. +// ExponentialBackoff repeats a condition check with exponential backoff. 
+// +// It checks the condition up to Steps times, increasing the wait by multiplying +// the previous duration by Factor. // // If Jitter is greater than zero, a random amount of each duration is added // (between duration and duration*(1+jitter)). // // If the condition never returns true, ErrWaitTimeout is returned. Errors // do not cause the function to return. + func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error { var err error var ok bool @@ -94,9 +98,10 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // If the error is temporary, continue retrying. if !IsTemporary(err) { return err + } else { + // Print out the temporary error we experienced. + logger.Println(err) } - // Print out the temporary error we experienced. - logger.Println(err) } } @@ -107,28 +112,19 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // Provide more information to the user wherever possible if err != nil { return newRetryError(errors.Wrap(err, "ran out of retries"), curStep) + } else { + return newRetryError(errors.New("ran out of retries"), curStep) } - - return newRetryError(errors.New("ran out of retries"), curStep) } type auroraThriftCall func() (resp *aurora.Response, err error) -// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler -// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler -// through the very same Thrift API which previously encountered a time-out from the client. -// This means that the functions themselves should be kept to a minimum number of Thrift calls. -// It should also be noted that this is a best effort mechanism and -// is likely to fail for the same reasons that the original call failed. -type verifyOnTimeout func() (*aurora.Response, bool) - // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. -func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall, - verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { +func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int - timeouts := 0 + var timeouts int backoff := c.config.backoff duration := backoff.Duration @@ -142,10 +138,7 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh adjusted = Jitter(duration, backoff.Jitter) } - c.logger.Printf( - "A retryable error occurred during thrift call, backing off for %v before retry %v", - adjusted, - curStep) + c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -160,132 +153,105 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh resp, clientErr = thriftCall() - c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) + c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. 
if clientErr != nil { + // Print out the error to the user - c.logger.Printf("Client Error: %v", clientErr) + c.logger.Printf("Client Error: %v\n", clientErr) - temporary, timedout := isConnectionError(clientErr) - if !temporary && c.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } + // Determine if error is a temporary URL error by going up the stack + e, ok := clientErr.(thrift.TTransportException) + if ok { + c.logger.DebugPrint("Encountered a transport exception") - // There exists a corner case where thrift payload was received by Aurora but - // connection timed out before Aurora was able to reply. - // Users can take special action on a timeout by using IsTimedout and reacting accordingly - // if they have configured the client to return on a timeout. - if timedout && returnOnTimeout { - return resp, newTimedoutError(errors.New("client connection closed before server answer")) + // TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution + // 401 Unauthorized means the wrong username and password were provided + if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) { + return nil, errors.Wrap(clientErr, "wrong username or password provided") + } + + e, ok := e.Err().(*url.Error) + if ok { + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and should be retried. All other errors that are permanent + // will not be retried. + if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } + // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was + // able to reply. In this case we will return whatever response was received and a TimedOut behaving + // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. + if e.Timeout() { + timeouts++ + c.logger.DebugPrintf( + "Client closed connection (timedout) %d times before server responded,"+ + " consider increasing connection timeout", + timeouts) + if returnOnTimeout { + return resp, + newTimedoutError(errors.New("client connection closed before server answer")) + } + } + } } // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. // Ignoring error due to the fact that an error should be retried regardless - reestablishErr := c.ReestablishConn() - if reestablishErr != nil { - c.logger.DebugPrintf("error re-establishing connection ", reestablishErr) + _ = c.ReestablishConn() + + } else { + + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") } - // If users did not opt for a return on timeout in order to react to a timedout error, - // attempt to verify that the call made it to the scheduler after the connection was re-established. - if timedout { - timeouts++ - c.logger.DebugPrintf( - "Client closed connection %d times before server responded, "+ - "consider increasing connection timeout", - timeouts) + // Check Response Code from thrift and make a decision to continue retrying or not. 
+ switch responseCode := resp.GetResponseCode(); responseCode { - // Allow caller to provide a function which checks if the original call was successful before - // it timed out. - if verifyOnTimeout != nil { - if verifyResp, ok := verifyOnTimeout(); ok { - c.logger.Print("verified that the call went through successfully after a client timeout") - // Response here might be different than the original as it is no longer constructed - // by the scheduler but mimicked. - // This is OK since the scheduler is very unlikely to change responses at this point in its - // development cycle but we must be careful to not return an incorrectly constructed response. - return verifyResp, nil - } - } + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil + + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + c.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } - - // Retry the thrift payload - continue } - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") - } - - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { - - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil - - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - c.logger.Println("Aurora replied with Transient error code, retrying") - continue - - // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) - - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
- default: - c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) - } } + c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { - c.config.logger.Printf("this thrift call was retried %d time(s)", curStep) + c.config.logger.Printf("retried this thrift call %d time(s)", curStep) } // Provide more information to the user wherever possible. if clientErr != nil { return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep) + } else { + return nil, newRetryError(errors.New("ran out of retries"), curStep) } - - return nil, newRetryError(errors.New("ran out of retries"), curStep) -} - -// isConnectionError processes the error received by the client. -// The return values indicate whether this was determined to be a temporary error -// and whether it was determined to be a timeout error -func isConnectionError(err error) (bool, bool) { - - // Determine if error is a temporary URL error by going up the stack - transportException, ok := err.(thrift.TTransportException) - if !ok { - return false, false - } - - urlError, ok := transportException.Err().(*url.Error) - if !ok { - return false, false - } - - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and we consider it temporary. - // All other which are not temporary as per the member function Temporary(), - // are considered not temporary (permanent). - if urlError.Err != io.EOF && !urlError.Temporary() { - return false, false - } - - return true, urlError.Timeout() } diff --git a/util.go b/util.go index a822b3f..f993aaa 100644 --- a/util.go +++ b/util.go @@ -40,7 +40,7 @@ func init() { } } -// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. +// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{