From 4f5766b4438c7c9a1d95cfbb9f039168cd7b2118 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Fri, 13 Apr 2018 11:03:29 -0700 Subject: [PATCH 1/6] Misc. bug fixes and addition of debug logging (#61) * Fixing possible race condition when passing backoff around as a pointer. * Adding a debug logger that is turned off by default. If debug is turned on, but a logger has not been assigned, a default logger that will print to STDOUT will be created. * Making Mutex a pointer so that there's no chance it can accidentally be copied. * Removing a leftover helper function from before we changed how we configured the client. * Minor changes to demonstrate how a logger can be used in conjunction to debug mode in the sample client. --- examples/client.go | 16 +++---- logger.go | 26 ++++++++++++ monitors.go | 2 +- realis.go | 102 +++++++++++++++++++++++++++++++++++---------- realis_e2e_test.go | 6 ++- retry.go | 67 ++++++++++++++++------------- 6 files changed, 156 insertions(+), 63 deletions(-) diff --git a/examples/client.go b/examples/client.go index e1e3411..7162408 100644 --- a/examples/client.go +++ b/examples/client.go @@ -18,14 +18,13 @@ import ( "flag" "fmt" "io/ioutil" + "log" "os" "time" "strings" - "log" - "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" @@ -85,13 +84,14 @@ func main() { realis.BasicAuth(username, password), realis.ThriftJSON(), realis.TimeoutMS(CONNECTION_TIMEOUT), - realis.BackOff(&realis.Backoff{ + realis.BackOff(realis.Backoff{ Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1, }), realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), + realis.Debug(), } //check if zkUrl is available. 
@@ -432,8 +432,8 @@ func main() { case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }, "") + ID: updateId, + }, "") if err != nil { fmt.Println(err) @@ -443,7 +443,7 @@ func main() { case "resumeJobUpdate": resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, + ID: updateId, }, "") if err != nil { @@ -454,8 +454,8 @@ func main() { case "pulseJobUpdate": resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }) + ID: updateId, + }) if err != nil { fmt.Println(err) } diff --git a/logger.go b/logger.go index aa1e377..4597dde 100644 --- a/logger.go +++ b/logger.go @@ -27,3 +27,29 @@ func (NoopLogger) Printf(format string, a ...interface{}) {} func (NoopLogger) Print(a ...interface{}) {} func (NoopLogger) Println(a ...interface{}) {} + +type LevelLogger struct { + Logger + debug bool +} + +func (l LevelLogger) DebugPrintf(format string, a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Printf(format, a) + } +} + +func (l LevelLogger) DebugPrint(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Print(a) + } +} + +func (l LevelLogger) DebugPrintln(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Println(a) + } +} diff --git a/monitors.go b/monitors.go index 1799658..ac5b3fa 100644 --- a/monitors.go +++ b/monitors.go @@ -69,7 +69,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout // if we encounter an inactive state and it is not at rolled forward, update failed switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: - m.Client.RealisConfig().logger.Println("Update succeded") + m.Client.RealisConfig().logger.Println("Update succeeded") return true, nil case aurora.JobUpdateStatus_FAILED: m.Client.RealisConfig().logger.Println("Update failed") diff --git a/realis.go b/realis.go index dde0e6d..c9291d1 100644 --- a/realis.go +++ b/realis.go @@ -21,8 +21,10 @@ import ( 
"encoding/base64" "fmt" "io/ioutil" + "log" "net/http" "net/http/cookiejar" + "os" "path/filepath" "time" @@ -80,8 +82,9 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient - logger Logger - lock sync.Mutex + logger LevelLogger + lock *sync.Mutex + debug bool } type RealisConfig struct { @@ -90,7 +93,7 @@ type RealisConfig struct { timeoutms int binTransport, jsonTransport bool cluster *Cluster - backoff *Backoff + backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory logger Logger @@ -98,6 +101,7 @@ type RealisConfig struct { certspath string clientkey, clientcert string options []ClientOption + debug bool } var defaultBackoff = Backoff{ @@ -141,7 +145,7 @@ func ZKUrl(url string) ClientOption { } } -func Retries(backoff *Backoff) ClientOption { +func Retries(backoff Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = backoff } @@ -159,7 +163,7 @@ func ThriftBinary() ClientOption { } } -func BackOff(b *Backoff) ClientOption { +func BackOff(b Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = b } @@ -183,13 +187,20 @@ func ClientCerts(clientKey, clientCert string) ClientOption { } } -// Using the word set to avoid name collision with Interface +// Using the word set to avoid name collision with Interface. func SetLogger(l Logger) ClientOption { return func(config *RealisConfig) { config.logger = l } } +// Enable debug statements. 
+func Debug() ClientOption { + return func(config *RealisConfig) { + config.debug = true + } +} + func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { @@ -220,8 +231,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 - config.backoff = &defaultBackoff - config.logger = NoopLogger{} + config.backoff = defaultBackoff + config.logger = LevelLogger{NoopLogger{}, false} + + // Save options to recreate client if a connection error happens config.options = options // Override default configs where necessary @@ -231,6 +244,11 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.logger.Println("Number of options applied to config: ", len(options)) + // Set a logger if debug has been set to true but no logger has been set + if config.logger == nil && config.debug { + config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) + } + //Set default Transport to JSON if needed. if !config.jsonTransport && !config.binTransport { config.jsonTransport = true @@ -274,9 +292,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.logger.Printf("gorealis config url: %+v\n", url) - //Basic Authentication. + // Adding Basic Authentication. 
if config.username != "" && config.password != "" { - AddBasicAuth(config, config.username, config.password) + httpTrans := (config.transport).(*thrift.THttpClient) + httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password)) } return &realisClient{ @@ -284,7 +303,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - logger: config.logger}, nil + logger: LevelLogger{config.logger, config.debug}, + lock: &sync.Mutex{}}, nil } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -404,14 +424,6 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC } -// Helper function to add basic authorization needed to communicate with Apache Aurora. 
-func AddBasicAuth(config *RealisConfig, username string, password string) { - config.username = username - config.password = password - httpTrans := (config.transport).(*thrift.THttpClient) - httpTrans.SetHeader("Authorization", "Basic "+basicAuth(username, password)) -} - func basicAuth(username, password string) string { auth := username + ":" + password return base64.StdEncoding.EncodeToString([]byte(auth)) @@ -458,6 +470,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche Statuses: states, } + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(taskQ) }) @@ -478,6 +492,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche } func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { + r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) }) @@ -510,6 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe // Kill specific instances of a job. func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -534,6 +551,8 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. 
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(key, nil, "") @@ -551,6 +570,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { + r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.CreateJob(auroraJob.JobConfig()) }) @@ -570,17 +591,18 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe resp, err := r.StartJobUpdate(update, "") if err != nil { - return resp, nil, errors.Wrap(err, "unable to create service") + return nil, nil, errors.Wrap(err, "unable to create service") } if resp != nil && resp.GetResult_() != nil { return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil } - return resp, nil, errors.New("results object is nil") + return nil, nil, errors.New("results object is nil") } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { + r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) @@ -594,6 +616,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) }) @@ -608,6 
+632,8 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartCronJob(key) }) @@ -621,6 +647,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) + instanceIds := make(map[int32]bool) for _, instId := range instances { @@ -645,6 +673,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) + if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RestartShards(key, instanceIds) @@ -663,6 +693,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) }) @@ -676,6 +708,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) }) @@ -689,6 +723,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PauseJobUpdate(updateKey, message) }) @@ -703,6 +739,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(updateKey, message) }) @@ -717,6 +755,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { + r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PulseJobUpdate(updateKey) }) @@ -732,6 +772,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. 
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { + r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) }) @@ -768,6 +810,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) }) @@ -782,6 +826,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) }) @@ -808,6 +854,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) }) @@ -832,6 +880,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { + r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return 
r.client.GetJobUpdateDetails(&updateQuery) }) @@ -845,6 +895,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) }) @@ -872,6 +924,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr drainList.HostNames[host] = true } + r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.DrainHosts(drainList) }) @@ -901,6 +955,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror hostList.HostNames[host] = true } + r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(hostList) }) @@ -930,6 +986,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au hostList.HostNames[host] = true } + r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) + // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. 
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -977,7 +1035,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - resp, retryErr :=r.adminClient.GetQuota(role) + resp, retryErr := r.adminClient.GetQuota(role) if retryErr != nil { return nil, errors.Wrap(retryErr, "Unable to get role quota") diff --git a/realis_e2e_test.go b/realis_e2e_test.go index ae61cf7..d27bc11 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -40,7 +40,9 @@ func TestMain(m *testing.M) { // New configuration to connect to Vagrant image r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000)) + realis.TimeoutMS(20000), + realis.Debug()) + if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) @@ -61,7 +63,7 @@ func TestMain(m *testing.M) { } func TestNonExistentEndpoint(t *testing.T) { - backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick + backoff := realis.Backoff{ // Reduce penalties for this test to make it quick Steps: 5, Duration: 1 * time.Second, Factor: 1.0, diff --git a/retry.go b/retry.go index 0e783f3..39ad106 100644 --- a/retry.go +++ b/retry.go @@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted) + r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -146,7 +146,10 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro func() { 
r.lock.Lock() defer r.lock.Unlock() + resp, clientErr = thriftCall() + + r.logger.DebugPrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retriable event as we don't know @@ -154,48 +157,52 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro if clientErr != nil { // Print out the error to the user - r.logger.Println(clientErr) - - r.ReestablishConn() + r.logger.Printf("Client Error: %v\n", clientErr) // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. - continue - } + r.ReestablishConn() - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("Response from aurora is nil") - } + } else { - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("Response from aurora is nil") + } - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. 
- case aurora.ResponseCode_INVALID_REQUEST: - case aurora.ResponseCode_ERROR: - case aurora.ResponseCode_AUTH_FAILED: - case aurora.ResponseCode_JOB_UPDATING_ERROR: - return nil, errors.New(response.CombineMessage(resp)) + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.logger.Println("Aurora replied with Transient error code, retrying") + continue - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST: + case aurora.ResponseCode_ERROR: + case aurora.ResponseCode_AUTH_FAILED: + case aurora.ResponseCode_JOB_UPDATING_ERROR: + r.logger.Println("Terminal bad reply from Aurora, won't retry") + return nil, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String()) + } } } + r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { r.config.logger.Printf("retried this thrift call %d time(s)", curStep) } From d03a7b61e43549e6aca0c46c0e98715e9994c101 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Wed, 24 Jan 2018 16:54:24 -0800 Subject: [PATCH 2/6] Removing napping from the TODO list as go's native http libraries are good enough. 
--- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 15d0031..dc8fc10 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,10 @@ library has been tested. * [Using the sample client](docs/using-the-sample-client.md) * [Leveraging the library](docs/leveraging-the-library.md) -## To Do -* Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency +## Projects using gorealis + +* [australis](https://github.com/rdelval/australis) ## Contributions -Contributions are always welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file +Contributions are always welcome. Please raise an issue to discuss a contribution before it is made. + From e6b204b9dab73c2c65d7f13e1f60df021dc0e0b8 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Sun, 13 May 2018 18:34:34 -0700 Subject: [PATCH 3/6] Removing unnecessary space. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dc8fc10..76292af 100644 --- a/README.md +++ b/README.md @@ -17,5 +17,5 @@ library has been tested. * [australis](https://github.com/rdelval/australis) ## Contributions -Contributions are always welcome. Please raise an issue to discuss a contribution before it is made. +Contributions are always welcome. Please raise an issue to discuss a contribution before it is made. From 4f6a5e97417670590cd4747244891cafeaae6a40 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 22 May 2018 16:56:42 -0700 Subject: [PATCH 4/6] Adding SSL flags to sample client. 
--- examples/client.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/examples/client.go b/examples/client.go index 7162408..b34dcd2 100644 --- a/examples/client.go +++ b/examples/client.go @@ -31,6 +31,8 @@ import ( ) var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList, role string +var caCertsPath string +var clientKey, clientCert string var CONNECTION_TIMEOUT = 20000 @@ -46,6 +48,9 @@ func init() { flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url") flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on") flag.StringVar(&role, "role", "", "owner role to use") + flag.StringVar(&caCertsPath, "caCertsPath", "", "Path to CA certs on local machine.") + flag.StringVar(&clientCert, "clientCert", "", "Client certificate to use to connect to Aurora.") + flag.StringVar(&clientKey, "clientKey", "", "Client key to use to connect to Aurora.") flag.Parse() @@ -102,6 +107,10 @@ func main() { clientOptions = append(clientOptions, realis.SchedulerUrl(url)) } + if clientKey != "" || clientCert != "" || caCertsPath != "" { + clientOptions = append(clientOptions, realis.Certspath(caCertsPath), realis.ClientCerts(clientKey, clientCert)) + } + r, err = realis.NewRealisClient(clientOptions...) if err != nil { fmt.Println(err) From 5d120292272a521d44203f8e38dc135dd175eff4 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 22 May 2018 17:00:30 -0700 Subject: [PATCH 5/6] Update PR template to hide away instructions on submission. --- .github/PULL_REQUEST_TEMPLATE.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 0445c51..797ed27 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,11 +1,14 @@ + + * Have you run goformat on the project before submitting? * Have you run go test on the project before submitting? Do all tests pass? 
From 8ca953f9252d932fa316a895fda6e0213f1e5b78 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 29 May 2018 12:46:16 -0700 Subject: [PATCH 6/6] Bug fix: using AND in place of OR for SSL flags. (#64) * Bug fix: using AND in place of OR for SSL flags. * Separating CA certificate path and client key and cert addition to options. --- examples/client.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/client.go b/examples/client.go index b34dcd2..b2a57e5 100644 --- a/examples/client.go +++ b/examples/client.go @@ -50,7 +50,7 @@ func init() { flag.StringVar(&role, "role", "", "owner role to use") flag.StringVar(&caCertsPath, "caCertsPath", "", "Path to CA certs on local machine.") flag.StringVar(&clientCert, "clientCert", "", "Client certificate to use to connect to Aurora.") - flag.StringVar(&clientKey, "clientKey", "", "Client key to use to connect to Aurora.") + flag.StringVar(&clientKey, "clientKey", "", "Client private key to use to connect to Aurora.") flag.Parse() @@ -107,8 +107,12 @@ func main() { clientOptions = append(clientOptions, realis.SchedulerUrl(url)) } - if clientKey != "" || clientCert != "" || caCertsPath != "" { - clientOptions = append(clientOptions, realis.Certspath(caCertsPath), realis.ClientCerts(clientKey, clientCert)) + if caCertsPath != "" { + clientOptions = append(clientOptions, realis.Certspath(caCertsPath)) + } + + if clientKey != "" && clientCert != "" { + clientOptions = append(clientOptions, realis.ClientCerts(clientKey, clientCert)) } r, err = realis.NewRealisClient(clientOptions...)