From 7152f568fed9a6f342fa5dd165b140827aa07ab0 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 5 Mar 2018 23:23:16 -0800 Subject: [PATCH 01/15] Fixing possible race condition when passing backoff around as a pointer. --- realis.go | 12 ++++++------ realis_e2e_test.go | 2 +- retry.go | 7 ++++++- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/realis.go b/realis.go index dde0e6d..e009343 100644 --- a/realis.go +++ b/realis.go @@ -90,7 +90,7 @@ type RealisConfig struct { timeoutms int binTransport, jsonTransport bool cluster *Cluster - backoff *Backoff + backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory logger Logger @@ -141,7 +141,7 @@ func ZKUrl(url string) ClientOption { } } -func Retries(backoff *Backoff) ClientOption { +func Retries(backoff Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = backoff } @@ -159,7 +159,7 @@ func ThriftBinary() ClientOption { } } -func BackOff(b *Backoff) ClientOption { +func BackOff(b Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = b } @@ -220,7 +220,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 - config.backoff = &defaultBackoff + config.backoff = defaultBackoff config.logger = NoopLogger{} config.options = options @@ -570,14 +570,14 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe resp, err := r.StartJobUpdate(update, "") if err != nil { - return resp, nil, errors.Wrap(err, "unable to create service") + return nil, nil, errors.Wrap(err, "unable to create service") } if resp != nil && resp.GetResult_() != nil { return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil } - return resp, nil, errors.New("results object is nil") + return nil, nil, errors.New("results object is nil") } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 
ae61cf7..6c81a50 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -61,7 +61,7 @@ func TestMain(m *testing.M) { } func TestNonExistentEndpoint(t *testing.T) { - backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick + backoff := realis.Backoff{ // Reduce penalties for this test to make it quick Steps: 5, Duration: 1 * time.Second, Factor: 1.0, diff --git a/retry.go b/retry.go index 0e783f3..b6df2c4 100644 --- a/retry.go +++ b/retry.go @@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted) + r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep+1) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -146,6 +146,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro func() { r.lock.Lock() defer r.lock.Unlock() + r.logger.Println("Beginning Aurora Thrift Call") resp, clientErr = thriftCall() }() @@ -169,6 +170,9 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro return nil, errors.New("Response from aurora is nil") } + // Log response code for debugging + r.logger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String()) + // Check Response Code from thrift and make a decision to continue retrying or not. switch responseCode := resp.GetResponseCode(); responseCode { @@ -191,6 +195,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // The only case that should fall down to here is a WARNING response code. // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
default: + r.logger.Println("unhandled response code received from Aurora") return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } From 69442d5957dcc9ed485864bf9462b7b6e1c58bc6 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Mar 2018 12:43:09 -0800 Subject: [PATCH 02/15] Adding a debug logger that is turned off by default. Info logger is enabled by default but prints out less information. --- examples/client.go | 14 ++++---- monitors.go | 8 ++--- realis.go | 84 +++++++++++++++++++++++++++++++++++++++------- realis_e2e_test.go | 5 ++- retry.go | 78 +++++++++++++++++++++++------------------- 5 files changed, 129 insertions(+), 60 deletions(-) diff --git a/examples/client.go b/examples/client.go index e1e3411..f51026c 100644 --- a/examples/client.go +++ b/examples/client.go @@ -85,13 +85,13 @@ func main() { realis.BasicAuth(username, password), realis.ThriftJSON(), realis.TimeoutMS(CONNECTION_TIMEOUT), - realis.BackOff(&realis.Backoff{ + realis.BackOff(realis.Backoff{ Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1, }), - realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), + realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)), } //check if zkUrl is available. 
@@ -432,8 +432,8 @@ func main() { case "pauseJobUpdate": resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }, "") + ID: updateId, + }, "") if err != nil { fmt.Println(err) @@ -443,7 +443,7 @@ func main() { case "resumeJobUpdate": resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, + ID: updateId, }, "") if err != nil { @@ -454,8 +454,8 @@ func main() { case "pulseJobUpdate": resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), - ID: updateId, - }) + ID: updateId, + }) if err != nil { fmt.Println(err) } diff --git a/monitors.go b/monitors.go index 1799658..3587f66 100644 --- a/monitors.go +++ b/monitors.go @@ -58,7 +58,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout updateDetail := response.JobUpdateDetails(respDetail) if len(updateDetail) == 0 { - m.Client.RealisConfig().logger.Println("No update found") + m.Client.RealisConfig().infoLogger.Println("No update found") return false, errors.New("No update found for " + updateKey.String()) } status := updateDetail[0].Update.Summary.State.Status @@ -69,13 +69,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout // if we encounter an inactive state and it is not at rolled forward, update failed switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: - m.Client.RealisConfig().logger.Println("Update succeded") + m.Client.RealisConfig().infoLogger.Println("Update succeded") return true, nil case aurora.JobUpdateStatus_FAILED: - m.Client.RealisConfig().logger.Println("Update failed") + m.Client.RealisConfig().infoLogger.Println("Update failed") return false, errors.New(UpdateFailed) case aurora.JobUpdateStatus_ROLLED_BACK: - m.Client.RealisConfig().logger.Println("rolled back") + m.Client.RealisConfig().infoLogger.Println("rolled back") return false, errors.New(RolledBack) default: return false, nil diff --git a/realis.go b/realis.go index e009343..befb5c1 
100644 --- a/realis.go +++ b/realis.go @@ -21,8 +21,10 @@ import ( "encoding/base64" "fmt" "io/ioutil" + "log" "net/http" "net/http/cookiejar" + "os" "path/filepath" "time" @@ -80,7 +82,8 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient - logger Logger + infoLogger Logger + debugLogger Logger lock sync.Mutex } @@ -93,7 +96,8 @@ type RealisConfig struct { backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory - logger Logger + infoLogger Logger + debugLogger Logger InsecureSkipVerify bool certspath string clientkey, clientcert string @@ -186,7 +190,13 @@ func ClientCerts(clientKey, clientCert string) ClientOption { // Using the word set to avoid name collision with Interface func SetLogger(l Logger) ClientOption { return func(config *RealisConfig) { - config.logger = l + config.infoLogger = l + } +} + +func DebugLogger(l Logger) ClientOption { + return func(config *RealisConfig) { + config.debugLogger = l } } @@ -221,7 +231,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 config.backoff = defaultBackoff - config.logger = NoopLogger{} + config.infoLogger = log.New(os.Stdout, "realis-info: ", log.Ltime|log.Ldate|log.LUTC) + config.debugLogger = NoopLogger{} config.options = options // Override default configs where necessary @@ -229,7 +240,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { opt(config) } - config.logger.Println("Number of options applied to config: ", len(options)) + config.infoLogger.Println("Number of options applied to config: ", len(options)) //Set default Transport to JSON if needed. 
if !config.jsonTransport && !config.binTransport { @@ -247,10 +258,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { if err != nil { return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) } - config.logger.Println("Scheduler URL from ZK: ", url) + config.infoLogger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { url = config.url - config.logger.Println("Scheduler URL: ", url) + config.infoLogger.Println("Scheduler URL: ", url) } else { return nil, errors.New("Incomplete Options -- url or cluster required") } @@ -272,7 +283,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - config.logger.Printf("gorealis config url: %+v\n", url) + config.infoLogger.Printf("gorealis config url: %+v\n", url) //Basic Authentication. if config.username != "" && config.password != "" { @@ -284,7 +295,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - logger: config.logger}, nil + infoLogger: config.infoLogger, + debugLogger: config.debugLogger}, nil } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -330,7 +342,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri if config.certspath != "" { rootCAs, err := GetCerts(config.certspath) if err != nil { - config.logger.Println("error occured couldn't fetch certs") + config.infoLogger.Println("error occured couldn't fetch certs") return nil, err } tlsConfig.RootCAs = rootCAs @@ -344,7 +356,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri if config.clientkey != "" && config.clientcert != "" { cert, err := 
tls.LoadX509KeyPair(config.clientcert, config.clientkey) if err != nil { - config.logger.Println("error occured loading client certs and keys") + config.infoLogger.Println("error occured loading client certs and keys") return nil, err } tlsConfig.Certificates = []tls.Certificate{cert} @@ -419,7 +431,7 @@ func basicAuth(username, password string) string { func (r *realisClient) ReestablishConn() error { // Close existing connection - r.logger.Println("Re-establishing Connection to Aurora") + r.infoLogger.Println("Re-establishing Connection to Aurora") r.Close() // Recreate connection from scratch using original options @@ -436,7 +448,7 @@ func (r *realisClient) ReestablishConn() error { r.client = newClient.client r.readonlyClient = newClient.readonlyClient r.adminClient = newClient.adminClient - r.logger = newClient.logger + r.infoLogger = newClient.infoLogger } return nil @@ -458,6 +470,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche Statuses: states, } + r.debugLogger.Printf("GetTasksWithoutConfig sThrift Payload: %v\n", *taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(taskQ) }) @@ -478,6 +492,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche } func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { + r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %v\n", *jobUpdateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) }) @@ -510,6 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe // Kill specific instances of a job. 
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, instances) instanceIds := make(map[int32]bool) @@ -534,6 +551,8 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { + r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, nil) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(key, nil, "") @@ -551,6 +570,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { + r.debugLogger.Printf("CreateJob Thrift Payload: %v\n", *auroraJob.JobConfig()) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.CreateJob(auroraJob.JobConfig()) }) @@ -581,6 +602,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { + r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %v\n", *auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) @@ -594,6 +616,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %v\n", *key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) }) @@ -608,6 +632,8 @@ func (r *realisClient) 
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { + r.debugLogger.Printf("StartCronJob Thrift Payload: %v\n", *key) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartCronJob(key) }) @@ -621,6 +647,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instances) + instanceIds := make(map[int32]bool) for _, instId := range instances { @@ -645,6 +673,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } + r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instanceIds) + if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RestartShards(key, instanceIds) @@ -663,6 +693,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { + r.debugLogger.Printf("StartJobUpdate Thrift Payload: %v %v\n", *updateJob, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) }) @@ -676,6 +708,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) }) @@ -689,6 +723,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PauseJobUpdate(updateKey, message) }) @@ -703,6 +739,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %v %v\n", updateKey, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(updateKey, message) }) @@ -717,6 +755,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { + r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %v\n", updateKey) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PulseJobUpdate(updateKey) }) @@ -732,6 +772,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. 
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { + r.debugLogger.Printf("AddInstances Thrift Payload: %v %v\n", instKey, count) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) }) @@ -768,6 +810,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) }) @@ -782,6 +826,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { + r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %v\n", *query) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) }) @@ -808,6 +854,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } + r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *taskQ) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) }) @@ -832,6 +880,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { + r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %v\n", updateQuery) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return 
r.client.GetJobUpdateDetails(&updateQuery) }) @@ -845,6 +895,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { + r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %v %v\n", key, message) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) }) @@ -872,6 +924,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr drainList.HostNames[host] = true } + r.debugLogger.Printf("DrainHosts Thrift Payload: %v\n", drainList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.DrainHosts(drainList) }) @@ -901,6 +955,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror hostList.HostNames[host] = true } + r.debugLogger.Printf("EndMaintenance Thrift Payload: %v\n", hostList) + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(hostList) }) @@ -930,6 +986,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au hostList.HostNames[host] = true } + r.debugLogger.Printf("MaintenanceStatus Thrift Payload: %v\n", hostList) + // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. 
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 6c81a50..e6d71e4 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -17,6 +17,7 @@ package realis_test import ( "fmt" "io/ioutil" + "log" "os" "testing" "time" @@ -40,7 +41,9 @@ func TestMain(m *testing.M) { // New configuration to connect to Vagrant image r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000)) + realis.TimeoutMS(20000), + realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate))) + if err != nil { fmt.Println("Please run vagrant box before running test suite") os.Exit(1) diff --git a/retry.go b/retry.go index b6df2c4..d2ceb6c 100644 --- a/retry.go +++ b/retry.go @@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep+1) + r.infoLogger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -146,8 +146,12 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro func() { r.lock.Lock() defer r.lock.Unlock() - r.logger.Println("Beginning Aurora Thrift Call") + + r.debugLogger.Println("Beginning Aurora Thrift Call") + resp, clientErr = thriftCall() + + r.debugLogger.Printf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. 
This is a retriable event as we don't know @@ -155,54 +159,58 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro if clientErr != nil { // Print out the error to the user - r.logger.Println(clientErr) - - r.ReestablishConn() + r.infoLogger.Printf("Client Error: %v\n", clientErr) // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. - continue - } + r.ReestablishConn() - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("Response from aurora is nil") - } + } else { - // Log response code for debugging - r.logger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String()) + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("Response from aurora is nil") + } - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // Log response code for debugging + r.debugLogger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String()) - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // Check Response Code from thrift and make a decision to continue retrying or not. 
+ switch responseCode := resp.GetResponseCode(); responseCode { - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + r.debugLogger.Println("OK reply from Aurora") + return resp, nil - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST: - case aurora.ResponseCode_ERROR: - case aurora.ResponseCode_AUTH_FAILED: - case aurora.ResponseCode_JOB_UPDATING_ERROR: - return nil, errors.New(response.CombineMessage(resp)) + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.infoLogger.Println("Aurora replied with Transient error code, retrying") + continue - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - r.logger.Println("unhandled response code received from Aurora") - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST: + case aurora.ResponseCode_ERROR: + case aurora.ResponseCode_AUTH_FAILED: + case aurora.ResponseCode_JOB_UPDATING_ERROR: + r.debugLogger.Println("Terminal bad reply from Aurora, won't retry") + return nil, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
+ default: + r.debugLogger.Printf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String()) + } } } + r.debugLogger.Printf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { - r.config.logger.Printf("retried this thrift call %d time(s)", curStep) + r.config.infoLogger.Printf("retried this thrift call %d time(s)", curStep) } // Provide more information to the user wherever possible. From 8d5a2d2414b74994419ac703edd6314f7aa078bb Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Mar 2018 15:00:29 -0800 Subject: [PATCH 03/15] Removing OK Aurora acknowledgment. --- retry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/retry.go b/retry.go index d2ceb6c..6cb2c69 100644 --- a/retry.go +++ b/retry.go @@ -181,7 +181,6 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // If the thrift call succeeded, stop retrying case aurora.ResponseCode_OK: - r.debugLogger.Println("OK reply from Aurora") return resp, nil // If the response code is transient, continue retrying From 65fcb598795e226d8a70a5040514683d6f4a3f52 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Mar 2018 15:36:39 -0800 Subject: [PATCH 04/15] Making Mutex a pointer so that there's no chance it can accidentally be copied. 
--- realis.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/realis.go b/realis.go index befb5c1..87ca6e7 100644 --- a/realis.go +++ b/realis.go @@ -84,7 +84,7 @@ type realisClient struct { adminClient *aurora.AuroraAdminClient infoLogger Logger debugLogger Logger - lock sync.Mutex + lock *sync.Mutex } type RealisConfig struct { @@ -96,8 +96,7 @@ type RealisConfig struct { backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory - infoLogger Logger - debugLogger Logger + infoLogger, debugLogger Logger InsecureSkipVerify bool certspath string clientkey, clientcert string @@ -296,7 +295,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), infoLogger: config.infoLogger, - debugLogger: config.debugLogger}, nil + debugLogger: config.debugLogger, + lock: &sync.Mutex{}}, nil } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { From bfd9e985c2f6ed7b9f45322368034e1320a2bb11 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Mar 2018 16:25:04 -0800 Subject: [PATCH 05/15] Changing %v to %+v for composite structs. Removing a repetitive statement for the Aurora return code. 
--- monitors.go | 2 +- realis.go | 42 +++++++++++++++++++++--------------------- retry.go | 3 --- 3 files changed, 22 insertions(+), 25 deletions(-) diff --git a/monitors.go b/monitors.go index 3587f66..c6d8127 100644 --- a/monitors.go +++ b/monitors.go @@ -69,7 +69,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout // if we encounter an inactive state and it is not at rolled forward, update failed switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: - m.Client.RealisConfig().infoLogger.Println("Update succeded") + m.Client.RealisConfig().infoLogger.Println("Update succeeded") return true, nil case aurora.JobUpdateStatus_FAILED: m.Client.RealisConfig().infoLogger.Println("Update failed") diff --git a/realis.go b/realis.go index 87ca6e7..0eb893e 100644 --- a/realis.go +++ b/realis.go @@ -470,7 +470,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche Statuses: states, } - r.debugLogger.Printf("GetTasksWithoutConfig sThrift Payload: %v\n", *taskQ) + r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(taskQ) @@ -492,7 +492,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche } func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { - r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %v\n", *jobUpdateQuery) + r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) @@ -526,7 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe // Kill specific instances of a job. 
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { - r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, instances) + r.debugLogger.Printf("KillTasks Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -551,7 +551,7 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, nil) + r.debugLogger.Printf("KillTasks Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards @@ -570,7 +570,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { - r.debugLogger.Printf("CreateJob Thrift Payload: %v\n", *auroraJob.JobConfig()) + r.debugLogger.Printf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.CreateJob(auroraJob.JobConfig()) @@ -602,7 +602,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { - r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %v\n", *auroraJob.JobConfig()) + r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) @@ -616,7 +616,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) 
(*aurora.Response, error) { - r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %v\n", *key) + r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) @@ -632,7 +632,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { - r.debugLogger.Printf("StartCronJob Thrift Payload: %v\n", *key) + r.debugLogger.Printf("StartCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartCronJob(key) @@ -647,7 +647,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { - r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instances) + r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -673,7 +673,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } - r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instanceIds) + r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -693,7 +693,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. 
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { - r.debugLogger.Printf("StartJobUpdate Thrift Payload: %v %v\n", *updateJob, message) + r.debugLogger.Printf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) @@ -708,7 +708,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %v %v\n", updateKey, message) + r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) @@ -723,7 +723,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %v %v\n", updateKey, message) + r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PauseJobUpdate(updateKey, message) @@ -739,7 +739,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. 
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %v %v\n", updateKey, message) + r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(updateKey, message) @@ -755,7 +755,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { - r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %v\n", updateKey) + r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PulseJobUpdate(updateKey) @@ -772,7 +772,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. 
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { - r.debugLogger.Printf("AddInstances Thrift Payload: %v %v\n", instKey, count) + r.debugLogger.Printf("AddInstances Thrift Payload: %+v %v\n", instKey, count) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) @@ -810,7 +810,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { - r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *query) + r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) @@ -826,7 +826,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { - r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %v\n", *query) + r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) @@ -854,7 +854,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } - r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *taskQ) + r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) @@ -880,7 +880,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r 
*realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { - r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %v\n", updateQuery) + r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(&updateQuery) @@ -895,7 +895,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %v %v\n", key, message) + r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) diff --git a/retry.go b/retry.go index 6cb2c69..8b4ec6c 100644 --- a/retry.go +++ b/retry.go @@ -173,9 +173,6 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro return nil, errors.New("Response from aurora is nil") } - // Log response code for debugging - r.debugLogger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String()) - // Check Response Code from thrift and make a decision to continue retrying or not. switch responseCode := resp.GetResponseCode(); responseCode { From 3c1c1f79b8033a1617dadb8330ee0170f2d80d06 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 6 Mar 2018 17:12:11 -0800 Subject: [PATCH 06/15] Removing another superfluous debug statement. 
--- retry.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/retry.go b/retry.go index 8b4ec6c..a1e2e3c 100644 --- a/retry.go +++ b/retry.go @@ -147,8 +147,6 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro r.lock.Lock() defer r.lock.Unlock() - r.debugLogger.Println("Beginning Aurora Thrift Call") - resp, clientErr = thriftCall() r.debugLogger.Printf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) From 256ec2ea47249cb166efc6a94012307b65ee2006 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 10 Apr 2018 14:19:10 -0700 Subject: [PATCH 07/15] Removing a leftover helper function from before we changed how we configured the client. --- realis.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/realis.go b/realis.go index 0eb893e..7609e7f 100644 --- a/realis.go +++ b/realis.go @@ -284,9 +284,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.infoLogger.Printf("gorealis config url: %+v\n", url) - //Basic Authentication. + // Adding Basic Authentication. if config.username != "" && config.password != "" { - AddBasicAuth(config, config.username, config.password) + httpTrans := (config.transport).(*thrift.THttpClient) + httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password)) } return &realisClient{ @@ -416,14 +417,6 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC } -// Helper function to add basic authorization needed to communicate with Apache Aurora. 
-func AddBasicAuth(config *RealisConfig, username string, password string) { - config.username = username - config.password = password - httpTrans := (config.transport).(*thrift.THttpClient) - httpTrans.SetHeader("Authorization", "Basic "+basicAuth(username, password)) -} - func basicAuth(username, password string) string { auth := username + ":" + password return base64.StdEncoding.EncodeToString([]byte(auth)) From 7662277025cc491785a2fa510ae264170553edb6 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 12 Apr 2018 11:56:16 -0700 Subject: [PATCH 08/15] Changing the logging paradigm to only require a single logger. All logging will be disabled by default. If debug is enabled, and a logger has not been set, the library will default to printing all logging (INFO and DEBUG) to the stdout. --- examples/client.go | 4 +- logger.go | 26 +++++++++++++ monitors.go | 8 ++-- realis.go | 93 +++++++++++++++++++++++++--------------------- realis_e2e_test.go | 3 +- retry.go | 16 ++++---- 6 files changed, 90 insertions(+), 60 deletions(-) diff --git a/examples/client.go b/examples/client.go index f51026c..aaee92e 100644 --- a/examples/client.go +++ b/examples/client.go @@ -24,8 +24,6 @@ import ( "strings" - "log" - "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" @@ -91,7 +89,7 @@ func main() { Factor: 2.0, Jitter: 0.1, }), - realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)), + realis.Debug(), } //check if zkUrl is available. 
diff --git a/logger.go b/logger.go index aa1e377..4597dde 100644 --- a/logger.go +++ b/logger.go @@ -27,3 +27,29 @@ func (NoopLogger) Printf(format string, a ...interface{}) {} func (NoopLogger) Print(a ...interface{}) {} func (NoopLogger) Println(a ...interface{}) {} + +type LevelLogger struct { + Logger + debug bool +} + +func (l LevelLogger) DebugPrintf(format string, a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Printf(format, a) + } +} + +func (l LevelLogger) DebugPrint(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Print(a) + } +} + +func (l LevelLogger) DebugPrintln(a ...interface{}) { + if l.debug { + l.Print("[DEBUG] ") + l.Println(a) + } +} diff --git a/monitors.go b/monitors.go index c6d8127..ac5b3fa 100644 --- a/monitors.go +++ b/monitors.go @@ -58,7 +58,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout updateDetail := response.JobUpdateDetails(respDetail) if len(updateDetail) == 0 { - m.Client.RealisConfig().infoLogger.Println("No update found") + m.Client.RealisConfig().logger.Println("No update found") return false, errors.New("No update found for " + updateKey.String()) } status := updateDetail[0].Update.Summary.State.Status @@ -69,13 +69,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout // if we encounter an inactive state and it is not at rolled forward, update failed switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: - m.Client.RealisConfig().infoLogger.Println("Update succeeded") + m.Client.RealisConfig().logger.Println("Update succeeded") return true, nil case aurora.JobUpdateStatus_FAILED: - m.Client.RealisConfig().infoLogger.Println("Update failed") + m.Client.RealisConfig().logger.Println("Update failed") return false, errors.New(UpdateFailed) case aurora.JobUpdateStatus_ROLLED_BACK: - m.Client.RealisConfig().infoLogger.Println("rolled back") + m.Client.RealisConfig().logger.Println("rolled back") return false, 
errors.New(RolledBack) default: return false, nil diff --git a/realis.go b/realis.go index 7609e7f..a0f87f7 100644 --- a/realis.go +++ b/realis.go @@ -82,9 +82,9 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient - infoLogger Logger - debugLogger Logger + logger LevelLogger lock *sync.Mutex + debug bool } type RealisConfig struct { @@ -96,11 +96,12 @@ type RealisConfig struct { backoff Backoff transport thrift.TTransport protoFactory thrift.TProtocolFactory - infoLogger, debugLogger Logger + logger Logger InsecureSkipVerify bool certspath string clientkey, clientcert string options []ClientOption + debug bool } var defaultBackoff = Backoff{ @@ -186,16 +187,17 @@ func ClientCerts(clientKey, clientCert string) ClientOption { } } -// Using the word set to avoid name collision with Interface +// Using the word set to avoid name collision with Interface. func SetLogger(l Logger) ClientOption { return func(config *RealisConfig) { - config.infoLogger = l + config.logger = l } } -func DebugLogger(l Logger) ClientOption { +// Enable debug statements. 
+func Debug() ClientOption { return func(config *RealisConfig) { - config.debugLogger = l + config.debug = true } } @@ -230,8 +232,9 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 config.backoff = defaultBackoff - config.infoLogger = log.New(os.Stdout, "realis-info: ", log.Ltime|log.Ldate|log.LUTC) - config.debugLogger = NoopLogger{} + config.logger = LevelLogger{NoopLogger{}, false} + + // Save options to recreate client if a connection error happens config.options = options // Override default configs where necessary @@ -239,7 +242,12 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { opt(config) } - config.infoLogger.Println("Number of options applied to config: ", len(options)) + config.logger.Println("Number of options applied to config: ", len(options)) + + // Set a logger if debug has been set to true but no logger has been set + if config.logger == nil && config.debug { + config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) + } //Set default Transport to JSON if needed. if !config.jsonTransport && !config.binTransport { @@ -257,10 +265,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { if err != nil { return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) } - config.infoLogger.Println("Scheduler URL from ZK: ", url) + config.logger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { url = config.url - config.infoLogger.Println("Scheduler URL: ", url) + config.logger.Println("Scheduler URL: ", url) } else { return nil, errors.New("Incomplete Options -- url or cluster required") } @@ -282,7 +290,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - config.infoLogger.Printf("gorealis config url: %+v\n", url) + config.logger.Printf("gorealis config url: %+v\n", url) // Adding Basic Authentication. 
if config.username != "" && config.password != "" { @@ -295,8 +303,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), - infoLogger: config.infoLogger, - debugLogger: config.debugLogger, + logger: LevelLogger{config.logger, config.debug}, lock: &sync.Mutex{}}, nil } @@ -343,7 +350,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri if config.certspath != "" { rootCAs, err := GetCerts(config.certspath) if err != nil { - config.infoLogger.Println("error occured couldn't fetch certs") + config.logger.Println("error occured couldn't fetch certs") return nil, err } tlsConfig.RootCAs = rootCAs @@ -357,7 +364,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri if config.clientkey != "" && config.clientcert != "" { cert, err := tls.LoadX509KeyPair(config.clientcert, config.clientkey) if err != nil { - config.infoLogger.Println("error occured loading client certs and keys") + config.logger.Println("error occured loading client certs and keys") return nil, err } tlsConfig.Certificates = []tls.Certificate{cert} @@ -424,7 +431,7 @@ func basicAuth(username, password string) string { func (r *realisClient) ReestablishConn() error { // Close existing connection - r.infoLogger.Println("Re-establishing Connection to Aurora") + r.logger.Println("Re-establishing Connection to Aurora") r.Close() // Recreate connection from scratch using original options @@ -441,7 +448,7 @@ func (r *realisClient) ReestablishConn() error { r.client = newClient.client r.readonlyClient = newClient.readonlyClient r.adminClient = newClient.adminClient - r.infoLogger = newClient.infoLogger + r.logger = newClient.logger } return nil @@ -463,7 +470,7 @@ 
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche Statuses: states, } - r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(taskQ) @@ -485,7 +492,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche } func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) { - r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) + r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) @@ -519,7 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe // Kill specific instances of a job. func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { - r.debugLogger.Printf("KillTasks Thrift Payload: %+v %v\n", key, instances) + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -544,7 +551,7 @@ func (r *realisClient) RealisConfig() *RealisConfig { // Sends a kill message to the scheduler for all active tasks under a job. 
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - r.debugLogger.Printf("KillTasks Thrift Payload: %+v\n", key) + r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards @@ -563,7 +570,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { - r.debugLogger.Printf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.CreateJob(auroraJob.JobConfig()) @@ -595,7 +602,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { - r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) @@ -609,7 +616,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { - r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %+v\n", key) + r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) @@ -625,7 +632,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, 
error) { - r.debugLogger.Printf("StartCronJob Thrift Payload: %+v\n", key) + r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartCronJob(key) @@ -640,7 +647,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { - r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instances) + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) instanceIds := make(map[int32]bool) @@ -666,7 +673,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } - r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) + r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -686,7 +693,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { - r.debugLogger.Printf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) + r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) @@ -701,7 +708,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) @@ -716,7 +723,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PauseJobUpdate(updateKey, message) @@ -732,7 +739,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(updateKey, message) @@ -748,7 +755,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. 
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { - r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) + r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.PulseJobUpdate(updateKey) @@ -765,7 +772,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { - r.debugLogger.Printf("AddInstances Thrift Payload: %+v %v\n", instKey, count) + r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) @@ -803,7 +810,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { - r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", query) + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) @@ -819,7 +826,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { - r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) + r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) @@ 
-847,7 +854,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } - r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", taskQ) + r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) @@ -873,7 +880,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { - r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) + r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(&updateQuery) @@ -888,7 +895,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) + r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) @@ -917,7 +924,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr drainList.HostNames[host] = true } - r.debugLogger.Printf("DrainHosts Thrift Payload: %v\n", drainList) + r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.DrainHosts(drainList) @@ -948,7 +955,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror hostList.HostNames[host] = true } - r.debugLogger.Printf("EndMaintenance Thrift Payload: %v\n", hostList) + 
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(hostList) @@ -979,7 +986,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au hostList.HostNames[host] = true } - r.debugLogger.Printf("MaintenanceStatus Thrift Payload: %v\n", hostList) + r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index e6d71e4..d27bc11 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -17,7 +17,6 @@ package realis_test import ( "fmt" "io/ioutil" - "log" "os" "testing" "time" @@ -42,7 +41,7 @@ func TestMain(m *testing.M) { r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), realis.TimeoutMS(20000), - realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate))) + realis.Debug()) if err != nil { fmt.Println("Please run vagrant box before running test suite") diff --git a/retry.go b/retry.go index a1e2e3c..39ad106 100644 --- a/retry.go +++ b/retry.go @@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.infoLogger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) + r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -149,7 +149,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro resp, clientErr = thriftCall() - r.debugLogger.Printf("Aurora Thrift Call ended 
resp: %v clientErr: %v\n", resp, clientErr) + r.logger.DebugPrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retriable event as we don't know @@ -157,7 +157,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro if clientErr != nil { // Print out the error to the user - r.infoLogger.Printf("Client Error: %v\n", clientErr) + r.logger.Printf("Client Error: %v\n", clientErr) // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. @@ -180,7 +180,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // If the response code is transient, continue retrying case aurora.ResponseCode_ERROR_TRANSIENT: - r.infoLogger.Println("Aurora replied with Transient error code, retrying") + r.logger.Println("Aurora replied with Transient error code, retrying") continue // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. @@ -188,23 +188,23 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro case aurora.ResponseCode_ERROR: case aurora.ResponseCode_AUTH_FAILED: case aurora.ResponseCode_JOB_UPDATING_ERROR: - r.debugLogger.Println("Terminal bad reply from Aurora, won't retry") + r.logger.Println("Terminal bad reply from Aurora, won't retry") return nil, errors.New(response.CombineMessage(resp)) // The only case that should fall down to here is a WARNING response code. // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
default: - r.debugLogger.Printf("unhandled response code %v received from Aurora\n", responseCode) + r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String()) } } } - r.debugLogger.Printf("it took %v retries to complete this operation\n", curStep) + r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) if curStep > 1 { - r.config.infoLogger.Printf("retried this thrift call %d time(s)", curStep) + r.config.logger.Printf("retried this thrift call %d time(s)", curStep) } // Provide more information to the user wherever possible. From 70dfb0216915dc380a7e4a380895a6344a858ae1 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 12 Apr 2018 12:26:54 -0700 Subject: [PATCH 09/15] Minor changes to demonstrate how a logger can be used in conjunction to debug mode. --- examples/client.go | 2 ++ realis.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/client.go b/examples/client.go index aaee92e..7162408 100644 --- a/examples/client.go +++ b/examples/client.go @@ -18,6 +18,7 @@ import ( "flag" "fmt" "io/ioutil" + "log" "os" "time" @@ -89,6 +90,7 @@ func main() { Factor: 2.0, Jitter: 0.1, }), + realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), realis.Debug(), } diff --git a/realis.go b/realis.go index a0f87f7..c9291d1 100644 --- a/realis.go +++ b/realis.go @@ -1035,7 +1035,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - resp, retryErr :=r.adminClient.GetQuota(role) + resp, retryErr := r.adminClient.GetQuota(role) if retryErr != nil { return nil, errors.Wrap(retryErr, "Unable to get role quota") From 0dec820951854b5a12eb0917da3b200643857bea Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 
12 Apr 2018 15:12:36 -0700 Subject: [PATCH 10/15] Removing port override as it is not needed --- examples/client.go | 3 ++- realis.go | 36 +++++++++++++++++++++++++++++++----- zk.go | 26 ++++++++++++++++++++------ 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/examples/client.go b/examples/client.go index 7162408..9e83d9a 100644 --- a/examples/client.go +++ b/examples/client.go @@ -99,7 +99,8 @@ func main() { fmt.Println("zkUrl: ", zkUrl) clientOptions = append(clientOptions, realis.ZKUrl(zkUrl)) } else { - clientOptions = append(clientOptions, realis.SchedulerUrl(url)) + clientOptions = append(clientOptions, realis.SchedulerUrl(url), + realis.ZookeeperOptions(realis.ZKAuroraPortOverride(2343), realis.ZKAuroraSchemeOverride("https"))) } r, err = realis.NewRealisClient(clientOptions...) diff --git a/realis.go b/realis.go index c9291d1..e94e1e7 100644 --- a/realis.go +++ b/realis.go @@ -26,6 +26,7 @@ import ( "net/http/cookiejar" "os" "path/filepath" + "strings" "time" "sync" @@ -102,6 +103,7 @@ type RealisConfig struct { clientkey, clientcert string options []ClientOption debug bool + zkOptions []ZKOpt } var defaultBackoff = Backoff{ @@ -140,8 +142,15 @@ func ZKCluster(cluster *Cluster) ClientOption { } func ZKUrl(url string) ClientOption { + + opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")} + return func(config *RealisConfig) { - config.cluster = GetDefaultClusterFromZKUrl(url) + if config.zkOptions == nil { + config.zkOptions = opts + } else { + config.zkOptions = append(config.zkOptions, opts...) + } } } @@ -187,6 +196,16 @@ func ClientCerts(clientKey, clientCert string) ClientOption { } } +// Use this option if you'd like to override default settings for connecting to Zookeeper. +// For example, this can be used to override the port on which to communicate with Aurora. +// This may be helpful if Aurora is behind another service and running on a port that is different +// what is advertised in Zookeeper. 
+func ZookeeperOptions(opts ...ZKOpt) ClientOption { + return func(config *RealisConfig) { + config.zkOptions = opts + } +} + // Using the word set to avoid name collision with Interface. func SetLogger(l Logger) ClientOption { return func(config *RealisConfig) { @@ -257,9 +276,16 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { var url string var err error - // Determine how to get information to connect to the scheduler. - // Prioritize getting leader from ZK over using a direct URL. - if config.cluster != nil { + // Find the leader using custom Zookeeper options if options are provided + if config.zkOptions != nil { + url, err = LeaderFromZKOpts(config.zkOptions...) + if err != nil { + return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) + } + config.logger.Println("Scheduler URL from ZK: ", url) + } else if config.cluster != nil { + // Determine how to get information to connect to the scheduler. + // Prioritize getting leader from ZK over using a direct URL. 
url, err = LeaderFromZK(*config.cluster) // If ZK is configured, throw an error if the leader is unable to be determined if err != nil { @@ -270,7 +296,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { url = config.url config.logger.Println("Scheduler URL: ", url) } else { - return nil, errors.New("Incomplete Options -- url or cluster required") + return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required") } if config.jsonTransport { diff --git a/zk.go b/zk.go index dd711e0..838714c 100644 --- a/zk.go +++ b/zk.go @@ -36,11 +36,12 @@ type ServiceInstance struct { } type zkConfig struct { - endpoints []string - path string - backoff Backoff - timeout time.Duration - logger Logger + endpoints []string + path string + backoff Backoff + timeout time.Duration + logger Logger + auroraSchemeOverride *string } type ZKOpt func(z *zkConfig) @@ -75,6 +76,12 @@ func ZKLogger(l Logger) ZKOpt { } } +func ZKAuroraSchemeOverride(scheme string) ZKOpt { + return func(z *zkConfig) { + z.auroraSchemeOverride = &scheme + } +} + // Retrieves current Aurora leader from ZK. func LeaderFromZK(cluster Cluster) (string, error) { return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) @@ -151,8 +158,15 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { var scheme, host, port string for k, v := range serviceInst.AdditionalEndpoints { - scheme = k + + if config.auroraSchemeOverride == nil { + scheme = k + } else { + scheme = *config.auroraSchemeOverride + } + host = v.Host + port = strconv.Itoa(v.Port) } From 7f104dce7063bb3008b4add6248c2adbb846cc54 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 12 Apr 2018 15:22:25 -0700 Subject: [PATCH 11/15] Changing code comments to reflect getting rid of port override. 
--- examples/client.go | 3 +-- realis.go | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/client.go b/examples/client.go index 9e83d9a..7162408 100644 --- a/examples/client.go +++ b/examples/client.go @@ -99,8 +99,7 @@ func main() { fmt.Println("zkUrl: ", zkUrl) clientOptions = append(clientOptions, realis.ZKUrl(zkUrl)) } else { - clientOptions = append(clientOptions, realis.SchedulerUrl(url), - realis.ZookeeperOptions(realis.ZKAuroraPortOverride(2343), realis.ZKAuroraSchemeOverride("https"))) + clientOptions = append(clientOptions, realis.SchedulerUrl(url)) } r, err = realis.NewRealisClient(clientOptions...) diff --git a/realis.go b/realis.go index e94e1e7..f1cb09f 100644 --- a/realis.go +++ b/realis.go @@ -197,9 +197,8 @@ func ClientCerts(clientKey, clientCert string) ClientOption { } // Use this option if you'd like to override default settings for connecting to Zookeeper. -// For example, this can be used to override the port on which to communicate with Aurora. -// This may be helpful if Aurora is behind another service and running on a port that is different -// what is advertised in Zookeeper. +// For example, this can be used to override the scheme to be used for communicating with Aurora (e.g. https). +// See zk.go for what is possible to set as an option. func ZookeeperOptions(opts ...ZKOpt) ClientOption { return func(config *RealisConfig) { config.zkOptions = opts From 7b1f51a747de706146bde7522c917925d22d761e Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 12 Apr 2018 18:11:07 -0700 Subject: [PATCH 12/15] Adding port override back in. 
--- zk.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/zk.go b/zk.go index 838714c..f930bd9 100644 --- a/zk.go +++ b/zk.go @@ -42,6 +42,7 @@ type zkConfig struct { timeout time.Duration logger Logger auroraSchemeOverride *string + auroraPortOverride *int } type ZKOpt func(z *zkConfig) @@ -82,6 +83,12 @@ func ZKAuroraSchemeOverride(scheme string) ZKOpt { } } +func ZKAuroraPortOverride(port int) ZKOpt { + return func(z *zkConfig) { + z.auroraPortOverride = &port + } +} + // Retrieves current Aurora leader from ZK. func LeaderFromZK(cluster Cluster) (string, error) { return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) @@ -167,7 +174,11 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { host = v.Host - port = strconv.Itoa(v.Port) + if config.auroraPortOverride == nil { + port = strconv.Itoa(v.Port) + } else { + port = strconv.Itoa(*config.auroraPortOverride) + } } leaderURL = scheme + "://" + host + ":" + port From 49e3194ba0a6c30931b75c69c0c0ac57fc62ebe5 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 16 Apr 2018 16:54:53 -0700 Subject: [PATCH 13/15] Bug fix: Logger was being set to NOOP despite no logger being provided when debug mode is turned on. 
--- realis.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/realis.go b/realis.go index f1cb09f..a697a60 100644 --- a/realis.go +++ b/realis.go @@ -250,7 +250,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 config.backoff = defaultBackoff - config.logger = LevelLogger{NoopLogger{}, false} // Save options to recreate client if a connection error happens config.options = options @@ -260,13 +259,15 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { opt(config) } - config.logger.Println("Number of options applied to config: ", len(options)) - - // Set a logger if debug has been set to true but no logger has been set + // Set a logger if debug has been set to true but no logger has been set, otherwise, set noop logger if config.logger == nil && config.debug { config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) + } else if config.logger == nil { + config.logger = LevelLogger{NoopLogger{}, false} } + config.logger.Println("Number of options applied to config: ", len(options)) + //Set default Transport to JSON if needed. if !config.jsonTransport && !config.binTransport { config.jsonTransport = true From d64a91784a16206c036048a5006a103ed7fe0d89 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 16 Apr 2018 17:54:41 -0700 Subject: [PATCH 14/15] Turn on logging by default. 
--- examples/client.go | 2 -- realis.go | 10 ++++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/examples/client.go b/examples/client.go index 7162408..aaee92e 100644 --- a/examples/client.go +++ b/examples/client.go @@ -18,7 +18,6 @@ import ( "flag" "fmt" "io/ioutil" - "log" "os" "time" @@ -90,7 +89,6 @@ func main() { Factor: 2.0, Jitter: 0.1, }), - realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)), realis.Debug(), } diff --git a/realis.go b/realis.go index a697a60..4fd90bb 100644 --- a/realis.go +++ b/realis.go @@ -27,9 +27,8 @@ import ( "os" "path/filepath" "strings" - "time" - "sync" + "time" "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" @@ -250,6 +249,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { // Default configs config.timeoutms = 10000 config.backoff = defaultBackoff + config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) // Save options to recreate client if a connection error happens config.options = options @@ -259,10 +259,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { opt(config) } - // Set a logger if debug has been set to true but no logger has been set, otherwise, set noop logger - if config.logger == nil && config.debug { - config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC) - } else if config.logger == nil { + // Turn off all logging (including debug) + if config.logger == nil { config.logger = LevelLogger{NoopLogger{}, false} } From 82ed77b7c6d8f0bd9549b11fa89c193683ef5cbd Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Fri, 15 Jun 2018 14:59:22 -0700 Subject: [PATCH 15/15] Removing option to override schema and ports for information found on Zookeeper. 
--- zk.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/zk.go b/zk.go index f930bd9..34ad1a7 100644 --- a/zk.go +++ b/zk.go @@ -77,18 +77,6 @@ func ZKLogger(l Logger) ZKOpt { } } -func ZKAuroraSchemeOverride(scheme string) ZKOpt { - return func(z *zkConfig) { - z.auroraSchemeOverride = &scheme - } -} - -func ZKAuroraPortOverride(port int) ZKOpt { - return func(z *zkConfig) { - z.auroraPortOverride = &port - } -} - // Retrieves current Aurora leader from ZK. func LeaderFromZK(cluster Cluster) (string, error) { return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) @@ -165,20 +153,9 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { var scheme, host, port string for k, v := range serviceInst.AdditionalEndpoints { - - if config.auroraSchemeOverride == nil { - scheme = k - } else { - scheme = *config.auroraSchemeOverride - } - + scheme = k host = v.Host - - if config.auroraPortOverride == nil { - port = strconv.Itoa(v.Port) - } else { - port = strconv.Itoa(*config.auroraPortOverride) - } + port = strconv.Itoa(v.Port) } leaderURL = scheme + "://" + host + ":" + port