Compare commits

...
Sign in to create a new pull request.

15 commits

Author SHA1 Message Date
Renan DelValle
82ed77b7c6
Removing option to override schema and ports for information found on Zookeeper. 2018-06-15 14:59:22 -07:00
Renan DelValle
d64a91784a
Turn on logging by default. 2018-04-16 17:54:41 -07:00
Renan DelValle
49e3194ba0
Bug fix: Logger was being set to NOOP despite no logger being provided when debug mode is turned on. 2018-04-16 16:54:53 -07:00
Renan DelValle
7b1f51a747
Adding port override back in. 2018-04-12 18:11:07 -07:00
Renan DelValle
7f104dce70
Changing code comments to reflect getting rid of port override. 2018-04-12 15:22:25 -07:00
Renan DelValle
0dec820951
Removing port override as it is not needed 2018-04-12 15:16:46 -07:00
Renan DelValle
70dfb02169
Minor changes to demonstrate how a logger can be used in conjunction to debug mode. 2018-04-12 12:26:54 -07:00
Renan DelValle
7662277025
Changing the logging paradigm to only require a single logger. All logging will be disabled by default. If debug is enabled, and a logger has not been set, the library will default to printing all logging (INFO and DEBUG) to the stdout. 2018-04-12 12:01:46 -07:00
Renan DelValle
256ec2ea47
Removing a leftover helper function from before we changed how we configured the client. 2018-04-12 12:01:45 -07:00
Renan DelValle
3c1c1f79b8
Removing another superflous debug statement. 2018-04-12 12:01:45 -07:00
Renan DelValle
bfd9e985c2
Changing %v to %+v for composite structs. Removing a repetitive statement for the Aurora return code. 2018-04-12 12:01:45 -07:00
Renan DelValle
65fcb59879
Making Mutex a pointer so that there's no chance it can accidentally be copied. 2018-04-12 12:01:44 -07:00
Renan DelValle
8d5a2d2414
Removing OK Aurora acknowledgment. 2018-04-12 12:01:44 -07:00
Renan DelValle
69442d5957
Adding a debug logger that is turned off by default.
Info logger is enabled by default but prints out less information.
2018-04-12 12:01:44 -07:00
Renan DelValle
7152f568fe
Fixing possible race condition when passing backoff around as a pointer. 2018-04-12 12:01:43 -07:00
7 changed files with 193 additions and 76 deletions

View file

@ -24,8 +24,6 @@ import (
"strings"
"log"
"github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
@ -85,13 +83,13 @@ func main() {
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(&realis.Backoff{
realis.BackOff(realis.Backoff{
Steps: 2,
Duration: 10 * time.Second,
Factor: 2.0,
Jitter: 0.1,
}),
realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)),
realis.Debug(),
}
//check if zkUrl is available.
@ -432,8 +430,8 @@ func main() {
case "pauseJobUpdate":
resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{
Job: job.JobKey(),
ID: updateId,
}, "")
ID: updateId,
}, "")
if err != nil {
fmt.Println(err)
@ -443,7 +441,7 @@ func main() {
case "resumeJobUpdate":
resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{
Job: job.JobKey(),
ID: updateId,
ID: updateId,
}, "")
if err != nil {
@ -454,8 +452,8 @@ func main() {
case "pulseJobUpdate":
resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{
Job: job.JobKey(),
ID: updateId,
})
ID: updateId,
})
if err != nil {
fmt.Println(err)
}

View file

@ -27,3 +27,29 @@ func (NoopLogger) Printf(format string, a ...interface{}) {}
func (NoopLogger) Print(a ...interface{}) {}
func (NoopLogger) Println(a ...interface{}) {}
type LevelLogger struct {
Logger
debug bool
}
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
if l.debug {
l.Print("[DEBUG] ")
l.Printf(format, a)
}
}
func (l LevelLogger) DebugPrint(a ...interface{}) {
if l.debug {
l.Print("[DEBUG] ")
l.Print(a)
}
}
func (l LevelLogger) DebugPrintln(a ...interface{}) {
if l.debug {
l.Print("[DEBUG] ")
l.Println(a)
}
}

View file

