Fix all relevant errors reported by golangci-lint

Renan DelValle 2019-05-28 18:53:51 -07:00
parent 4870b07cf0
commit ce554c767c
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
9 changed files with 143 additions and 70 deletions

job.go

@@ -178,7 +178,9 @@ func (j *AuroraJob) GPU(gpu int64) Job {
 	// rejects jobs with GPU resources attached to it.
 	if _, ok := j.resources[GPU]; !ok {
 		j.resources[GPU] = &aurora.Resource{}
-		j.JobConfig().GetTaskConfig().Resources = append(j.JobConfig().GetTaskConfig().Resources, j.resources[GPU])
+		j.JobConfig().GetTaskConfig().Resources = append(
+			j.JobConfig().GetTaskConfig().Resources,
+			j.resources[GPU])
 	}
 
 	j.resources[GPU].NumGpus = &gpu
@@ -259,7 +261,9 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
 func (j *AuroraJob) AddNamedPorts(names ...string) Job {
 	j.portCount += len(names)
 	for _, name := range names {
-		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name})
+		j.jobConfig.TaskConfig.Resources = append(
+			j.jobConfig.TaskConfig.Resources,
+			&aurora.Resource{NamedPort: &name})
 	}
 
 	return j
@@ -274,7 +278,9 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
 		portName := "org.apache.aurora.port." + strconv.Itoa(i)
-		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &portName})
+		j.jobConfig.TaskConfig.Resources = append(
+			j.jobConfig.TaskConfig.Resources,
+			&aurora.Resource{NamedPort: &portName})
 	}
 
 	return j
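
These are builder methods on the Job interface; a brief usage sketch (environment, role, and resource values are placeholders) showing how GPU and port resources end up appended to the underlying TaskConfig:

    job := realis.NewJob().
        Environment("prod").
        Role("vagrant").
        Name("hello_world").
        CPU(1).
        RAM(64).
        Disk(100).
        GPU(1).      // stored once in the resources map and appended to TaskConfig.Resources
        AddPorts(2). // adds org.apache.aurora.port.0 and org.apache.aurora.port.1
        InstanceCount(1)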


@@ -27,7 +27,10 @@ type Monitor struct {
 }
 
 // Polls the scheduler every certain amount of time to see if the update has succeeded
-func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) {
+func (m *Monitor) JobUpdate(
+	updateKey aurora.JobUpdateKey,
+	interval int,
+	timeout int) (bool, error) {
 
 	updateQ := aurora.JobUpdateQuery{
 		Key: &updateKey,
@@ -40,7 +43,10 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 			aurora.JobUpdateStatus_FAILED,
 		},
 	}
-	updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second)
+	updateSummaries, err := m.JobUpdateQuery(
+		updateQ,
+		time.Duration(interval)*time.Second,
+		time.Duration(timeout)*time.Second)
 
 	status := updateSummaries[0].State.Status
@@ -119,7 +125,10 @@ func (m *Monitor) JobUpdateQuery(
 }
 
 // Monitor a Job until all instances enter one of the LIVE_STATES
-func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
+func (m *Monitor) Instances(
+	key *aurora.JobKey,
+	instances int32,
+	interval, timeout int) (bool, error) {
 	return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
 }
@@ -164,9 +173,13 @@ func (m *Monitor) ScheduleStatus(
 	}
 }
 
-// Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
+// Monitor host status until all hosts match the status provided.
+// Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
-func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, interval, timeout int) (map[string]bool, error) {
+func (m *Monitor) HostMaintenance(
+	hosts []string,
+	modes []aurora.MaintenanceMode,
+	interval, timeout int) (map[string]bool, error) {
 
 	// Transform modes to monitor for into a set for easy lookup
 	desiredMode := make(map[aurora.MaintenanceMode]struct{})
@@ -175,7 +188,8 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 	}
 
 	// Turn slice into a host set to eliminate duplicates.
-	// We also can't use a simple count because multiple modes means we can have multiple matches for a single host.
+	// We also can't use a simple count because multiple modes means
+	// we can have multiple matches for a single host.
 	// I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored
 	remainingHosts := make(map[string]struct{})
 	for _, host := range hosts {
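
A brief sketch of how these monitors are driven from client code (the client r, the update result, and the job are assumed to already exist; intervals and timeouts are in seconds):

    m := realis.Monitor{Client: r}

    // Block for up to 5 minutes, polling every 5 seconds, until the update reaches a terminal state.
    ok, err := m.JobUpdate(*result.GetKey(), 5, 300)
    if err != nil || !ok {
        log.Println("update did not finish successfully:", err)
    }

    // Wait until all three instances of the job are in one of the LIVE_STATES.
    live, err := m.Instances(job.JobKey(), 3, 5, 60)
    if err != nil || !live {
        log.Println("instances did not reach a LIVE state:", err)
    }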


@@ -16,6 +16,7 @@
 package realis
 
 import (
+	"context"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/base64"
@@ -38,12 +39,15 @@ import (
 
 const VERSION = "1.21.0"
 
-// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
+// TODO(rdelvalle): Move documentation to interface in order to make godoc look better accessible
+// Or get rid of itnerface
 type Realis interface {
 	AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
 	CreateJob(auroraJob Job) (*aurora.Response, error)
-	CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
+	CreateService(
+		auroraJob Job,
+		settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
 	DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
 	FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
 	GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
@@ -243,7 +247,11 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
 	}
-	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans, ok := (trans).(*thrift.THttpClient)
+	if !ok {
+		return nil, errors.Wrap(err, "transport does not contain a thrift client")
+	}
+
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
 	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
 	return trans, err
@@ -254,7 +262,11 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
 	}
-	httpTrans := (trans).(*thrift.THttpClient)
+	httpTrans, ok := (trans).(*thrift.THttpClient)
+	if !ok {
+		return nil, errors.Wrap(err, "transport does not contain a thrift client")
+	}
+
 	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
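
golangci-lint flags the original unchecked assertions because they panic if the transport is not the expected concrete type; the comma-ok form lets the constructor return an error instead. A minimal, hypothetical helper illustrating the pattern (asHTTPClient is not part of this commit; thrift is the Apache Thrift Go package already imported in this file):

    func asHTTPClient(trans thrift.TTransport) (*thrift.THttpClient, error) {
        httpTrans, ok := trans.(*thrift.THttpClient) // comma-ok: no panic on a type mismatch
        if !ok {
            return nil, fmt.Errorf("expected *thrift.THttpClient, got %T", trans)
        }
        return httpTrans, nil
    }
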
@@ -328,16 +340,20 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		url, err = LeaderFromZK(*config.cluster)
 		// If ZK is configured, throw an error if the leader is unable to be determined
 		if err != nil {
-			return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader "))
+			return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader"))
 		}
 		config.logger.Println("Scheduler URL from ZK: ", url)
 	} else if config.url != "" {
 		url = config.url
 		config.logger.Println("Scheduler URL: ", url)
 	} else {
 		return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
 	}
 
+	url, err = validateAndPopulateAuroraURL(url)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid Aurora url")
+	}
+
 	if config.jsonTransport {
 		trans, err := newTJSONTransport(url, config.timeoutms, config)
 		if err != nil {
@@ -359,7 +375,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 
 	// Adding Basic Authentication.
 	if config.username != "" && config.password != "" {
-		httpTrans := (config.transport).(*thrift.THttpClient)
+		httpTrans, ok := (config.transport).(*thrift.THttpClient)
+		if !ok {
+			return nil, errors.New("transport provided does not contain an THttpClient")
+		}
 		httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password))
 	}
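
URL validation and the Authorization header are both applied while the client is being built; a minimal construction sketch using options defined in this package (endpoint and credentials are placeholders):

    r, err := realis.NewRealisClient(
        realis.SchedulerUrl("http://localhost:8081"), // checked by validateAndPopulateAuroraURL
        realis.BasicAuth("aurora", "secret"),         // sets the Basic Authorization header above
        realis.TimeoutMS(20000),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer r.Close()
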
@@ -504,7 +523,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetTasksWithoutConfigs(nil, taskQ)
+			return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
 		})
 
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -529,7 +548,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
+			return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
 		})
 
 	if retryErr != nil {
@@ -546,7 +565,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.readonlyClient.GetJobs(nil, role)
+			return r.readonlyClient.GetJobs(context.TODO(), role)
 		})
 
 	if retryErr != nil {
@@ -567,7 +586,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.KillTasks(nil, key, instances, "")
+			return r.client.KillTasks(context.TODO(), key, instances, "")
 		})
 
 	if retryErr != nil {
@@ -589,7 +608,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 		false,
 		func() (*aurora.Response, error) {
 			// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-			return r.client.KillTasks(nil, key, nil, "")
+			return r.client.KillTasks(context.TODO(), key, nil, "")
 		})
 
 	if retryErr != nil {
@@ -609,7 +628,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.CreateJob(nil, auroraJob.JobConfig())
+			return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
 		})
 
 	if retryErr != nil {
@@ -619,7 +638,9 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 }
 
 // This API uses an update thrift call to create the services giving a few more robust features.
-func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
+func (r *realisClient) CreateService(
+	auroraJob Job,
+	settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
 	// Create a new job update object and ship it to the StartJobUpdate api
 	update := NewUpdateJob(auroraJob.TaskConfig(), settings)
 	update.InstanceCount(auroraJob.GetInstanceCount())
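
A sketch of calling CreateService from user code (the job value and the update-settings fields are placeholders); the returned StartJobUpdateResult_ carries the update key that the Monitor shown earlier can wait on:

    settings := realis.NewUpdateSettings()
    settings.UpdateGroupSize = 1
    settings.WaitForBatchCompletion = true

    resp, result, err := r.CreateService(job, settings)
    if err != nil {
        log.Fatal("service creation failed: ", err)
    }
    fmt.Println(resp.GetResponseCode(), result.GetKey())
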
@@ -646,7 +667,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
+			return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
 		})
 
 	if retryErr != nil {
@@ -662,7 +683,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.DescheduleCronJob(nil, key)
+			return r.client.DescheduleCronJob(context.TODO(), key)
 		})
 
 	if retryErr != nil {
@@ -680,7 +701,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.StartCronJob(nil, key)
+			return r.client.StartCronJob(context.TODO(), key)
 		})
 
 	if retryErr != nil {
@@ -697,7 +718,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.RestartShards(nil, key, instances)
+			return r.client.RestartShards(context.TODO(), key, instances)
 		})
 
 	if retryErr != nil {
@@ -720,7 +741,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.RestartShards(nil, key, instanceIds)
+			return r.client.RestartShards(context.TODO(), key, instanceIds)
 		})
 
 	if retryErr != nil {
@@ -741,7 +762,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 	resp, retryErr := r.thriftCallWithRetries(
 		true,
 		func() (*aurora.Response, error) {
-			return r.client.StartJobUpdate(nil, updateJob.req, message)
+			return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
 		})
 
 	if retryErr != nil {
@@ -765,7 +786,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.AbortJobUpdate(nil, &updateKey, message)
+			return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
 		})
 
 	if retryErr != nil {
@@ -774,7 +795,11 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	// Make this call synchronous by blocking until it job has successfully transitioned to aborted
 	m := Monitor{Client: r}
-	_, err := m.JobUpdateStatus(updateKey, map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true}, time.Second*5, time.Minute)
+	_, err := m.JobUpdateStatus(
+		updateKey,
+		map[aurora.JobUpdateStatus]bool{aurora.JobUpdateStatus_ABORTED: true},
+		time.Second*5,
+		time.Minute)
 
 	return resp, err
 }
@@ -787,7 +812,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.PauseJobUpdate(nil, updateKey, message)
+			return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
 		})
 
 	if retryErr != nil {
@@ -805,7 +830,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.ResumeJobUpdate(nil, updateKey, message)
+			return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
 		})
 
 	if retryErr != nil {
@@ -823,7 +848,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.PulseJobUpdate(nil, updateKey)
+			return r.client.PulseJobUpdate(context.TODO(), updateKey)
 		})
 
 	if retryErr != nil {
@@ -842,7 +867,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.AddInstances(nil, &instKey, count)
+			return r.client.AddInstances(context.TODO(), &instKey, count)
 		})
 
 	if retryErr != nil {
@@ -881,7 +906,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetTasksStatus(nil, query)
+			return r.client.GetTasksStatus(context.TODO(), query)
 		})
 
 	if retryErr != nil {
@@ -899,7 +924,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetPendingReason(nil, query)
+			return r.client.GetPendingReason(context.TODO(), query)
 		})
 
 	if retryErr != nil {
@@ -923,7 +948,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetTasksWithoutConfigs(nil, query)
+			return r.client.GetTasksWithoutConfigs(context.TODO(), query)
 		})
 
 	if retryErr != nil {
@@ -949,7 +974,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetTasksStatus(nil, taskQ)
+			return r.client.GetTasksStatus(context.TODO(), taskQ)
 		})
 
 	if retryErr != nil {
@@ -977,7 +1002,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.GetJobUpdateDetails(nil, &updateQuery)
+			return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
 		})
 
 	if retryErr != nil {
@@ -994,7 +1019,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.client.RollbackJobUpdate(nil, &key, message)
+			return r.client.RollbackJobUpdate(context.TODO(), &key, message)
 		})
 
 	if retryErr != nil {
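
The generated thrift stubs now take a context.Context as their first argument; since the exported gorealis v1 API has no context parameter, the commit passes context.TODO() everywhere. A hedged sketch of how a deadline could be applied instead (applyDeadline is a hypothetical helper, not part of this commit):

    func applyDeadline(d time.Duration, call func(ctx context.Context) (*aurora.Response, error)) (*aurora.Response, error) {
        ctx, cancel := context.WithTimeout(context.Background(), d)
        defer cancel()
        return call(ctx)
    }

    // e.g. resp, err := applyDeadline(10*time.Second, func(ctx context.Context) (*aurora.Response, error) {
    //     return r.client.KillTasks(ctx, key, nil, "")
    // })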


@@ -1,6 +1,8 @@
 package realis
 
 import (
+	"context"
+
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/pkg/errors"
 )
@@ -27,7 +29,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.DrainHosts(nil, drainList)
+			return r.adminClient.DrainHosts(context.TODO(), drainList)
 		})
 
 	if retryErr != nil {
@@ -44,7 +46,10 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 // Start SLA Aware Drain.
 // defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
 // After timeoutSecs, tasks will be forcefully drained without checking SLA.
-func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
+func (r *realisClient) SLADrainHosts(
+	policy *aurora.SlaPolicy,
+	timeout int64,
+	hosts ...string) (*aurora.DrainHostsResult_, error) {
 	var result *aurora.DrainHostsResult_
 
 	if len(hosts) == 0 {
@@ -59,7 +64,7 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
+			return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
 		})
 
 	if retryErr != nil {
@@ -89,7 +94,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.StartMaintenance(nil, hostList)
+			return r.adminClient.StartMaintenance(context.TODO(), hostList)
 		})
 
 	if retryErr != nil {
@@ -119,7 +124,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.EndMaintenance(nil, hostList)
+			return r.adminClient.EndMaintenance(context.TODO(), hostList)
 		})
 
 	if retryErr != nil {
@@ -151,7 +156,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.MaintenanceStatus(nil, hostList)
+			return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
 		})
 
 	if retryErr != nil {
@@ -166,7 +171,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 }
 
 // SetQuota sets a quota aggregate for the given role
-// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
+// TODO(zircote) Currently investigating an error that is returned
+// from thrift calls that include resources for `NamedPort` and `NumGpu`
 func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
 	quota := &aurora.ResourceAggregate{
 		Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}},
@@ -175,7 +181,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.SetQuota(nil, role, quota)
+			return r.adminClient.SetQuota(context.TODO(), role, quota)
 		})
 
 	if retryErr != nil {
@@ -191,7 +197,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.GetQuota(nil, role)
+			return r.adminClient.GetQuota(context.TODO(), role)
 		})
 
 	if retryErr != nil {
@@ -206,7 +212,7 @@ func (r *realisClient) Snapshot() error {
 	_, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.Snapshot(nil)
+			return r.adminClient.Snapshot(context.TODO())
 		})
 
 	if retryErr != nil {
@@ -222,7 +228,7 @@ func (r *realisClient) PerformBackup() error {
 	_, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.PerformBackup(nil)
+			return r.adminClient.PerformBackup(context.TODO())
 		})
 
 	if retryErr != nil {
@@ -237,7 +243,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
 	_, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.TriggerImplicitTaskReconciliation(nil)
+			return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
 		})
 
 	if retryErr != nil {
@@ -250,7 +256,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
 func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 
 	if batchSize != nil && *batchSize < 1 {
-		return errors.New("Invalid batch size.")
+		return errors.New("invalid batch size")
 	}
 	settings := aurora.NewExplicitReconciliationSettings()
@@ -258,7 +264,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 	_, retryErr := r.thriftCallWithRetries(false,
 		func() (*aurora.Response, error) {
-			return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
+			return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
 		})
 
 	if retryErr != nil {
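
These admin calls pair naturally with the HostMaintenance monitor from earlier; a hedged sketch of a drain-and-verify flow (host names, interval, and timeout are placeholders):

    hosts := []string{"agent-1.example.com", "agent-2.example.com"}

    if _, _, err := r.DrainHosts(hosts...); err != nil {
        log.Fatal(err)
    }

    m := realis.Monitor{Client: r}
    drained, err := m.HostMaintenance(hosts, []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED}, 5, 120)
    if err != nil {
        log.Println("not all hosts reached DRAINED:", err, drained)
    }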


@@ -76,6 +76,8 @@ func TestNonExistentEndpoint(t *testing.T) {
 		realis.TimeoutMS(200),
 		realis.BackOff(backoff),
 	)
+	assert.NoError(t, err)
+
 	defer r.Close()
 
 	taskQ := &aurora.TaskQuery{}
@@ -349,7 +351,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, reasons, 1)
 
-	resp, err = r.KillJob(job.JobKey())
+	_, err = r.KillJob(job.JobKey())
 	assert.NoError(t, err)
 }
@@ -515,7 +517,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	assert.Len(t, updateSummaries, 1)
 
-	_, err = r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up")
+	r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up")
 
 	_, err = r.KillJob(job.JobKey())
 	assert.NoError(t, err)
@@ -770,13 +772,10 @@ func TestRealisClient_Quota(t *testing.T) {
 			switch true {
 			case res.DiskMb != nil:
 				assert.Equal(t, disk, *res.DiskMb)
-				break
 			case res.NumCpus != nil:
 				assert.Equal(t, cpu, *res.NumCpus)
-				break
 			case res.RamMb != nil:
 				assert.Equal(t, ram, *res.RamMb)
-				break
 			}
 		}
 	})
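
The deleted break statements were redundant because a Go switch never falls through from one case to the next; gosimple (S1023) reports them. A minimal illustration with a hypothetical n:

    switch {
    case n > 0:
        fmt.Println("positive") // control leaves the switch here; no break needed
    case n < 0:
        fmt.Println("negative")
    default:
        fmt.Println("zero")
    }
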
@@ -829,7 +828,7 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
 	}
 
 	// Clean up after finishing test
-	_, err = r.KillJob(job.JobKey())
+	r.KillJob(job.JobKey())
 }
 
 func TestAuroraJob_UpdateSlaPolicy(t *testing.T) {
@@ -904,3 +903,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) {
 		})
 	}
 }
+
+func TestAuroraURLValidator(t *testing.T) {
+
+}


@@ -77,7 +77,8 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 			adjusted = Jitter(duration, backoff.Jitter)
 		}
 
-		logger.Printf("A retryable error occurred during function call, backing off for %v before retrying\n", adjusted)
+		logger.Printf(
+			"A retryable error occurred during function call, backing off for %v before retrying\n", adjusted)
 
 		time.Sleep(adjusted)
 		duration = time.Duration(float64(duration) * backoff.Factor)
 	}
@@ -116,7 +117,10 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 type auroraThriftCall func() (resp *aurora.Response, err error)
 
 // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
-func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
+func (r *realisClient) thriftCallWithRetries(
+	returnOnTimeout bool,
+	thriftCall auroraThriftCall) (*aurora.Response, error) {
+
 	var resp *aurora.Response
 	var clientErr error
 	var curStep int
@@ -134,7 +138,10 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au
 			adjusted = Jitter(duration, backoff.Jitter)
 		}
 
-		r.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
+		r.logger.Printf(
+			"A retryable error occurred during thrift call, backing off for %v before retry %v\n",
+			adjusted,
+			curStep)
 
 		time.Sleep(adjusted)
 		duration = time.Duration(float64(duration) * backoff.Factor)
@@ -179,7 +186,8 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au
 				if e.Timeout() {
 					timeouts++
 					r.logger.DebugPrintf(
-						"Client closed connection (timedout) %d times before server responded, consider increasing connection timeout",
+						"Client closed connection (timedout) %d times before server responded, "+
+							"consider increasing connection timeout",
 						timeouts)
 					if returnOnTimeout {
 						return resp, newTimedoutError(errors.New("client connection closed before server answer"))
@@ -190,7 +198,8 @@ func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall au
 
 			// In the future, reestablish connection should be able to check if it is actually possible
 			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
-			r.ReestablishConn()
+			// Ignoring error due to the fact that an error should be retried regardless
+			_ = r.ReestablishConn()
 		} else {
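
The retry loop's behavior comes from the client's Backoff settings; a small sketch of overriding them when constructing the client (values are illustrative):

    backoff := realis.Backoff{
        Steps:    5,               // maximum number of attempts
        Duration: 1 * time.Second, // initial wait between attempts
        Factor:   2.0,             // multiplier applied to the wait after each attempt
        Jitter:   0.1,             // fraction of random jitter added to each wait
    }

    r, err := realis.NewRealisClient(
        realis.SchedulerUrl("http://localhost:8081"),
        realis.BackOff(backoff),
    )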


@@ -31,7 +31,12 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	req.TaskConfig = config
 	req.Settings = NewUpdateSettings()
 
-	job := NewJob().(*AuroraJob)
+	job, ok := NewJob().(*AuroraJob)
+	if !ok {
+		// This should never happen but it is here as a safeguard
+		return nil
+	}
+
 	job.jobConfig.TaskConfig = config
 
 	// Rebuild resource map from TaskConfig
@@ -75,7 +80,11 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
 	req.TaskConfig = config
 	req.Settings = settings
 
-	job := NewJob().(*AuroraJob)
+	job, ok := NewJob().(*AuroraJob)
+	if !ok {
+		// This should never happen but it is here as a safeguard
+		return nil
+	}
 	job.jobConfig.TaskConfig = config
 
 	// Rebuild resource map from TaskConfig
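
With the safeguard in place, NewUpdateJob and NewDefaultUpdateJob return nil instead of panicking if the internal assertion ever fails, so callers should treat a nil result as an error; a short sketch (taskConfig is an existing *aurora.TaskConfig):

    update := realis.NewDefaultUpdateJob(taskConfig)
    if update == nil {
        log.Fatal("could not build update job from task config")
    }
    update.InstanceCount(2)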


@@ -67,7 +67,7 @@ func validateAndPopulateAuroraURL(urlStr string) (string, error) {
 		return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme)
 	}
 
-	if u.Path != "/api" {
+	if u.Path != APIPath {
 		return "", errors.Errorf("expected /api path %v\n", u.Path)

zk.go

@@ -146,7 +146,8 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
 		// This should never be encountered as it would indicate Aurora
 		// writing bad info into Zookeeper but is kept here as a safety net.
 		if len(serviceInst.AdditionalEndpoints) > 1 {
-			return false, NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
+			return false,
+				NewTemporaryError(errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
 		}
 
 		var scheme, host, port string