diff --git a/job.go b/job.go
index b7d443b..365637e 100644
--- a/job.go
+++ b/job.go
@@ -178,7 +178,9 @@ func (j *AuroraJob) GPU(gpu int64) Job {
 	// rejects jobs with GPU resources attached to it.
 	if _, ok := j.resources[GPU]; !ok {
 		j.resources[GPU] = &aurora.Resource{}
-		j.JobConfig().GetTaskConfig().Resources = append(j.JobConfig().GetTaskConfig().Resources, j.resources[GPU])
+		j.JobConfig().GetTaskConfig().Resources = append(
+			j.JobConfig().GetTaskConfig().Resources,
+			j.resources[GPU])
 	}
 
 	j.resources[GPU].NumGpus = &gpu
@@ -259,7 +261,9 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
 func (j *AuroraJob) AddNamedPorts(names ...string) Job {
 	j.portCount += len(names)
 	for _, name := range names {
-		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name})
+		j.jobConfig.TaskConfig.Resources = append(
+			j.jobConfig.TaskConfig.Resources,
+			&aurora.Resource{NamedPort: &name})
 	}
 
 	return j
@@ -274,7 +278,9 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
 		portName := "org.apache.aurora.port." + strconv.Itoa(i)
-		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &portName})
+		j.jobConfig.TaskConfig.Resources = append(
+			j.jobConfig.TaskConfig.Resources,
+			&aurora.Resource{NamedPort: &portName})
 	}
 
 	return j
diff --git a/monitors.go b/monitors.go
index 72e7027..eb7c85c 100644
--- a/monitors.go
+++ b/monitors.go
@@ -27,7 +27,10 @@ type Monitor struct {
 }
 
 // Polls the scheduler every certain amount of time to see if the update has succeeded
-func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) {
+func (m *Monitor) JobUpdate(
+	updateKey aurora.JobUpdateKey,
+	interval int,
+	timeout int) (bool, error) {
 
 	updateQ := aurora.JobUpdateQuery{
 		Key: &updateKey,
@@ -40,7 +43,10 @@
 			aurora.JobUpdateStatus_FAILED,
 		},
 	}
-	updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second)
+	updateSummaries, err := m.JobUpdateQuery(
+		updateQ,
+		time.Duration(interval)*time.Second,
+		time.Duration(timeout)*time.Second)
 
 	status := updateSummaries[0].State.Status
 
@@ -119,7 +125,10 @@ func (m *Monitor) JobUpdateQuery(
 }
 
 // Monitor a Job until all instances enter one of the LIVE_STATES
-func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
+func (m *Monitor) Instances(
+	key *aurora.JobKey,
+	instances int32,
+	interval, timeout int) (bool, error) {
 	return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
 }
 
@@ -164,9 +173,13 @@ func (m *Monitor) ScheduleStatus(
 	}
 }
 
-// Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
+// Monitor host status until all hosts match the status provided.
+// Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
-func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, interval, timeout int) (map[string]bool, error) {
+func (m *Monitor) HostMaintenance(
+	hosts []string,
+	modes []aurora.MaintenanceMode,
+	interval, timeout int) (map[string]bool, error) {
 
 	// Transform modes to monitor for into a set for easy lookup
 	desiredMode := make(map[aurora.MaintenanceMode]struct{})
@@ -175,7 +188,8 @@
 	}
 
 	// Turn slice into a host set to eliminate duplicates.
-	// We also can't use a simple count because multiple modes means we can have multiple matches for a single host.
+	// We also can't use a simple count because multiple modes means
+	// we can have multiple matches for a single host.
 	// I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored
 	remainingHosts := make(map[string]struct{})
 	for _, host := range hosts {
diff --git a/realis.go b/realis.go
index 1b61e7d..af7fb34 100644
--- a/realis.go
+++ b/realis.go
@@ -16,6 +16,7 @@ package realis
 
 import (
+	"context"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/base64"
@@ -38,12 +39,15 @@ import (
 
 const VERSION = "1.21.0"
 
-// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
+// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible,
+// or get rid of the interface
 type Realis interface {
 	AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
 	CreateJob(auroraJob Job) (*aurora.Response, error)
-	CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
+	CreateService(
+		auroraJob Job,
+		settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
 	DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
 	FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
 	GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
@@ -243,7 +247,11 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
 	}
-	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans, ok := (trans).(*thrift.THttpClient)
+	if !ok {
+		return nil, errors.New("transport does not contain a thrift client")
+	}
+
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
 	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
 	return trans, err
@@ -254,7 +262,11 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
 	}
-	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans, ok := (trans).(*thrift.THttpClient)
+	if !ok {
+		return nil, errors.New("transport does not contain a thrift client")
+	}
+
 	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
@@ -328,16 +340,20 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		url, err = LeaderFromZK(*config.cluster)
 		// If ZK is configured, throw an error if the leader is unable to be determined
 		if err != nil {
-			return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader "))
+			return nil,
NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader")) } config.logger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { - url = config.url config.logger.Println("Scheduler URL: ", url) } else { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } + url, err = validateAndPopulateAuroraURL(url) + if err != nil { + return nil, errors.Wrap(err, "invalid Aurora url") + } + if config.jsonTransport { trans, err := newTJSONTransport(url, config.timeoutms, config) if err != nil { @@ -359,7 +375,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Adding Basic Authentication. if config.username != "" && config.password != "" { - httpTrans := (config.transport).(*thrift.THttpClient) + httpTrans, ok := (config.transport).(*thrift.THttpClient) + if !ok { + return nil, errors.New("transport provided does not contain an THttpClient") + } httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password)) } @@ -504,7 +523,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, taskQ) + return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) }) // If we encountered an error we couldn't recover from by retrying, return an error to the user @@ -529,7 +548,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) + return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) }) if retryErr != nil { @@ -546,7 +565,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.readonlyClient.GetJobs(nil, role) + return r.readonlyClient.GetJobs(context.TODO(), role) }) if retryErr != nil { @@ -567,7 +586,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.KillTasks(nil, key, instances, "") + return r.client.KillTasks(context.TODO(), key, instances, "") }) if retryErr != nil { @@ -589,7 +608,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { false, func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards - return r.client.KillTasks(nil, key, nil, "") + return r.client.KillTasks(context.TODO(), key, nil, "") }) if retryErr != nil { @@ -609,7 +628,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.CreateJob(nil, auroraJob.JobConfig()) + return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) }) if retryErr != nil { @@ -619,7 +638,9 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { } // This API uses an update thrift call to create the services giving a few more robust features. 
-func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { +func (r *realisClient) CreateService( + auroraJob Job, + settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { // Create a new job update object and ship it to the StartJobUpdate api update := NewUpdateJob(auroraJob.TaskConfig(), settings) update.InstanceCount(auroraJob.GetInstanceCount()) @@ -646,7 +667,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) + return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) }) if retryErr != nil { @@ -662,7 +683,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.DescheduleCronJob(nil, key) + return r.client.DescheduleCronJob(context.TODO(), key) }) if retryErr != nil { @@ -680,7 +701,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.StartCronJob(nil, key) + return r.client.StartCronJob(context.TODO(), key) }) if retryErr != nil { @@ -697,7 +718,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instances) + return r.client.RestartShards(context.TODO(), key, instances) }) if retryErr != nil { @@ -720,7 +741,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instanceIds) + return r.client.RestartShards(context.TODO(), key, instanceIds) }) if retryErr != nil { @@ -741,7 +762,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au resp, retryErr := r.thriftCallWithRetries( true, func() (*aurora.Response, error) { - return r.client.StartJobUpdate(nil, updateJob.req, message) + return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) }) if retryErr != nil { @@ -765,7 +786,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.AbortJobUpdate(nil, &updateKey, message) + return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) }) if retryErr != nil { @@ -774,7 +795,11 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str // Make this call synchronous by blocking until it job has successfully transitioned to aborted m := Monitor{Client: r} - _, err := m.JobUpdateStatus(updateKey, map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true}, time.Second*5, time.Minute) + _, err := m.JobUpdateStatus( + updateKey, + map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true}, + time.Second*5, + time.Minute) return resp, err } @@ -787,7 +812,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.PauseJobUpdate(nil, updateKey, message) + 
return r.client.PauseJobUpdate(context.TODO(), updateKey, message) }) if retryErr != nil { @@ -805,7 +830,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.ResumeJobUpdate(nil, updateKey, message) + return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) }) if retryErr != nil { @@ -823,7 +848,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.PulseJobUpdate(nil, updateKey) + return r.client.PulseJobUpdate(context.TODO(), updateKey) }) if retryErr != nil { @@ -842,7 +867,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.AddInstances(nil, &instKey, count) + return r.client.AddInstances(context.TODO(), &instKey, count) }) if retryErr != nil { @@ -881,7 +906,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, query) + return r.client.GetTasksStatus(context.TODO(), query) }) if retryErr != nil { @@ -899,7 +924,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetPendingReason(nil, query) + return r.client.GetPendingReason(context.TODO(), query) }) if retryErr != nil { @@ -923,7 +948,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, query) + return r.client.GetTasksWithoutConfigs(context.TODO(), query) }) if retryErr != nil { @@ -949,7 +974,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, taskQ) + return r.client.GetTasksStatus(context.TODO(), taskQ) }) if retryErr != nil { @@ -977,7 +1002,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.GetJobUpdateDetails(nil, &updateQuery) + return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) }) if retryErr != nil { @@ -994,7 +1019,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.client.RollbackJobUpdate(nil, &key, message) + return r.client.RollbackJobUpdate(context.TODO(), &key, message) }) if retryErr != nil { diff --git a/realis_admin.go b/realis_admin.go index 8ce3ea3..0461d90 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -1,6 +1,8 @@ package realis import ( + "context" + "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/pkg/errors" ) @@ -27,7 +29,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.DrainHosts(nil, drainList) + return r.adminClient.DrainHosts(context.TODO(), 
drainList) }) if retryErr != nil { @@ -44,7 +46,10 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr // Start SLA Aware Drain. // defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. // After timeoutSecs, tasks will be forcefully drained without checking SLA. -func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) { +func (r *realisClient) SLADrainHosts( + policy *aurora.SlaPolicy, + timeout int64, + hosts ...string) (*aurora.DrainHostsResult_, error) { var result *aurora.DrainHostsResult_ if len(hosts) == 0 { @@ -59,7 +64,7 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) + return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) }) if retryErr != nil { @@ -89,7 +94,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.StartMaintenance(nil, hostList) + return r.adminClient.StartMaintenance(context.TODO(), hostList) }) if retryErr != nil { @@ -119,7 +124,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.EndMaintenance(nil, hostList) + return r.adminClient.EndMaintenance(context.TODO(), hostList) }) if retryErr != nil { @@ -151,7 +156,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.MaintenanceStatus(nil, hostList) + return r.adminClient.MaintenanceStatus(context.TODO(), hostList) }) if retryErr != nil { @@ -166,7 +171,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au } // SetQuota sets a quota aggregate for the given role -// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` +// TODO(zircote) Currently investigating an error that is returned +// from thrift calls that include resources for `NamedPort` and `NumGpu` func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { quota := &aurora.ResourceAggregate{ Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}}, @@ -175,7 +181,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.SetQuota(nil, role, quota) + return r.adminClient.SetQuota(context.TODO(), role, quota) }) if retryErr != nil { @@ -191,7 +197,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.GetQuota(nil, role) + return r.adminClient.GetQuota(context.TODO(), role) }) if retryErr != nil { @@ -206,7 +212,7 @@ func (r *realisClient) Snapshot() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.Snapshot(nil) + return r.adminClient.Snapshot(context.TODO()) }) if retryErr != nil { @@ 
-222,7 +228,7 @@ func (r *realisClient) PerformBackup() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.PerformBackup(nil) + return r.adminClient.PerformBackup(context.TODO()) }) if retryErr != nil { @@ -237,7 +243,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { - return r.adminClient.TriggerImplicitTaskReconciliation(nil) + return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) }) if retryErr != nil { @@ -250,7 +256,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error { func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { if batchSize != nil && *batchSize < 1 { - return errors.New("Invalid batch size.") + return errors.New("invalid batch size") } settings := aurora.NewExplicitReconciliationSettings() @@ -258,7 +264,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := r.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) + return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) }) if retryErr != nil { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index cc09b7b..606f2b3 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -76,6 +76,8 @@ func TestNonExistentEndpoint(t *testing.T) { realis.TimeoutMS(200), realis.BackOff(backoff), ) + + assert.NoError(t, err) defer r.Close() taskQ := &aurora.TaskQuery{} @@ -349,7 +351,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) { assert.NoError(t, err) assert.Len(t, reasons, 1) - resp, err = r.KillJob(job.JobKey()) + _, err = r.KillJob(job.JobKey()) assert.NoError(t, err) } @@ -515,7 +517,7 @@ func TestRealisClient_CreateService(t *testing.T) { assert.Len(t, updateSummaries, 1) - _, err = r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up") + r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up") _, err = r.KillJob(job.JobKey()) assert.NoError(t, err) @@ -770,13 +772,10 @@ func TestRealisClient_Quota(t *testing.T) { switch true { case res.DiskMb != nil: assert.Equal(t, disk, *res.DiskMb) - break case res.NumCpus != nil: assert.Equal(t, cpu, *res.NumCpus) - break case res.RamMb != nil: assert.Equal(t, ram, *res.RamMb) - break } } }) @@ -829,7 +828,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { } // Clean up after finishing test - _, err = r.KillJob(job.JobKey()) + r.KillJob(job.JobKey()) } func TestAuroraJob_UpdateSlaPolicy(t *testing.T) { @@ -904,3 +903,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) { }) } } + +func TestAuroraURLValidator(t *testing.T) { + +} diff --git a/retry.go b/retry.go index 256e509..739fc1f 100644 --- a/retry.go +++ b/retry.go @@ -77,7 +77,8 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) adjusted = Jitter(duration, backoff.Jitter) } - logger.Printf("A retryable error occurred during function call, backing off for %v before retrying\n", adjusted) + logger.Printf( + "A retryable error occurred during function call, backing off for %v before retrying\n", adjusted) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) } @@ -116,7 +117,10 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) // Duplicates the functionality of ExponentialBackoff but is 
specifically targeted towards ThriftCalls. -func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { +func (r *realisClient) thriftCallWithRetries( + returnOnTimeout bool, + thriftCall auroraThriftCall) (*aurora.Response, error) { + var resp *aurora.Response var clientErr error var curStep int @@ -134,7 +138,10 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) + r.logger.Printf( + "A retryable error occurred during thrift call, backing off for %v before retry %v\n", + adjusted, + curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -179,7 +186,8 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au if e.Timeout() { timeouts++ r.logger.DebugPrintf( - "Client closed connection (timedout) %d times before server responded, consider increasing connection timeout", + "Client closed connection (timedout) %d times before server responded, "+ + "consider increasing connection timeout", timeouts) if returnOnTimeout { return resp, newTimedoutError(errors.New("client connection closed before server answer")) @@ -190,7 +198,8 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. - r.ReestablishConn() + // Ignoring error due to the fact that an error should be retried regardless + _ = r.ReestablishConn() } else { diff --git a/updatejob.go b/updatejob.go index eeaaa9c..fd075dc 100644 --- a/updatejob.go +++ b/updatejob.go @@ -31,7 +31,12 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req.TaskConfig = config req.Settings = NewUpdateSettings() - job := NewJob().(*AuroraJob) + job, ok := NewJob().(*AuroraJob) + if !ok { + // This should never happen but it is here as a safeguard + return nil + } + job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig @@ -75,7 +80,11 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) req.TaskConfig = config req.Settings = settings - job := NewJob().(*AuroraJob) + job, ok := NewJob().(*AuroraJob) + if !ok { + // This should never happen but it is here as a safeguard + return nil + } job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig diff --git a/util.go b/util.go index 324b12f..372d995 100644 --- a/util.go +++ b/util.go @@ -67,7 +67,7 @@ func validateAndPopulateAuroraURL(urlStr string) (string, error) { return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme) } - if u.Path != "/api" { + if u.Path != APIPath { return "", errors.Errorf("expected /api path %v\n", u.Path) } diff --git a/zk.go b/zk.go index a9eb92d..51b4d6e 100644 --- a/zk.go +++ b/zk.go @@ -146,7 +146,8 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { // This should never be encountered as it would indicate Aurora // writing bad info into Zookeeper but is kept here as a safety net. 
 		if len(serviceInst.AdditionalEndpoints) > 1 {
-			return false, NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
+			return false,
+				NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
 		}
 
 		var scheme, host, port string