@ -69,7 +69,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
// if we encounter an inactive state and it is not at rolled forward, update failed
switch status {
case aurora.JobUpdateStatus_ROLLED_FORWARD:
m.Client.RealisConfig().logger.Println("Update succeded")
m.Client.RealisConfig().logger.Println("Update succeeded")
return true, nil
case aurora.JobUpdateStatus_FAILED:
m.Client.RealisConfig().logger.Println("Update failed")

140
realis.go
View file

@ -21,12 +21,14 @@ import (
"encoding/base64"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/cookiejar"
"os"
"path/filepath"
"time"
"strings"
"sync"
"time"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora"
@ -80,8 +82,9 @@ type realisClient struct {
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
logger Logger
lock sync.Mutex
logger LevelLogger
lock *sync.Mutex
debug bool
}
type RealisConfig struct {
@ -90,7 +93,7 @@ type RealisConfig struct {
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
backoff Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
logger Logger
@ -98,6 +101,8 @@ type RealisConfig struct {
certspath string
clientkey, clientcert string
options []ClientOption
debug bool
zkOptions []ZKOpt
}
var defaultBackoff = Backoff{
@ -136,12 +141,19 @@ func ZKCluster(cluster *Cluster) ClientOption {
}
func ZKUrl(url string) ClientOption {
opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")}
return func(config *RealisConfig) {
config.cluster = GetDefaultClusterFromZKUrl(url)
if config.zkOptions == nil {
config.zkOptions = opts
} else {
config.zkOptions = append(config.zkOptions, opts...)
}
}
}
func Retries(backoff *Backoff) ClientOption {
func Retries(backoff Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = backoff
}
@ -159,7 +171,7 @@ func ThriftBinary() ClientOption {
}
}
func BackOff(b *Backoff) ClientOption {
func BackOff(b Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = b
}
@ -183,13 +195,29 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
}
}
// Using the word set to avoid name collision with Interface
// Use this option if you'd like to override default settings for connecting to Zookeeper.
// For example, this can be used to override the scheme to be used for communicating with Aurora (e.g. https).
// See zk.go for what is possible to set as an option.
func ZookeeperOptions(opts ...ZKOpt) ClientOption {
return func(config *RealisConfig) {
config.zkOptions = opts
}
}
// Using the word set to avoid name collision with Interface.
func SetLogger(l Logger) ClientOption {
return func(config *RealisConfig) {
config.logger = l
}
}
// Enable debug statements.
func Debug() ClientOption {
return func(config *RealisConfig) {
config.debug = true
}
}
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config)
if err != nil {
@ -220,8 +248,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
// Default configs
config.timeoutms = 10000
config.backoff = &defaultBackoff
config.logger = NoopLogger{}
config.backoff = defaultBackoff
config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)
// Save options to recreate client if a connection error happens
config.options = options
// Override default configs where necessary
@ -229,6 +259,11 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
opt(config)
}
// Turn off all logging (including debug)
if config.logger == nil {
config.logger = LevelLogger{NoopLogger{}, false}
}
config.logger.Println("Number of options applied to config: ", len(options))
//Set default Transport to JSON if needed.
@ -239,9 +274,16 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
var url string
var err error
// Determine how to get information to connect to the scheduler.
// Prioritize getting leader from ZK over using a direct URL.
if config.cluster != nil {
// Find the leader using custom Zookeeper options if options are provided
if config.zkOptions != nil {
url, err = LeaderFromZKOpts(config.zkOptions...)
if err != nil {
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
}
config.logger.Println("Scheduler URL from ZK: ", url)
} else if config.cluster != nil {
// Determine how to get information to connect to the scheduler.
// Prioritize getting leader from ZK over using a direct URL.
url, err = LeaderFromZK(*config.cluster)
// If ZK is configured, throw an error if the leader is unable to be determined
if err != nil {
@ -252,7 +294,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
url = config.url
config.logger.Println("Scheduler URL: ", url)
} else {
return nil, errors.New("Incomplete Options -- url or cluster required")
return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required")
}
if config.jsonTransport {
@ -274,9 +316,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
config.logger.Printf("gorealis config url: %+v\n", url)
//Basic Authentication.
// Adding Basic Authentication.
if config.username != "" && config.password != "" {
AddBasicAuth(config, config.username, config.password)
httpTrans := (config.transport).(*thrift.THttpClient)
httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password))
}
return &realisClient{
@ -284,7 +327,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: config.logger}, nil
logger: LevelLogger{config.logger, config.debug},
lock: &sync.Mutex{}}, nil
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
@ -404,14 +448,6 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC
}
// Helper function to add basic authorization needed to communicate with Apache Aurora.
func AddBasicAuth(config *RealisConfig, username string, password string) {
config.username = username
config.password = password
httpTrans := (config.transport).(*thrift.THttpClient)
httpTrans.SetHeader("Authorization", "Basic "+basicAuth(username, password))
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
@ -458,6 +494,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
Statuses: states,
}
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(taskQ)
})
@ -478,6 +516,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
}
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
})
@ -510,6 +550,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
// Kill specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
instanceIds := make(map[int32]bool)
@ -534,6 +575,8 @@ func (r *realisClient) RealisConfig() *RealisConfig {
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(key, nil, "")
@ -551,6 +594,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.CreateJob(auroraJob.JobConfig())
})
@ -570,17 +615,18 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
resp, err := r.StartJobUpdate(update, "")
if err != nil {
return resp, nil, errors.Wrap(err, "unable to create service")
return nil, nil, errors.Wrap(err, "unable to create service")
}
if resp != nil && resp.GetResult_() != nil {
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
}
return resp, nil, errors.New("results object is nil")
return nil, nil, errors.New("results object is nil")
}
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(auroraJob.JobConfig())
@ -594,6 +640,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(key)
})
@ -608,6 +656,8 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartCronJob(key)
})
@ -621,6 +671,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
// Restarts specific instances specified
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
instanceIds := make(map[int32]bool)
for _, instId := range instances {
@ -645,6 +697,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
}
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
if len(instanceIds) > 0 {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
@ -663,6 +717,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(updateJob.req, message)
})
@ -676,6 +732,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(&updateKey, message)
})
@ -689,6 +747,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(updateKey, message)
})
@ -703,6 +763,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(updateKey, message)
})
@ -717,6 +779,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(updateKey)
})
@ -732,6 +796,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AddInstances(&instKey, count)
})
@ -768,6 +834,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
// Get information about task including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(query)
})
@ -782,6 +850,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
// Get information about task including without a task configuration object
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(query)
})
@ -808,6 +878,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
Statuses: aurora.ACTIVE_STATES,
}
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(taskQ)
})
@ -832,6 +904,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(&updateQuery)
})
@ -845,6 +919,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(&key, message)
})
@ -872,6 +948,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
drainList.HostNames[host] = true
}
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(drainList)
})
@ -901,6 +979,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
hostList.HostNames[host] = true
}
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(hostList)
})
@ -930,6 +1010,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
hostList.HostNames[host] = true
}
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
@ -977,7 +1059,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
resp, retryErr :=r.adminClient.GetQuota(role)
resp, retryErr := r.adminClient.GetQuota(role)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Unable to get role quota")

