diff --git a/examples/client.go b/examples/client.go index e1e3411..aaee92e 100644 --- a/examples/client.go +++ b/examples/client.go @@ -24,8 +24,6 @@ import ( "strings" - "log" - "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" @@ -85,13 +83,13 @@ func main() { realis.BasicAuth(username, password), realis.ThriftJSON(), realis.TimeoutMS(CONNECTION_TIMEOUT), - realis.BackOff(&realis.Backoff{ + realis.BackOff(realis.Backoff{ Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1, }), - realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), + realis.Debug(), } //check if zkUrl is available. @@ -432,8 +430,8 @@ func main() { case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }, "") + ID: updateId, + }, "") if err != nil { fmt.Println(err) @@ -443,7 +441,7 @@ func main() { case "resumeJobUpdate": resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, + ID: updateId, }, "") if err != nil { @@ -454,8 +452,8 @@ func main() { case "pulseJobUpdate": resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }) + ID: updateId, + }) if err != nil { fmt.Println(err) } diff --git a/logger.go b/logger.go index aa1e377..4597dde 100644 --- a/logger.go +++ b/logger.go @@ -27,3 +27,29 @@ func (NoopLogger) Printf(format string, a ...interface{}) {} func (NoopLogger) Print(a ...interface{}) {} func (NoopLogger) Println(a ...interface{}) {} + +type LevelLogger struct { + Logger + debug bool +} + +func (l LevelLogger) DebugPrintf(format string, a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Printf(format, a...) + } +} + +func (l LevelLogger) DebugPrint(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Print(a...) + } +} + +func (l LevelLogger) DebugPrintln(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Println(a...) + } +} diff --git a/monitors.go 
b/monitors.go index 1799658..ac5b3fa 100644 --- a/monitors.go +++ b/monitors.go @@ -69,7 +69,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout // if we encounter an inactive state and it is not at rolled forward, update failed switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: - m.Client.RealisConfig().logger.Println("Update succeded") + m.Client.RealisConfig().logger.Println("Update succeeded") return true, nil case aurora.JobUpdateStatus_FAILED: m.Client.RealisConfig().logger.Println("Update failed") diff --git a/realis.go b/realis.go index dde0e6d..4fd90bb 100644 --- a/realis.go +++ b/realis.go @@ -21,12 +21,14 @@ import ( "encoding/base64" "fmt" "io/ioutil" + "log" "net/http" "net/http/cookiejar" + "os" "path/filepath" - "time" - + "strings" "sync" + "time" "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" @@ -80,8 +82,9 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient - logger Logger - lock sync.Mutex + logger LevelLogger + lock *sync.Mutex + debug bool } type RealisConfig struct { @@ -90,7 +93,7 @@ type RealisConfig struct { timeoutms int binTransport, jsonTransport bool cluster *Cluster - backoff *Backoff + backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory logger Logger @@ -98,6 +101,8 @@ type RealisConfig struct { certspath string clientkey, clientcert string options []ClientOption + debug bool + zkOptions []ZKOpt } var defaultBackoff = Backoff{ @@ -136,12 +141,19 @@ func ZKCluster(cluster *Cluster) ClientOption { } func ZKUrl(url string) ClientOption { + + opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")} + return func(config *RealisConfig) { - config.cluster = GetDefaultClusterFromZKUrl(url) + if config.zkOptions == nil { + config.zkOptions = opts + } else { + config.zkOptions = 
append(config.zkOptions, opts...) + } } } -func Retries(backoff *Backoff) ClientOption { +func Retries(backoff Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = backoff } @@ -159,7 +171,7 @@ func ThriftBinary() ClientOption { } } -func BackOff(b *Backoff) ClientOption { +func BackOff(b Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = b } @@ -183,13 +195,29 @@ func ClientCerts(clientKey, clientCert string) ClientOption { } } -// Using the word set to avoid name collision with Interface +// Use this option if you'd like to override default settings for connecting to Zookeeper. +// For example, this can be used to override the scheme to be used for communicating with Aurora (e.g. https). +// See zk.go for what is possible to set as an option. +func ZookeeperOptions(opts ...ZKOpt) ClientOption { + return func(config *RealisConfig) { + config.zkOptions = opts + } +} + +// Using the word set to avoid name collision with Interface. func SetLogger(l Logger) ClientOption { return func(config *RealisConfig) { config.logger = l } } +// Enable debug statements. 
+func Debug() ClientOption { + return func(config *RealisConfig) { + config.debug = true + } +} + func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { @@ -220,8 +248,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 - config.backoff = &defaultBackoff - config.logger = NoopLogger{} + config.backoff = defaultBackoff + config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) + + // Save options to recreate client if a connection error happens config.options = options // Override default configs where necessary @@ -229,6 +259,11 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { opt(config) } + // Turn off all logging (including debug) + if config.logger == nil { + config.logger = LevelLogger{NoopLogger{}, false} + } + config.logger.Println("Number of options applied to config: ", len(options)) //Set default Transport to JSON if needed. @@ -239,9 +274,16 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { var url string var err error - // Determine how to get information to connect to the scheduler. - // Prioritize getting leader from ZK over using a direct URL. - if config.cluster != nil { + // Find the leader using custom Zookeeper options if options are provided + if config.zkOptions != nil { + url, err = LeaderFromZKOpts(config.zkOptions...) + if err != nil { + return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) + } + config.logger.Println("Scheduler URL from ZK: ", url) + } else if config.cluster != nil { + // Determine how to get information to connect to the scheduler. + // Prioritize getting leader from ZK over using a direct URL. 
url, err = LeaderFromZK(*config.cluster) // If ZK is configured, throw an error if the leader is unable to be determined if err != nil { @@ -252,7 +294,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { url = config.url config.logger.Println("Scheduler URL: ", url) } else { - return nil, errors.New("Incomplete Options -- url or cluster required") + return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required") } if config.jsonTransport { @@ -274,9 +316,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.logger.Printf("gorealis config url: %+v\n", url) - //Basic Authentication. + // Adding Basic Authentication. if config.username != "" && config.password != "" { - AddBasicAuth(config, config.username, config.password) + httpTrans := (config.transport).(*thrift.THttpClient) + httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password)) } return &realisClient{ @@ -284,7 +327,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - logger: config.logger}, nil + logger: LevelLogger{config.logger, config.debug}, + lock: &sync.Mutex{}}, nil } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -404,14 +448,6 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC } -// Helper function to add basic authorization needed to communicate with Apache Aurora. 
-func AddBasicAuth(config *RealisConfig, username string, password string) { - config.username = username - config.password = password - httpTrans := (config.transport).(*thrift.THttpClient) - httpTrans.SetHeader("Authorization", "Basic "+basicAuth(username, password)) -} - func basicAuth(username, password string) string { auth := username + ":" + password return base64.StdEncoding.EncodeToString([]byte(auth)) @@ -458,6 +494,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche Statuses: states, } + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(taskQ) }) @@ -478,6 +516,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche } func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { + r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) }) @@ -510,6 +550,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe // Kill specific instances of a job. func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -534,6 +575,8 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. 
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(key, nil, "") @@ -551,6 +594,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { + r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.CreateJob(auroraJob.JobConfig()) }) @@ -570,17 +615,18 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe resp, err := r.StartJobUpdate(update, "") if err != nil { - return resp, nil, errors.Wrap(err, "unable to create service") + return nil, nil, errors.Wrap(err, "unable to create service") } if resp != nil && resp.GetResult_() != nil { return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil } - return resp, nil, errors.New("results object is nil") + return nil, nil, errors.New("results object is nil") } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { + r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) @@ -594,6 +640,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) }) @@ -608,6 
+656,8 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartCronJob(key) }) @@ -621,6 +671,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) + instanceIds := make(map[int32]bool) for _, instId := range instances { @@ -645,6 +697,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) + if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RestartShards(key, instanceIds) @@ -663,6 +717,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) }) @@ -676,6 +732,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) }) @@ -689,6 +747,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PauseJobUpdate(updateKey, message) }) @@ -703,6 +763,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(updateKey, message) }) @@ -717,6 +779,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { + r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PulseJobUpdate(updateKey) }) @@ -732,6 +796,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. 
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { + r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) }) @@ -768,6 +834,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) }) @@ -782,6 +850,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) }) @@ -808,6 +878,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) }) @@ -832,6 +904,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { + r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return 
r.client.GetJobUpdateDetails(&updateQuery) }) @@ -845,6 +919,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) }) @@ -872,6 +948,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr drainList.HostNames[host] = true } + r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.DrainHosts(drainList) }) @@ -901,6 +979,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror hostList.HostNames[host] = true } + r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(hostList) }) @@ -930,6 +1010,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au hostList.HostNames[host] = true } + r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) + // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. 
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -977,7 +1059,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - resp, retryErr :=r.adminClient.GetQuota(role) + resp, retryErr := r.adminClient.GetQuota(role) if retryErr != nil { return nil, errors.Wrap(retryErr, "Unable to get role quota") diff --git a/realis_e2e_test.go b/realis_e2e_test.go index ae61cf7..d27bc11 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -40,7 +40,9 @@ func TestMain(m *testing.M) { // New configuration to connect to Vagrant image r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000)) + realis.TimeoutMS(20000), + realis.Debug()) + if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) @@ -61,7 +63,7 @@ func TestMain(m *testing.M) { } func TestNonExistentEndpoint(t *testing.T) { - backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick + backoff := realis.Backoff{ // Reduce penalties for this test to make it quick Steps: 5, Duration: 1 * time.Second, Factor: 1.0, diff --git a/retry.go b/retry.go index 0e783f3..39ad106 100644 --- a/retry.go +++ b/retry.go @@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted) + r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -146,7 +146,10 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro func() { 
r.lock.Lock() defer r.lock.Unlock() + resp, clientErr = thriftCall() + + r.logger.DebugPrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retriable event as we don't know @@ -154,48 +157,52 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro if clientErr != nil { // Print out the error to the user - r.logger.Println(clientErr) - - r.ReestablishConn() + r.logger.Printf("Client Error: %v\n", clientErr) // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. - continue - } + r.ReestablishConn() - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("Response from aurora is nil") - } + } else { - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("Response from aurora is nil") + } - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. 
- case aurora.ResponseCode_INVALID_REQUEST: - case aurora.ResponseCode_ERROR: - case aurora.ResponseCode_AUTH_FAILED: - case aurora.ResponseCode_JOB_UPDATING_ERROR: - return nil, errors.New(response.CombineMessage(resp)) + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.logger.Println("Aurora replied with Transient error code, retrying") + continue - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + r.logger.Println("Terminal bad reply from Aurora, won't retry") + return nil, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
+ default: + r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String()) + } } } + r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { r.config.logger.Printf("retried this thrift call %d time(s)", curStep) } diff --git a/zk.go b/zk.go index dd711e0..34ad1a7 100644 --- a/zk.go +++ b/zk.go @@ -36,11 +36,13 @@ type ServiceInstance struct { } type zkConfig struct { - endpoints []string - path string - backoff Backoff - timeout time.Duration - logger Logger + endpoints []string + path string + backoff Backoff + timeout time.Duration + logger Logger + auroraSchemeOverride *string + auroraPortOverride *int } type ZKOpt func(z *zkConfig)