Adding a debug logger that is turned off by default.

The info logger is enabled by default but prints less information.
Renan DelValle 2018-03-06 12:43:09 -08:00
parent 7152f568fe
commit 69442d5957
GPG key ID: C240AD6D6F443EC9
5 changed files with 129 additions and 60 deletions
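
For context, a minimal sketch of how a caller might wire up the new debug logger alongside the default info logger, mirroring the client example changed below. The scheduler URL, credentials, and the gorealis import path are assumptions for illustration only:

package main

import (
    "log"
    "os"

    realis "github.com/paypal/gorealis" // import path assumed for illustration
)

func main() {
    // Per this commit, the debug logger stays a NoopLogger unless supplied via
    // DebugLogger, while the info logger defaults to stdout with a "realis-info: " prefix.
    r, err := realis.NewRealisClient(
        realis.SchedulerUrl("http://localhost:8081"), // placeholder scheduler URL
        realis.BasicAuth("aurora", "secret"),         // placeholder credentials
        realis.TimeoutMS(20000),
        realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer r.Close()
}

With this wiring, the per-call Thrift payloads and retry details added in the diff below go to the debug stream, while lifecycle messages continue to use the info logger.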


@@ -85,13 +85,13 @@ func main() {
 		realis.BasicAuth(username, password),
 		realis.ThriftJSON(),
 		realis.TimeoutMS(CONNECTION_TIMEOUT),
-		realis.BackOff(&realis.Backoff{
+		realis.BackOff(realis.Backoff{
 			Steps:    2,
 			Duration: 10 * time.Second,
 			Factor:   2.0,
 			Jitter:   0.1,
 		}),
-		realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)),
+		realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)),
 	}

 	//check if zkUrl is available.
@@ -432,8 +432,8 @@ func main() {
 	case "pauseJobUpdate":
 		resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{
 			Job: job.JobKey(),
 			ID:  updateId,
 		}, "")
 		if err != nil {
 			fmt.Println(err)
@@ -443,7 +443,7 @@ func main() {
 	case "resumeJobUpdate":
 		resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{
 			Job: job.JobKey(),
 			ID:  updateId,
 		}, "")
 		if err != nil {
@@ -454,8 +454,8 @@ func main() {
 	case "pulseJobUpdate":
 		resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{
 			Job: job.JobKey(),
 			ID:  updateId,
 		})
 		if err != nil {
 			fmt.Println(err)
 		}


@@ -58,7 +58,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 		updateDetail := response.JobUpdateDetails(respDetail)

 		if len(updateDetail) == 0 {
-			m.Client.RealisConfig().logger.Println("No update found")
+			m.Client.RealisConfig().infoLogger.Println("No update found")
 			return false, errors.New("No update found for " + updateKey.String())
 		}
 		status := updateDetail[0].Update.Summary.State.Status
@@ -69,13 +69,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 		// if we encounter an inactive state and it is not at rolled forward, update failed
 		switch status {
 		case aurora.JobUpdateStatus_ROLLED_FORWARD:
-			m.Client.RealisConfig().logger.Println("Update succeded")
+			m.Client.RealisConfig().infoLogger.Println("Update succeded")
 			return true, nil
 		case aurora.JobUpdateStatus_FAILED:
-			m.Client.RealisConfig().logger.Println("Update failed")
+			m.Client.RealisConfig().infoLogger.Println("Update failed")
 			return false, errors.New(UpdateFailed)
 		case aurora.JobUpdateStatus_ROLLED_BACK:
-			m.Client.RealisConfig().logger.Println("rolled back")
+			m.Client.RealisConfig().infoLogger.Println("rolled back")
 			return false, errors.New(RolledBack)
 		default:
 			return false, nil


@@ -21,8 +21,10 @@ import (
 	"encoding/base64"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/http/cookiejar"
+	"os"
 	"path/filepath"
 	"time"
@@ -80,7 +82,8 @@ type realisClient struct {
 	client         *aurora.AuroraSchedulerManagerClient
 	readonlyClient *aurora.ReadOnlySchedulerClient
 	adminClient    *aurora.AuroraAdminClient
-	logger         Logger
+	infoLogger     Logger
+	debugLogger    Logger
 	lock           sync.Mutex
 }
@@ -93,7 +96,8 @@ type RealisConfig struct {
 	backoff               Backoff
 	transport             thrift.TTransport
 	protoFactory          thrift.TProtocolFactory
-	logger                Logger
+	infoLogger            Logger
+	debugLogger           Logger
 	InsecureSkipVerify    bool
 	certspath             string
 	clientkey, clientcert string
@@ -186,7 +190,13 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
 // Using the word set to avoid name collision with Interface
 func SetLogger(l Logger) ClientOption {
 	return func(config *RealisConfig) {
-		config.logger = l
+		config.infoLogger = l
+	}
+}
+
+func DebugLogger(l Logger) ClientOption {
+	return func(config *RealisConfig) {
+		config.debugLogger = l
 	}
 }
@@ -221,7 +231,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	// Default configs
 	config.timeoutms = 10000
 	config.backoff = defaultBackoff
-	config.logger = NoopLogger{}
+	config.infoLogger = log.New(os.Stdout, "realis-info: ", log.Ltime|log.Ldate|log.LUTC)
+	config.debugLogger = NoopLogger{}
 	config.options = options

 	// Override default configs where necessary
@@ -229,7 +240,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		opt(config)
 	}

-	config.logger.Println("Number of options applied to config: ", len(options))
+	config.infoLogger.Println("Number of options applied to config: ", len(options))

 	//Set default Transport to JSON if needed.
 	if !config.jsonTransport && !config.binTransport {
@@ -247,10 +258,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		if err != nil {
 			return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
 		}
-		config.logger.Println("Scheduler URL from ZK: ", url)
+		config.infoLogger.Println("Scheduler URL from ZK: ", url)
 	} else if config.url != "" {
 		url = config.url
-		config.logger.Println("Scheduler URL: ", url)
+		config.infoLogger.Println("Scheduler URL: ", url)
 	} else {
 		return nil, errors.New("Incomplete Options -- url or cluster required")
 	}
@@ -272,7 +283,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 	}

-	config.logger.Printf("gorealis config url: %+v\n", url)
+	config.infoLogger.Printf("gorealis config url: %+v\n", url)

 	//Basic Authentication.
 	if config.username != "" && config.password != "" {
@@ -284,7 +295,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		client:         aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
 		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
 		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
-		logger:         config.logger}, nil
+		infoLogger:     config.infoLogger,
+		debugLogger:    config.debugLogger}, nil
 }

 func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
@@ -330,7 +342,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
 	if config.certspath != "" {
 		rootCAs, err := GetCerts(config.certspath)
 		if err != nil {
-			config.logger.Println("error occured couldn't fetch certs")
+			config.infoLogger.Println("error occured couldn't fetch certs")
 			return nil, err
 		}
 		tlsConfig.RootCAs = rootCAs
@@ -344,7 +356,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
 	if config.clientkey != "" && config.clientcert != "" {
 		cert, err := tls.LoadX509KeyPair(config.clientcert, config.clientkey)
 		if err != nil {
-			config.logger.Println("error occured loading client certs and keys")
+			config.infoLogger.Println("error occured loading client certs and keys")
 			return nil, err
 		}
 		tlsConfig.Certificates = []tls.Certificate{cert}
@@ -419,7 +431,7 @@ func basicAuth(username, password string) string {
 func (r *realisClient) ReestablishConn() error {
 	// Close existing connection
-	r.logger.Println("Re-establishing Connection to Aurora")
+	r.infoLogger.Println("Re-establishing Connection to Aurora")
 	r.Close()

 	// Recreate connection from scratch using original options
@@ -436,7 +448,7 @@ func (r *realisClient) ReestablishConn() error {
 		r.client = newClient.client
 		r.readonlyClient = newClient.readonlyClient
 		r.adminClient = newClient.adminClient
-		r.logger = newClient.logger
+		r.infoLogger = newClient.infoLogger
 	}

 	return nil
@@ -458,6 +470,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 		Statuses: states,
 	}

+	r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %v\n", *taskQ)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.GetTasksWithoutConfigs(taskQ)
 	})
@@ -478,6 +492,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 }

 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
+
+	r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %v\n", *jobUpdateQuery)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
 	})
@@ -510,6 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 // Kill specific instances of a job.
 func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+	r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, instances)

 	instanceIds := make(map[int32]bool)
@@ -534,6 +551,8 @@ func (r *realisClient) RealisConfig() *RealisConfig {
 // Sends a kill message to the scheduler for all active tasks under a job.
 func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
+	r.debugLogger.Printf("KillTasks Thrift Payload: %v %v\n", *key, nil)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 		return r.client.KillTasks(key, nil, "")
@@ -551,6 +570,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 // Use this API to create ad-hoc jobs.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
+	r.debugLogger.Printf("CreateJob Thrift Payload: %v\n", *auroraJob.JobConfig())
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.CreateJob(auroraJob.JobConfig())
 	})
@@ -581,6 +602,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
 }

 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
+	r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %v\n", *auroraJob.JobConfig())

 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ScheduleCronJob(auroraJob.JobConfig())
@@ -594,6 +616,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)

 func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
+
+	r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %v\n", *key)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.DescheduleCronJob(key)
 	})
@@ -608,6 +632,8 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,

 func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
+
+	r.debugLogger.Printf("StartCronJob Thrift Payload: %v\n", *key)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.StartCronJob(key)
 	})
@@ -621,6 +647,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 // Restarts specific instances specified
 func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+	r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instances)
+
 	instanceIds := make(map[int32]bool)

 	for _, instId := range instances {
@@ -645,6 +673,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 		return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
 	}

+	r.debugLogger.Printf("RestartShards Thrift Payload: %v %v\n", *key, instanceIds)
+
 	if len(instanceIds) > 0 {
 		resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 			return r.client.RestartShards(key, instanceIds)
@@ -663,6 +693,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
+	r.debugLogger.Printf("StartJobUpdate Thrift Payload: %v %v\n", *updateJob, message)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.StartJobUpdate(updateJob.req, message)
 	})
@@ -676,6 +708,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
 func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+	r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %v %v\n", updateKey, message)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AbortJobUpdate(&updateKey, message)
 	})
@@ -689,6 +723,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 //Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+	r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %v %v\n", updateKey, message)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.PauseJobUpdate(updateKey, message)
 	})
@@ -703,6 +739,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 //Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+	r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %v %v\n", updateKey, message)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ResumeJobUpdate(updateKey, message)
 	})
@@ -717,6 +755,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 //Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
 func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
+	r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %v\n", updateKey)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.PulseJobUpdate(updateKey)
 	})
@@ -732,6 +772,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 // instance to scale up.
 func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
+	r.debugLogger.Printf("AddInstances Thrift Payload: %v %v\n", instKey, count)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AddInstances(&instKey, count)
 	})
@@ -768,6 +810,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 // Get information about task including a fully hydrated task configuration object
 func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+	r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *query)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.GetTasksStatus(query)
 	})
@@ -782,6 +826,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 // Get information about task including without a task configuration object
 func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+	r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %v\n", *query)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.GetTasksWithoutConfigs(query)
 	})
@@ -808,6 +854,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 		Statuses: aurora.ACTIVE_STATES,
 	}

+	r.debugLogger.Printf("GetTasksStatus Thrift Payload: %v\n", *taskQ)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.GetTasksStatus(taskQ)
 	})
@@ -832,6 +880,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task

 func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
+
+	r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %v\n", updateQuery)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.GetJobUpdateDetails(&updateQuery)
 	})
@@ -845,6 +895,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur

 func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+
+	r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %v %v\n", key, message)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.RollbackJobUpdate(&key, message)
 	})
@@ -872,6 +924,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 		drainList.HostNames[host] = true
 	}

+	r.debugLogger.Printf("DrainHosts Thrift Payload: %v\n", drainList)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.DrainHosts(drainList)
 	})
@@ -901,6 +955,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 		hostList.HostNames[host] = true
 	}

+	r.debugLogger.Printf("EndMaintenance Thrift Payload: %v\n", hostList)
+
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.EndMaintenance(hostList)
 	})
@@ -930,6 +986,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 		hostList.HostNames[host] = true
 	}

+	r.debugLogger.Printf("MaintenanceStatus Thrift Payload: %v\n", hostList)
+
 	// Make thrift call. If we encounter an error sending the call, attempt to reconnect
 	// and continue trying to resend command until we run out of retries.
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {


@@ -17,6 +17,7 @@ package realis_test
 import (
 	"fmt"
 	"io/ioutil"
+	"log"
 	"os"
 	"testing"
 	"time"
@@ -40,7 +41,9 @@ func TestMain(m *testing.M) {
 	// New configuration to connect to Vagrant image
 	r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(20000))
+		realis.TimeoutMS(20000),
+		realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)))

 	if err != nil {
 		fmt.Println("Please run vagrant box before running test suite")
 		os.Exit(1)


@@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
 			adjusted = Jitter(duration, backoff.Jitter)
 		}

-		r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep+1)
+		r.infoLogger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)

 		time.Sleep(adjusted)
 		duration = time.Duration(float64(duration) * backoff.Factor)
@@ -146,8 +146,12 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
 		func() {
 			r.lock.Lock()
 			defer r.lock.Unlock()
-			r.logger.Println("Beginning Aurora Thrift Call")
+
+			r.debugLogger.Println("Beginning Aurora Thrift Call")
 			resp, clientErr = thriftCall()
+
+			r.debugLogger.Printf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
 		}()

 		// Check if our thrift call is returning an error. This is a retriable event as we don't know
@@ -155,54 +159,58 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
 		if clientErr != nil {
 			// Print out the error to the user
-			r.logger.Println(clientErr)
-			r.ReestablishConn()
+			r.infoLogger.Printf("Client Error: %v\n", clientErr)

 			// In the future, reestablish connection should be able to check if it is actually possible
 			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
-			continue
-		}
+			r.ReestablishConn()

-		// If there was no client error, but the response is nil, something went wrong.
-		// Ideally, we'll never encounter this but we're placing a safeguard here.
-		if resp == nil {
-			return nil, errors.New("Response from aurora is nil")
-		}
+		} else {

-		// Log response code for debugging
-		r.logger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String())
+			// If there was no client error, but the response is nil, something went wrong.
+			// Ideally, we'll never encounter this but we're placing a safeguard here.
+			if resp == nil {
+				return nil, errors.New("Response from aurora is nil")
+			}

-		// Check Response Code from thrift and make a decision to continue retrying or not.
-		switch responseCode := resp.GetResponseCode(); responseCode {
+			// Log response code for debugging
+			r.debugLogger.Printf("Aurora replied with code: %v\n", resp.GetResponseCode().String())

-		// If the thrift call succeeded, stop retrying
-		case aurora.ResponseCode_OK:
-			return resp, nil
+			// Check Response Code from thrift and make a decision to continue retrying or not.
+			switch responseCode := resp.GetResponseCode(); responseCode {

-		// If the response code is transient, continue retrying
-		case aurora.ResponseCode_ERROR_TRANSIENT:
-			r.logger.Println("Aurora replied with Transient error code, retrying")
-			continue
+			// If the thrift call succeeded, stop retrying
+			case aurora.ResponseCode_OK:
+				r.debugLogger.Println("OK reply from Aurora")
+				return resp, nil

-		// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
-		case aurora.ResponseCode_INVALID_REQUEST:
-		case aurora.ResponseCode_ERROR:
-		case aurora.ResponseCode_AUTH_FAILED:
-		case aurora.ResponseCode_JOB_UPDATING_ERROR:
-			return nil, errors.New(response.CombineMessage(resp))
+			// If the response code is transient, continue retrying
+			case aurora.ResponseCode_ERROR_TRANSIENT:
+				r.infoLogger.Println("Aurora replied with Transient error code, retrying")
+				continue

-		// The only case that should fall down to here is a WARNING response code.
-		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
-		default:
-			r.logger.Println("unhandled response code received from Aurora")
-			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+			// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+			case aurora.ResponseCode_INVALID_REQUEST:
+			case aurora.ResponseCode_ERROR:
+			case aurora.ResponseCode_AUTH_FAILED:
+			case aurora.ResponseCode_JOB_UPDATING_ERROR:
+				r.debugLogger.Println("Terminal bad reply from Aurora, won't retry")
+				return nil, errors.New(response.CombineMessage(resp))
+
+			// The only case that should fall down to here is a WARNING response code.
+			// It is currently not used as a response in the scheduler so it is unknown how to handle it.
+			default:
+				r.debugLogger.Printf("unhandled response code %v received from Aurora\n", responseCode)
+				return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String())
+			}
 		}
 	}

+	r.debugLogger.Printf("it took %v retries to complete this operation\n", curStep)
+
 	if curStep > 1 {
-		r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+		r.config.infoLogger.Printf("retried this thrift call %d time(s)", curStep)
 	}

 	// Provide more information to the user wherever possible.