View file

@ -40,7 +40,9 @@ func TestMain(m *testing.M) {
// New configuration to connect to Vagrant image
r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
realis.BasicAuth("aurora", "secret"),
realis.TimeoutMS(20000))
realis.TimeoutMS(20000),
realis.Debug())
if err != nil {
fmt.Println("Please run vagrant box before running test suite")
os.Exit(1)
@ -61,7 +63,7 @@ func TestMain(m *testing.M) {
}
func TestNonExistentEndpoint(t *testing.T) {
backoff := &realis.Backoff{ // Reduce penalties for this test to make it quick
backoff := realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,

View file

@ -134,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
adjusted = Jitter(duration, backoff.Jitter)
}
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retrying\n", adjusted)
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * backoff.Factor)
@ -146,7 +146,10 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
func() {
r.lock.Lock()
defer r.lock.Unlock()
resp, clientErr = thriftCall()
r.logger.DebugPrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retriable event as we don't know
@ -154,48 +157,52 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
if clientErr != nil {
// Print out the error to the user
r.logger.Println(clientErr)
r.ReestablishConn()
r.logger.Printf("Client Error: %v\n", clientErr)
// In the future, reestablish connection should be able to check if it is actually possible
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
continue
}
r.ReestablishConn()
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("Response from aurora is nil")
}
} else {
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("Response from aurora is nil")
}
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
r.logger.Println("Aurora replied with Transient error code, retrying")
continue
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST:
case aurora.ResponseCode_ERROR:
case aurora.ResponseCode_AUTH_FAILED:
case aurora.ResponseCode_JOB_UPDATING_ERROR:
return nil, errors.New(response.CombineMessage(resp))
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
r.logger.Println("Aurora replied with Transient error code, retrying")
continue
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST:
case aurora.ResponseCode_ERROR:
case aurora.ResponseCode_AUTH_FAILED:
case aurora.ResponseCode_JOB_UPDATING_ERROR:
r.logger.Println("Terminal bad reply from Aurora, won't retry")
return nil, errors.New(response.CombineMessage(resp))
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String())
}
}
}
r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 {
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
}

12
zk.go
View file

@ -36,11 +36,13 @@ type ServiceInstance struct {
}
type zkConfig struct {
endpoints []string
path string
backoff Backoff
timeout time.Duration
logger Logger
endpoints []string
path string
backoff Backoff
timeout time.Duration
logger Logger
auroraSchemeOverride *string
auroraPortOverride *int
}
type ZKOpt func(z *zkConfig)