Changing the logging paradigm to only require a single logger. All logging will be disabled by default. If debug is enabled, and a logger has not been set, the library will default to printing all logging (INFO and DEBUG) to the stdout.
This commit is contained in:
parent
256ec2ea47
commit
7662277025
6 changed files with 90 additions and 60 deletions
|
@ -24,8 +24,6 @@ import (
|
|||
|
||||
"strings"
|
||||
|
||||
"log"
|
||||
|
||||
"github.com/paypal/gorealis"
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/paypal/gorealis/response"
|
||||
|
@ -91,7 +89,7 @@ func main() {
|
|||
Factor: 2.0,
|
||||
Jitter: 0.1,
|
||||
}),
|
||||
realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)),
|
||||
realis.Debug(),
|
||||
}
|
||||
|
||||
//check if zkUrl is available.
|
||||
|
|
26
logger.go
26
logger.go
|
@ -27,3 +27,29 @@ func (NoopLogger) Printf(format string, a ...interface{}) {}
|
|||
func (NoopLogger) Print(a ...interface{}) {}
|
||||
|
||||
func (NoopLogger) Println(a ...interface{}) {}
|
||||
|
||||
type LevelLogger struct {
|
||||
Logger
|
||||
debug bool
|
||||
}
|
||||
|
||||
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
|
||||
if l.debug {
|
||||
l.Print("[DEBUG] ")
|
||||
l.Printf(format, a)
|
||||
}
|
||||
}
|
||||
|
||||
func (l LevelLogger) DebugPrint(a ...interface{}) {
|
||||
if l.debug {
|
||||
l.Print("[DEBUG] ")
|
||||
l.Print(a)
|
||||
}
|
||||
}
|
||||
|
||||
func (l LevelLogger) DebugPrintln(a ...interface{}) {
|
||||
if l.debug {
|
||||
l.Print("[DEBUG] ")
|
||||
l.Println(a)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
|
|||
updateDetail := response.JobUpdateDetails(respDetail)
|
||||
|
||||
if len(updateDetail) == 0 {
|
||||
m.Client.RealisConfig().infoLogger.Println("No update found")
|
||||
m.Client.RealisConfig().logger.Println("No update found")
|
||||
return false, errors.New("No update found for " + updateKey.String())
|
||||
}
|
||||
status := updateDetail[0].Update.Summary.State.Status
|
||||
|
@ -69,13 +69,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
|
|||
// if we encounter an inactive state and it is not at rolled forward, update failed
|
||||
switch status {
|
||||
case aurora.JobUpdateStatus_ROLLED_FORWARD:
|
||||
m.Client.RealisConfig().infoLogger.Println("Update succeeded")
|
||||
m.Client.RealisConfig().logger.Println("Update succeeded")
|
||||
return true, nil
|
||||
case aurora.JobUpdateStatus_FAILED:
|
||||
m.Client.RealisConfig().infoLogger.Println("Update failed")
|
||||
m.Client.RealisConfig().logger.Println("Update failed")
|
||||
return false, errors.New(UpdateFailed)
|
||||
case aurora.JobUpdateStatus_ROLLED_BACK:
|
||||
m.Client.RealisConfig().infoLogger.Println("rolled back")
|
||||
m.Client.RealisConfig().logger.Println("rolled back")
|
||||
return false, errors.New(RolledBack)
|
||||
default:
|
||||
return false, nil
|
||||
|
|
93
realis.go
93
realis.go
|
@ -82,9 +82,9 @@ type realisClient struct {
|
|||
client *aurora.AuroraSchedulerManagerClient
|
||||
readonlyClient *aurora.ReadOnlySchedulerClient
|
||||
adminClient *aurora.AuroraAdminClient
|
||||
infoLogger Logger
|
||||
debugLogger Logger
|
||||
logger LevelLogger
|
||||
lock *sync.Mutex
|
||||
debug bool
|
||||
}
|
||||
|
||||
type RealisConfig struct {
|
||||
|
@ -96,11 +96,12 @@ type RealisConfig struct {
|
|||
backoff Backoff
|
||||
transport thrift.TTransport
|
||||
protoFactory thrift.TProtocolFactory
|
||||
infoLogger, debugLogger Logger
|
||||
logger Logger
|
||||
InsecureSkipVerify bool
|
||||
certspath string
|
||||
clientkey, clientcert string
|
||||
options []ClientOption
|
||||
debug bool
|
||||
}
|
||||
|
||||
var defaultBackoff = Backoff{
|
||||
|
@ -186,16 +187,17 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
|
|||
}
|
||||
}
|
||||
|
||||
// Using the word set to avoid name collision with Interface
|
||||
// Using the word set to avoid name collision with Interface.
|
||||
func SetLogger(l Logger) ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.infoLogger = l
|
||||
config.logger = l
|
||||
}
|
||||
}
|
||||
|
||||
func DebugLogger(l Logger) ClientOption {
|
||||
// Enable debug statements.
|
||||
func Debug() ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.debugLogger = l
|
||||
config.debug = true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -230,8 +232,9 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
// Default configs
|
||||
config.timeoutms = 10000
|
||||
config.backoff = defaultBackoff
|
||||
config.infoLogger = log.New(os.Stdout, "realis-info: ", log.Ltime|log.Ldate|log.LUTC)
|
||||
config.debugLogger = NoopLogger{}
|
||||
config.logger = LevelLogger{NoopLogger{}, false}
|
||||
|
||||
// Save options to recreate client if a connection error happens
|
||||
config.options = options
|
||||
|
||||
// Override default configs where necessary
|
||||
|
@ -239,7 +242,12 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
opt(config)
|
||||
}
|
||||
|
||||
config.infoLogger.Println("Number of options applied to config: ", len(options))
|
||||
config.logger.Println("Number of options applied to config: ", len(options))
|
||||
|
||||
// Set a logger if debug has been set to true but no logger has been set
|
||||
if config.logger == nil && config.debug {
|
||||
config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)
|
||||
}
|
||||
|
||||
//Set default Transport to JSON if needed.
|
||||
if !config.jsonTransport && !config.binTransport {
|
||||
|
@ -257,10 +265,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
if err != nil {
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
|
||||
}
|
||||
config.infoLogger.Println("Scheduler URL from ZK: ", url)
|
||||
config.logger.Println("Scheduler URL from ZK: ", url)
|
||||
} else if config.url != "" {
|
||||
url = config.url
|
||||
config.infoLogger.Println("Scheduler URL: ", url)
|
||||
config.logger.Println("Scheduler URL: ", url)
|
||||
} else {
|
||||
return nil, errors.New("Incomplete Options -- url or cluster required")
|
||||
}
|
||||
|
@ -282,7 +290,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
|
||||
}
|
||||
|
||||
config.infoLogger.Printf("gorealis config url: %+v\n", url)
|
||||
config.logger.Printf("gorealis config url: %+v\n", url)
|
||||
|
||||
// Adding Basic Authentication.
|
||||
if config.username != "" && config.password != "" {
|
||||
|
@ -295,8 +303,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
|
||||
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
|
||||
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
|
||||
infoLogger: config.infoLogger,
|
||||
debugLogger: config.debugLogger,
|
||||
logger: LevelLogger{config.logger, config.debug},
|
||||
lock: &sync.Mutex{}}, nil
|
||||
}
|
||||
|
||||
|
@ -343,7 +350,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
|
|||
if config.certspath != "" {
|
||||
rootCAs, err := GetCerts(config.certspath)
|
||||
if err != nil {
|
||||
config.infoLogger.Println("error occured couldn't fetch certs")
|
||||
config.logger.Println("error occured couldn't fetch certs")
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig.RootCAs = rootCAs
|
||||
|
@ -357,7 +364,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
|
|||
if config.clientkey != "" && config.clientcert != "" {
|
||||
cert, err := tls.LoadX509KeyPair(config.clientcert, config.clientkey)
|
||||
if err != nil {
|
||||
config.infoLogger.Println("error occured loading client certs and keys")
|
||||
config.logger.Println("error occured loading client certs and keys")
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
|
@ -424,7 +431,7 @@ func basicAuth(username, password string) string {
|
|||
|
||||
func (r *realisClient) ReestablishConn() error {
|
||||
// Close existing connection
|
||||
r.infoLogger.Println("Re-establishing Connection to Aurora")
|
||||
r.logger.Println("Re-establishing Connection to Aurora")
|
||||
r.Close()
|
||||
|
||||
// Recreate connection from scratch using original options
|
||||
|
@ -441,7 +448,7 @@ func (r *realisClient) ReestablishConn() error {
|
|||
r.client = newClient.client
|
||||
r.readonlyClient = newClient.readonlyClient
|
||||
r.adminClient = newClient.adminClient
|
||||
r.infoLogger = newClient.infoLogger
|
||||
r.logger = newClient.logger
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -463,7 +470,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
Statuses: states,
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(taskQ)
|
||||
|
@ -485,7 +492,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
}
|
||||
|
||||
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
|
||||
r.debugLogger.Printf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
|
||||
|
@ -519,7 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
|
||||
// Kill specific instances of a job.
|
||||
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.debugLogger.Printf("KillTasks Thrift Payload: %+v %v\n", key, instances)
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
|
@ -544,7 +551,7 @@ func (r *realisClient) RealisConfig() *RealisConfig {
|
|||
// Sends a kill message to the scheduler for all active tasks under a job.
|
||||
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("KillTasks Thrift Payload: %+v\n", key)
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
|
||||
|
@ -563,7 +570,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
|||
// Use this API to create ad-hoc jobs.
|
||||
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(auroraJob.JobConfig())
|
||||
|
@ -595,7 +602,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
|
|||
}
|
||||
|
||||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||
r.debugLogger.Printf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ScheduleCronJob(auroraJob.JobConfig())
|
||||
|
@ -609,7 +616,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
|
|||
|
||||
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("DescheduleCronJob Thrift Payload: %+v\n", key)
|
||||
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.DescheduleCronJob(key)
|
||||
|
@ -625,7 +632,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
|
|||
|
||||
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("StartCronJob Thrift Payload: %+v\n", key)
|
||||
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartCronJob(key)
|
||||
|
@ -640,7 +647,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
|
||||
// Restarts specific instances specified
|
||||
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instances)
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
|
@ -666,7 +673,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
|
||||
|
||||
if len(instanceIds) > 0 {
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
|
@ -686,7 +693,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
|
||||
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(updateJob.req, message)
|
||||
|
@ -701,7 +708,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
|
||||
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AbortJobUpdate(&updateKey, message)
|
||||
|
@ -716,7 +723,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
|
|||
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PauseJobUpdate(updateKey, message)
|
||||
|
@ -732,7 +739,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
|
|||
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ResumeJobUpdate(updateKey, message)
|
||||
|
@ -748,7 +755,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
|
|||
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
|
||||
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PulseJobUpdate(updateKey)
|
||||
|
@ -765,7 +772,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
|
|||
// instance to scale up.
|
||||
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
|
||||
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AddInstances(&instKey, count)
|
||||
|
@ -803,7 +810,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
|
|||
// Get information about task including a fully hydrated task configuration object
|
||||
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
|
||||
r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", query)
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(query)
|
||||
|
@ -819,7 +826,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
|
|||
// Get information about task including without a task configuration object
|
||||
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
|
||||
r.debugLogger.Printf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(query)
|
||||
|
@ -847,7 +854,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
Statuses: aurora.ACTIVE_STATES,
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(taskQ)
|
||||
|
@ -873,7 +880,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
|
||||
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
|
||||
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetJobUpdateDetails(&updateQuery)
|
||||
|
@ -888,7 +895,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
|
|||
|
||||
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.debugLogger.Printf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
|
||||
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RollbackJobUpdate(&key, message)
|
||||
|
@ -917,7 +924,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
|
|||
drainList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.DrainHosts(drainList)
|
||||
|
@ -948,7 +955,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
|
|||
hostList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.EndMaintenance(hostList)
|
||||
|
@ -979,7 +986,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
|
|||
hostList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
|
||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||
// and continue trying to resend command until we run out of retries.
|
||||
|
|
|
@ -17,7 +17,6 @@ package realis_test
|
|||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -42,7 +41,7 @@ func TestMain(m *testing.M) {
|
|||
r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.TimeoutMS(20000),
|
||||
realis.DebugLogger(log.New(os.Stdout, "realis-debug: ", log.Ltime|log.LUTC|log.Ldate)))
|
||||
realis.Debug())
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("Please run vagrant box before running test suite")
|
||||
|
|
16
retry.go
16
retry.go
|
@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
adjusted = Jitter(duration, backoff.Jitter)
|
||||
}
|
||||
|
||||
r.infoLogger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
|
||||
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
|
||||
|
||||
time.Sleep(adjusted)
|
||||
duration = time.Duration(float64(duration) * backoff.Factor)
|
||||
|
@ -149,7 +149,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
|
||||
resp, clientErr = thriftCall()
|
||||
|
||||
r.debugLogger.Printf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
|
||||
r.logger.DebugPrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
|
||||
}()
|
||||
|
||||
// Check if our thrift call is returning an error. This is a retriable event as we don't know
|
||||
|
@ -157,7 +157,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
if clientErr != nil {
|
||||
|
||||
// Print out the error to the user
|
||||
r.infoLogger.Printf("Client Error: %v\n", clientErr)
|
||||
r.logger.Printf("Client Error: %v\n", clientErr)
|
||||
|
||||
// In the future, reestablish connection should be able to check if it is actually possible
|
||||
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
|
||||
|
@ -180,7 +180,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
|
||||
// If the response code is transient, continue retrying
|
||||
case aurora.ResponseCode_ERROR_TRANSIENT:
|
||||
r.infoLogger.Println("Aurora replied with Transient error code, retrying")
|
||||
r.logger.Println("Aurora replied with Transient error code, retrying")
|
||||
continue
|
||||
|
||||
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
|
||||
|
@ -188,23 +188,23 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
case aurora.ResponseCode_ERROR:
|
||||
case aurora.ResponseCode_AUTH_FAILED:
|
||||
case aurora.ResponseCode_JOB_UPDATING_ERROR:
|
||||
r.debugLogger.Println("Terminal bad reply from Aurora, won't retry")
|
||||
r.logger.Println("Terminal bad reply from Aurora, won't retry")
|
||||
return nil, errors.New(response.CombineMessage(resp))
|
||||
|
||||
// The only case that should fall down to here is a WARNING response code.
|
||||
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
|
||||
default:
|
||||
r.debugLogger.Printf("unhandled response code %v received from Aurora\n", responseCode)
|
||||
r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
|
||||
return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
r.debugLogger.Printf("it took %v retries to complete this operation\n", curStep)
|
||||
r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
|
||||
|
||||
if curStep > 1 {
|
||||
r.config.infoLogger.Printf("retried this thrift call %d time(s)", curStep)
|
||||
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
|
||||
}
|
||||
|
||||
// Provide more information to the user wherever possible.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue