From df8fc2fba13a75ee79efdd395d96d46a56971ebb Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Wed, 12 Jun 2019 11:22:59 -0700
Subject: [PATCH] Documentation and linting improvements (#108)

* Simplifying documentation for getting started: Removed outdated information
about installing Golang on different platforms and instead included a link to
the official Golang website which has more up to date information.
Instructions for installing docker-compose have also been added.

* Added documentation to all exported functions and structs.

* Unexported some structures and functions that were needlessly exported.

* Adding golang CI default configuration which can be useful while developing
and may be turned on later in the CI.

* Moving build process in CI to xenial.

* Reducing line size in some files and reducing shadowing in some test cases.
---
 .golangci.yml                   |  71 +++++++++++++
 .travis.yml                     |   1 +
 clusters.go                     |   5 +-
 container.go                    |  17 +++-
 docs/getting-started.md         |  92 ++---------------
 docs/using-the-sample-client.md |   2 +-
 errors.go                       |  21 ++--
 job.go                          |  65 +++++++-----
 logger.go                       |  24 +++--
 monitors.go                     |  15 ++-
 realis.go                       | 170 +++++++++++++++++---------------
 realis_admin.go                 |  10 +-
 realis_e2e_test.go              |  22 ++---
 retry.go                        |  42 ++++----
 updatejob.go                    |  21 ++--
 util.go                         |  11 +++
 zk.go                           |  27 +++--
 zk_test.go                      |   5 +-
 18 files changed, 347 insertions(+), 274 deletions(-)
 create mode 100644 .golangci.yml

diff --git a/.golangci.yml b/.golangci.yml
new file mode 100644
index 0000000..dd37918
--- /dev/null
+++ b/.golangci.yml
@@ -0,0 +1,71 @@
+# This file contains all available configuration options
+# with their default values.
+
+# options for analysis running
+run:
+  # default concurrency is the number of available CPUs
+  concurrency: 4
+
+  # timeout for analysis, e.g. 30s, 5m, default is 1m
+  deadline: 1m
+
+  # exit code when at least one issue was found, default is 1
+  issues-exit-code: 1
+
+  # include test files or not, default is true
+  tests: true
+
+  skip-dirs:
+    - gen-go/
+
+# output configuration options
+output:
+  # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
+  format: colored-line-number
+
+  # print lines of code with issue, default is true
+  print-issued-lines: true
+
+  # print linter name at the end of issue text, default is true
+  print-linter-name: true
+
+
+# all available settings of specific linters
+linters-settings:
+  errcheck:
+    # report about not checking errors in type assertions: `a := b.(MyStruct)`;
+    # default is false: such cases aren't reported by default.
+    check-type-assertions: true
+
+    # report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`;
+    # default is false: such cases aren't reported by default.
+    check-blank: true
+  govet:
+    # report about shadowed variables
+    check-shadowing: true
+  goconst:
+    # minimal length of string constant, 3 by default
+    min-len: 3
+    # minimal occurrences count to trigger, 3 by default
+    min-occurrences: 2
+  misspell:
+    # Correct spellings using locale preferences for US or UK.
+    # Default is to use a neutral variety of English.
+    # Setting locale to US will correct the British spelling of 'colour' to 'color'.
+    locale: US
+  lll:
+    # max line length, lines longer will be reported. Default is 120.
+    # '\t' is counted as 1 character by default, and can be changed with the tab-width option
+    line-length: 120
+    # tab width in spaces. Defaults to 1.
+ tab-width: 4 + +linters: + enable: + - govet + - goimports + - golint + - lll + - goconst + enable-all: false + fast: false diff --git a/.travis.yml b/.travis.yml index 83cbcca..4f325ed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ sudo: required +dist: xenial language: go branches: diff --git a/clusters.go b/clusters.go index 0d20db5..339dc38 100644 --- a/clusters.go +++ b/clusters.go @@ -21,6 +21,8 @@ import ( "github.com/pkg/errors" ) +// Cluster contains the definition of the clusters.json file used by the default Aurora +// client for configuration type Cluster struct { Name string `json:"name"` AgentRoot string `json:"slave_root"` @@ -33,7 +35,8 @@ type Cluster struct { AuthMechanism string `json:"auth_mechanism"` } -// Loads clusters.json file traditionally located at /etc/aurora/clusters.json +// LoadClusters loads clusters.json file traditionally located at /etc/aurora/clusters.json +// for use with a gorealis client func LoadClusters(config string) (map[string]Cluster, error) { file, err := os.Open(config) diff --git a/container.go b/container.go index 5735ec8..7c51d2f 100644 --- a/container.go +++ b/container.go @@ -18,31 +18,40 @@ import ( "github.com/paypal/gorealis/gen-go/apache/aurora" ) +// Container is an interface that defines a single function needed to create +// an Aurora container type. It exists because the code must support both Mesos +// and Docker containers. type Container interface { Build() *aurora.Container } +// MesosContainer is a Mesos style container that can be used by Aurora Jobs. type MesosContainer struct { container *aurora.MesosContainer } +// DockerContainer is a vanilla Docker style container that can be used by Aurora Jobs. type DockerContainer struct { container *aurora.DockerContainer } +// NewDockerContainer creates a new Aurora compatible Docker container configuration. func NewDockerContainer() DockerContainer { return DockerContainer{container: aurora.NewDockerContainer()} } +// Build creates an Aurora container based upon the configuration provided. func (c DockerContainer) Build() *aurora.Container { return &aurora.Container{Docker: c.container} } +// Image adds the name of a Docker image to be used by the Job when running. func (c DockerContainer) Image(image string) DockerContainer { c.container.Image = image return c } +// AddParameter adds a parameter to be passed to Docker when the container is run. func (c DockerContainer) AddParameter(name, value string) DockerContainer { c.container.Parameters = append(c.container.Parameters, &aurora.DockerParameter{ Name: name, @@ -51,14 +60,17 @@ func (c DockerContainer) AddParameter(name, value string) DockerContainer { return c } +// NewMesosContainer creates a Mesos style container to be configured and built for use by an Aurora Job. func NewMesosContainer() MesosContainer { return MesosContainer{container: aurora.NewMesosContainer()} } +// Build creates a Mesos style Aurora container configuration to be passed on to the Aurora Job. func (c MesosContainer) Build() *aurora.Container { return &aurora.Container{Mesos: c.container} } +// DockerImage configures the Mesos container to use a specific Docker image when being run. 
func (c MesosContainer) DockerImage(name, tag string) MesosContainer { if c.container.Image == nil { c.container.Image = aurora.NewImage() @@ -68,11 +80,12 @@ func (c MesosContainer) DockerImage(name, tag string) MesosContainer { return c } -func (c MesosContainer) AppcImage(name, imageId string) MesosContainer { +// AppcImage configures the Mesos container to use an image in the Appc format to run the container. +func (c MesosContainer) AppcImage(name, imageID string) MesosContainer { if c.container.Image == nil { c.container.Image = aurora.NewImage() } - c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageId} + c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageID} return c } diff --git a/docs/getting-started.md b/docs/getting-started.md index b16ac3f..d45477c 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -88,12 +88,6 @@ On Ubuntu, restarting the aurora-scheduler can be achieved by running the follow $ sudo service aurora-scheduler restart ``` -### Using a custom client -Pystachio does not yet support launching tasks using custom executors. Therefore, a custom -client must be used in order to launch tasks using a custom executor. In this case, -we will be using [gorealis](https://github.com/paypal/gorealis) to launch a task with -the compose executor on Aurora. - ## Using [dce-go](https://github.com/paypal/dce-go) Instead of manually configuring Aurora to run the docker-compose executor, one can follow the instructions provided [here](https://github.com/paypal/dce-go/blob/develop/docs/environment.md) to quickly create a DCE environment that would include mesos, aurora, golang1.7, docker, docker-compose and DCE installed. @@ -107,80 +101,12 @@ Mesos endpoint --> http://192.168.33.8:5050 ### Installing Go -#### Linux +Follow the instructions at the official golang website: [golang.org/doc/install](https://golang.org/doc/install) -##### Ubuntu +### Installing docker-compose -###### Adding a PPA and install via apt-get -``` -$ sudo add-apt-repository ppa:ubuntu-lxc/lxd-stable -$ sudo apt-get update -$ sudo apt-get install golang -``` - -###### Configuring the GOPATH - -Configure the environment to be able to compile and run Go code. -``` -$ mkdir $HOME/go -$ echo export GOPATH=$HOME/go >> $HOME/.bashrc -$ echo export GOROOT=/usr/lib/go >> $HOME/.bashrc -$ echo export PATH=$PATH:$GOPATH/bin >> $HOME/.bashrc -$ echo export PATH=$PATH:$GOROOT/bin >> $HOME/.bashrc -``` - -Finally we must reload the .bashrc configuration: -``` -$ source $HOME/.bashrc -``` - -#### OS X - -One way to install go on OS X is by using [Homebrew](http://brew.sh/) - -##### Installing Homebrew -Run the following command from the terminal to install Hombrew: -``` -$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" -``` - -##### Installing Go using Hombrew - -Run the following command from the terminal to install Go: -``` -$ brew install go -``` - -##### Configuring the GOPATH - -Configure the environment to be able to compile and run Go code. 
-```
-$ mkdir $HOME/go
-$ echo export GOPATH=$HOME/go >> $HOME/.profile
-$ echo export GOROOT=/usr/local/opt/go/libexec >> $HOME/.profile
-$ echo export PATH=$PATH:$GOPATH/bin >> $HOME/.profile
-$ echo export PATH=$PATH:$GOROOT/bin >> $HOME/.profile
-```
-
-Finally we must reload the .profile configuration:
-```
-$ source $HOME/.profile
-```
-
-#### Windows
-
-Download and run the msi installer from https://golang.org/dl/
-
-## Installing Docker Compose (if manually configured Aurora)
-To show Aurora's new multi executor feature, we need to use at least one custom executor.
-In this case we will be using the [docker-compose-executor](https://github.com/mesos/docker-compose-executor).
-
-In order to run the docker-compose executor, each agent must have docker-compose installed on it.
-
-This can be done using pip:
-```
-$ sudo pip install docker-compose
-```
+Agents which will run dce-go will need docker-compose in order to successfully run the executor.
+Instructions for installing docker-compose on various platforms may be found on Docker's website: [docs.docker.com/compose/install/](https://docs.docker.com/compose/install/)
 
 ## Downloading gorealis
 Finally, we must get `gorealis` using the `go get` command:
@@ -192,7 +118,7 @@ go get github.com/paypal/gorealis
 # Creating Aurora Jobs
 
 ## Creating a thermos job
-To demonstrate that we are able to run jobs using different executors on the 
+To demonstrate that we are able to run jobs using different executors on the
 same scheduler, we'll first launch a thermos job using the default Aurora Client.
 We can use a sample job for this:
@@ -259,8 +185,8 @@ go run $GOPATH/src/github.com/paypal/gorealis/examples/client.go -executor=compo
 ```
 
 If everything went according to plan, a new job will be shown in the Aurora UI.
-We can further investigate inside the Mesos task sandbox. Inside the sandbox, under 
-the sample-app folder, we can find a docker-compose.yml-generated.yml. If we inspect this file, 
+We can further investigate inside the Mesos task sandbox. Inside the sandbox, under
+the sample-app folder, we can find a docker-compose.yml-generated.yml. If we inspect this file,
 we can find the port at which we can find the web server we launched.
 
 Under Web->Ports, we find the port Mesos allocated. We can then navigate to:
@@ -269,10 +195,10 @@ Under Web->Ports, we find the port Mesos allocated. We can then navigate to:
 
 A message from the executor should greet us.
 
 ## Creating a Thermos job using gorealis
-It is also possible to create a thermos job using gorealis. To do this, however, 
+It is also possible to create a thermos job using gorealis. To do this, however,
 a thermos payload is required. A thermos payload consists of a JSON blob that details the
 entire task as it exists inside the Aurora Scheduler. *Creating the blob is unfortunately
-out of the scope of what gorealis does*, so a thermos payload must be generated beforehand or 
+out of the scope of what gorealis does*, so a thermos payload must be generated beforehand or
 retrieved from the structdump of an existing task for testing purposes.
 
 A sample thermos JSON payload may be found [here](../examples/thermos_payload.json) in the examples folder.
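[Editorial note: the sketch below is not part of this patch. It is a minimal illustration of what a gorealis client submitting a pre-generated thermos payload might look like after these changes. The scheduler URL, role, job name, executor name, and resource values are all assumptions that must be adapted to your environment; in particular, the executor name has to match one registered with your scheduler.]

```
package main

import (
	"io/ioutil"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// gorealis does not generate thermos payloads; read a pre-generated one.
	payload, err := ioutil.ReadFile("examples/thermos_payload.json")
	if err != nil {
		log.Fatal(err)
	}

	// Connect directly to the scheduler (address is an assumption; adjust it).
	client, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Build a job that hands the payload to the executor as its data blob.
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world_from_gorealis").
		ExecutorName("thermos").
		ExecutorData(string(payload)).
		CPU(0.25).
		RAM(64).
		Disk(100).
		IsService(true).
		InstanceCount(1).
		AddPorts(1)

	resp, err := client.CreateJob(job)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(resp.String())
}
```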
diff --git a/docs/using-the-sample-client.md b/docs/using-the-sample-client.md
index f2de6d9..7e20455 100644
--- a/docs/using-the-sample-client.md
+++ b/docs/using-the-sample-client.md
@@ -1,6 +1,6 @@
 # Using the Sample client
 
-## Usage: 
+## Usage:
 ```
 Usage of ./client:
   -cluster string
diff --git a/errors.go b/errors.go
index 7411a5e..d865aea 100644
--- a/errors.go
+++ b/errors.go
@@ -23,6 +23,8 @@ type timeout interface {
 	Timedout() bool
 }
 
+// IsTimeout returns true if the error being passed as an argument implements the Timeout interface
+// and the Timedout function returns true.
 func IsTimeout(err error) bool {
 	temp, ok := err.(timeout)
 	return ok && temp.Timedout()
@@ -61,41 +63,42 @@ func (r *retryErr) RetryCount() int {
 	return r.retryCount
 }
 
-// Helper function for testing verification to avoid whitebox testing
+// ToRetryCount is a helper function for testing verification to avoid whitebox testing
 // as well as keeping retryErr as a private.
 // Should NOT be used under any other context.
 func ToRetryCount(err error) *retryErr {
 	if retryErr, ok := err.(*retryErr); ok {
 		return retryErr
-	} else {
-		return nil
 	}
+	return nil
 }
 
 func newRetryError(err error, retryCount int) *retryErr {
 	return &retryErr{error: err, timedout: true, retryCount: retryCount}
 }
 
-// Temporary errors indicate that the action may and should be retried.
+// Temporary errors indicate that the action may or should be retried.
 type temporary interface {
 	Temporary() bool
 }
 
+// IsTemporary indicates whether the error passed in as an argument implements the temporary interface
+// and if the Temporary function returns true.
 func IsTemporary(err error) bool {
 	temp, ok := err.(temporary)
 	return ok && temp.Temporary()
 }
 
-type TemporaryErr struct {
+type temporaryErr struct {
 	error
 	temporary bool
 }
 
-func (t *TemporaryErr) Temporary() bool {
+func (t *temporaryErr) Temporary() bool {
 	return t.temporary
 }
 
-// Retrying after receiving this error is advised
-func NewTemporaryError(err error) *TemporaryErr {
-	return &TemporaryErr{error: err, temporary: true}
+// NewTemporaryError creates a new error which satisfies the Temporary interface.
+func NewTemporaryError(err error) *temporaryErr {
+	return &temporaryErr{error: err, temporary: true}
 }
diff --git a/job.go b/job.go
index 365637e..b614f95 100644
--- a/job.go
+++ b/job.go
@@ -20,6 +20,9 @@ import (
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 )
 
+// Job interface is used to define a set of functions an Aurora Job object
+// must implement.
+// TODO(rdelvalle): Consider getting rid of the Job interface
 type Job interface {
 	// Set Job Key environment.
 	Environment(env string) Job
@@ -61,24 +64,24 @@ type Job interface {
 	SlaPolicy(policy *aurora.SlaPolicy) Job
 }
 
-type ResourceType int
+type resourceType int
 
 const (
-	CPU ResourceType = iota
+	CPU resourceType = iota
 	RAM
 	DISK
 	GPU
 )
 
-// Structure to collect all information pertaining to an Aurora job.
+// AuroraJob is a structure to collect all information pertaining to an Aurora job.
 type AuroraJob struct {
 	jobConfig *aurora.JobConfiguration
-	resources map[ResourceType]*aurora.Resource
+	resources map[resourceType]*aurora.Resource
 	metadata  map[string]*aurora.Metadata
 	portCount int
 }
 
-// Create a Job object with everything initialized.
+// NewJob is used to create a Job object with everything initialized.
 func NewJob() Job {
 	jobConfig := aurora.NewJobConfiguration()
 	taskConfig := aurora.NewTaskConfig()
@@ -98,7 +101,7 @@ func NewJob() Job {
 	ramMb := aurora.NewResource()
 	diskMb := aurora.NewResource()
 
-	resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
+	resources := map[resourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
 	taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
 
 	numCpus.NumCpus = new(float64)
@@ -113,13 +116,13 @@ func NewJob() Job {
 	}
 }
 
-// Set Job Key environment.
+// Environment sets the Job Key environment.
 func (j *AuroraJob) Environment(env string) Job {
 	j.jobConfig.Key.Environment = env
 	return j
 }
 
-// Set Job Key Role.
+// Role sets the Job Key role.
 func (j *AuroraJob) Role(role string) Job {
 	j.jobConfig.Key.Role = role
 
@@ -130,13 +133,13 @@ func (j *AuroraJob) Role(role string) Job {
 	return j
 }
 
-// Set Job Key Name.
+// Name sets the Job Key Name.
 func (j *AuroraJob) Name(name string) Job {
 	j.jobConfig.Key.Name = name
 	return j
 }
 
-// Set name of the executor that will the task will be configured to.
+// ExecutorName sets the name of the executor that the task will be configured to use.
 func (j *AuroraJob) ExecutorName(name string) Job {
 
 	if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@@ -147,7 +150,7 @@ func (j *AuroraJob) ExecutorName(name string) Job {
 	return j
 }
 
-// Will be included as part of entire task inside the scheduler that will be serialized.
+// ExecutorData sets the data blob that will be passed to the Mesos executor.
 func (j *AuroraJob) ExecutorData(data string) Job {
 
 	if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@@ -158,21 +161,25 @@ func (j *AuroraJob) ExecutorData(data string) Job {
 	return j
 }
 
+// CPU sets the amount of CPU each task will use in an Aurora Job.
 func (j *AuroraJob) CPU(cpus float64) Job {
 	*j.resources[CPU].NumCpus = cpus
 	return j
 }
 
+// RAM sets the amount of RAM each task will use in an Aurora Job.
 func (j *AuroraJob) RAM(ram int64) Job {
 	*j.resources[RAM].RamMb = ram
 	return j
 }
 
+// Disk sets the amount of Disk each task will use in an Aurora Job.
 func (j *AuroraJob) Disk(disk int64) Job {
 	*j.resources[DISK].DiskMb = disk
 	return j
 }
 
+// GPU sets the amount of GPU each task will use in an Aurora Job.
 func (j *AuroraJob) GPU(gpu int64) Job {
 	// GPU resource must be set explicitly since the scheduler by default
 	// rejects jobs with GPU resources attached to it.
@@ -187,54 +194,58 @@ func (j *AuroraJob) GPU(gpu int64) Job {
 	return j
 }
 
-// How many failures to tolerate before giving up.
+// MaxFailure sets how many failures to tolerate before giving up per task.
 func (j *AuroraJob) MaxFailure(maxFail int32) Job {
 	j.jobConfig.TaskConfig.MaxTaskFailures = maxFail
 	return j
 }
 
-// How many instances of the job to run
+// InstanceCount sets how many instances of the task to run for this Job.
 func (j *AuroraJob) InstanceCount(instCount int32) Job {
 	j.jobConfig.InstanceCount = instCount
 	return j
 }
 
+// CronSchedule allows the user to configure a cron schedule for this job to run on.
 func (j *AuroraJob) CronSchedule(cron string) Job {
 	j.jobConfig.CronSchedule = &cron
 	return j
 }
 
+// CronCollisionPolicy allows the user to decide what happens if two or more instances
+// of the same Cron job need to run.
 func (j *AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job {
 	j.jobConfig.CronCollisionPolicy = policy
 	return j
 }
 
-// How many instances of the job to run
+// GetInstanceCount returns how many tasks this Job contains.
 func (j *AuroraJob) GetInstanceCount() int32 {
 	return j.jobConfig.InstanceCount
 }
 
-// Restart the job's tasks if they fail
+// IsService configures the job to run as a long term service when true, or as an ad-hoc job when false.
 func (j *AuroraJob) IsService(isService bool) Job {
 	j.jobConfig.TaskConfig.IsService = isService
 	return j
 }
 
-// Get the current job configurations key to use for some realis calls.
+// JobKey returns the job's configuration key.
 func (j *AuroraJob) JobKey() *aurora.JobKey {
 	return j.jobConfig.Key
 }
 
-// Get the current job configurations key to use for some realis calls.
+// JobConfig returns the job's configuration.
 func (j *AuroraJob) JobConfig() *aurora.JobConfiguration {
 	return j.jobConfig
 }
 
+// TaskConfig returns the job's task (shard) configuration.
 func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
 	return j.jobConfig.TaskConfig
 }
 
-// Add a list of URIs with the same extract and cache configuration. Scheduler must have
+// AddURIs adds a list of URIs with the same extract and cache configuration. Scheduler must have
 // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
 func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
 	for _, value := range values {
@@ -244,7 +255,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
 	return j
 }
 
-// Adds a Mesos label to the job. Note that Aurora will add the
+// AddLabel adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) Job {
 	if _, ok := j.metadata[key]; ok {
@@ -256,7 +267,7 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
 	return j
 }
 
-// Add a named port to the job configuration These are random ports as it's
+// AddNamedPorts adds a named port to the job configuration. These are random ports as it's
 // not currently possible to request specific ports using Aurora.
 func (j *AuroraJob) AddNamedPorts(names ...string) Job {
 	j.portCount += len(names)
@@ -269,7 +280,7 @@ func (j *AuroraJob) AddNamedPorts(names ...string) Job {
 	return j
 }
 
-// Adds a request for a number of ports to the job configuration. The names chosen for these ports
+// AddPorts adds a request for a number of ports to the job configuration. The names chosen for these ports
 // will be org.apache.aurora.port.X, where X is the current port count for the job configuration
 // starting at 0. These are random ports as it's not currently possible to request
 // specific ports using Aurora.
@@ -286,6 +297,8 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	return j
 }
 
+// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
+// tasks can be run on.
 // From Aurora Docs:
 // Add a Value constraint
 // name - Mesos slave attribute that the constraint is matched against.
@@ -307,6 +320,7 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
 	return j
 }
 
+// AddLimitConstraint allows the user to limit how many tasks from the same Job are run on a single host.
 // From Aurora Docs:
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
@@ -323,33 +337,34 @@ func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
 	return j
 }
 
+// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
 func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
 	j.AddValueConstraint("dedicated", false, role+"/"+name)
 
 	return j
 }
 
-// Set a container to run for the job configuration to run.
+// Container sets a container to run for the job configuration to run.
 func (j *AuroraJob) Container(container Container) Job {
 	j.jobConfig.TaskConfig.Container = container.Build()
 
 	return j
 }
 
-// Set a partition policy for the job configuration to implement.
+// PartitionPolicy sets a partition policy for the job configuration to implement.
 func (j *AuroraJob) PartitionPolicy(policy *aurora.PartitionPolicy) Job {
 	j.jobConfig.TaskConfig.PartitionPolicy = policy
 	return j
 }
 
-// Set the Tier for the Job.
+// Tier sets the Tier for the Job.
 func (j *AuroraJob) Tier(tier string) Job {
 	j.jobConfig.TaskConfig.Tier = &tier
 
 	return j
 }
 
-// Set an SlaPolicy for the Job.
+// SlaPolicy sets an SlaPolicy for the Job.
 func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
 	j.jobConfig.TaskConfig.SlaPolicy = policy
 
diff --git a/logger.go b/logger.go
index 34e62ee..340171f 100644
--- a/logger.go
+++ b/logger.go
@@ -14,65 +14,73 @@
 
 package realis
 
-type Logger interface {
+type logger interface {
 	Println(v ...interface{})
 	Printf(format string, v ...interface{})
 	Print(v ...interface{})
 }
 
+// NoopLogger is a logger that can be attached to the client which will not print anything.
 type NoopLogger struct{}
 
+// Printf is a NOOP function here.
 func (NoopLogger) Printf(format string, a ...interface{}) {}
 
+// Print is a NOOP function here.
 func (NoopLogger) Print(a ...interface{}) {}
 
+// Println is a NOOP function here.
 func (NoopLogger) Println(a ...interface{}) {}
 
+// LevelLogger is a logger that can be configured to output different levels of information: Debug and Trace.
+// Trace should only be enabled when very detailed information about the sequence of events a function took is needed.
 type LevelLogger struct {
-	Logger
+	logger
 	debug bool
 	trace bool
}
 
+// EnableDebug enables debug level logging for the LevelLogger
 func (l *LevelLogger) EnableDebug(enable bool) {
 	l.debug = enable
 }
 
+// EnableTrace enables trace level logging for the LevelLogger
 func (l *LevelLogger) EnableTrace(enable bool) {
 	l.trace = enable
 }
 
-func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
+func (l LevelLogger) debugPrintf(format string, a ...interface{}) {
 	if l.debug {
 		l.Printf("[DEBUG] "+format, a...)
 	}
 }
 
-func (l LevelLogger) DebugPrint(a ...interface{}) {
+func (l LevelLogger) debugPrint(a ...interface{}) {
 	if l.debug {
 		l.Print(append([]interface{}{"[DEBUG] "}, a...)...)
 	}
 }
 
-func (l LevelLogger) DebugPrintln(a ...interface{}) {
+func (l LevelLogger) debugPrintln(a ...interface{}) {
 	if l.debug {
 		l.Println(append([]interface{}{"[DEBUG] "}, a...)...)
 	}
 }
 
-func (l LevelLogger) TracePrintf(format string, a ...interface{}) {
+func (l LevelLogger) tracePrintf(format string, a ...interface{}) {
 	if l.trace {
 		l.Printf("[TRACE] "+format, a...)
 	}
 }
 
-func (l LevelLogger) TracePrint(a ...interface{}) {
+func (l LevelLogger) tracePrint(a ...interface{}) {
 	if l.trace {
 		l.Print(append([]interface{}{"[TRACE] "}, a...)...)
 	}
 }
 
-func (l LevelLogger) TracePrintln(a ...interface{}) {
+func (l LevelLogger) tracePrintln(a ...interface{}) {
 	if l.trace {
 		l.Println(append([]interface{}{"[TRACE] "}, a...)...)
 	}
diff --git a/monitors.go b/monitors.go
index eb7c85c..69167df 100644
--- a/monitors.go
+++ b/monitors.go
@@ -12,7 +12,6 @@
  * limitations under the License.
 */
 
-// Collection of monitors to create synchronicity
 package realis
 
 import (
@@ -22,11 +21,15 @@ import (
 	"github.com/pkg/errors"
 )
 
+// Monitor is a wrapper for the Realis client which allows us to have functions
+// with the same name for Monitoring purposes.
+// TODO(rdelvalle): Deprecate monitors and instead add prefix Monitor to
+// all functions in this file like it is done in V2.
 type Monitor struct {
 	Client Realis
 }
 
-// Polls the scheduler every certain amount of time to see if the update has succeeded
+// JobUpdate polls the scheduler at a regular interval to see if the update has entered a terminal state.
 func (m *Monitor) JobUpdate(
 	updateKey aurora.JobUpdateKey,
 	interval int,
@@ -71,6 +74,7 @@ func (m *Monitor) JobUpdate(
 	}
 }
 
+// JobUpdateStatus polls the scheduler at a regular interval to see if the update has entered a specified state.
 func (m *Monitor) JobUpdateStatus(
 	updateKey aurora.JobUpdateKey,
 	desiredStatuses map[aurora.JobUpdateStatus]bool,
@@ -93,6 +97,7 @@ func (m *Monitor) JobUpdateStatus(
 	return summary[0].State.Status, err
 }
 
+// JobUpdateQuery polls the scheduler at a regular interval to see if the query call returns any results.
 func (m *Monitor) JobUpdateQuery(
 	updateQuery aurora.JobUpdateQuery,
 	interval time.Duration,
@@ -124,7 +129,7 @@ func (m *Monitor) JobUpdateQuery(
 	}
 }
 
-// Monitor a Job until all instances enter one of the LIVE_STATES
+// Instances will monitor a Job until all instances enter one of the LIVE_STATES
 func (m *Monitor) Instances(
 	key *aurora.JobKey,
 	instances int32,
@@ -132,7 +137,7 @@ func (m *Monitor) Instances(
 	return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
 }
 
-// Monitor a Job until all instances enter a desired status.
+// ScheduleStatus will monitor a Job until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
 // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
 func (m *Monitor) ScheduleStatus(
@@ -173,7 +178,7 @@ func (m *Monitor) ScheduleStatus(
 	}
 }
 
-// Monitor host status until all hosts match the status provided.
+// HostMaintenance will monitor host status until all hosts match the status provided.
 // Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
 func (m *Monitor) HostMaintenance(
diff --git a/realis.go b/realis.go
index 3a095b9..c41016a 100644
--- a/realis.go
+++ b/realis.go
@@ -38,8 +38,10 @@ import (
 	"github.com/paypal/gorealis/response"
 )
 
-const VERSION = "1.21.1"
+const version = "1.21.1"
 
+// Realis is an interface that defines the various APIs that may be used to communicate with
+// the Apache Aurora scheduler.
 // TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
 // Or get rid of the interface
 type Realis interface {
@@ -73,7 +75,7 @@ type Realis interface {
 	StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
 	// TODO: Remove this method and make it private to avoid race conditions
 	ReestablishConn() error
-	RealisConfig() *RealisConfig
+	RealisConfig() *config
 	Close()
 
 	// Admin functions
@@ -93,7 +95,7 @@ type Realis interface {
 }
 
 type realisClient struct {
-	config         *RealisConfig
+	config         *config
 	client         *aurora.AuroraSchedulerManagerClient
 	readonlyClient *aurora.ReadOnlySchedulerClient
 	adminClient    *aurora.AuroraAdminClient
@@ -103,7 +105,7 @@ type realisClient struct {
 	transport      thrift.TTransport
 }
 
-type RealisConfig struct {
+type config struct {
 	username, password          string
 	url                         string
 	timeoutms                   int
@@ -130,43 +132,43 @@ var defaultBackoff = Backoff{
 	Jitter:   0.1,
 }
 
-// ClientOption - An alias for a function that modifies the realis config object
-type ClientOption func(*RealisConfig)
+// ClientOption is an alias for a function that modifies the realis config object
+type ClientOption func(*config)
 
-// BasicAuth - Set authentication used against Apache Shiro in the Aurora scheduler
+// BasicAuth sets authentication used against Apache Shiro in the Aurora scheduler
 func BasicAuth(username, password string) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.username = username
 		config.password = password
 	}
 }
 
-// SchedulerUrl - Set the immediate location of the current Aurora scheduler leader
+// SchedulerUrl sets the immediate location of the current Aurora scheduler leader
 func SchedulerUrl(url string) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.url = url
 	}
 }
 
-// TimeoutMS - Set the connection timeout for an HTTP post request in Miliseconds
+// TimeoutMS sets the connection timeout for an HTTP POST request in milliseconds
 func TimeoutMS(timeout int) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.timeoutms = timeout
 	}
 }
 
-// ZKCluster - Set a clusters.json provided cluster configuration to the client
+// ZKCluster sets a clusters.json provided cluster configuration to the client
 func ZKCluster(cluster *Cluster) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.cluster = cluster
 	}
 }
 
-// ZKUrl - Set the direct location of a Zookeeper node on which the Aurora leader registers itself
+// ZKUrl sets the direct location of a Zookeeper node on which the Aurora leader registers itself
 func ZKUrl(url string) ClientOption {
 
 	opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")}
 
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		if config.zkOptions == nil {
 			config.zkOptions = opts
 		} else {
@@ -175,87 +177,97 @@ func ZKUrl(url string) ClientOption {
 	}
 }
 
-// Retries - Configure the retry mechanism for the client
+// Retries configures the retry mechanism for the client
 func Retries(backoff Backoff) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.backoff = backoff
 	}
 }
 
+// ThriftJSON configures the client to use the Thrift JSON protocol.
 func ThriftJSON() ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.jsonTransport = true
 	}
 }
 
+// ThriftBinary configures the client to use the Thrift Binary protocol.
 func ThriftBinary() ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.binTransport = true
 	}
 }
 
+// BackOff is an alternative name for the Retry mechanism configuration.
 func BackOff(b Backoff) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.backoff = b
 	}
 }
 
+// InsecureSkipVerify configures the client to not check for matching host names on certificates
+// when using an SSL enabled Aurora scheduler.
 func InsecureSkipVerify(insecureSkipVerify bool) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.insecureSkipVerify = insecureSkipVerify
 	}
 }
 
+// Certspath sets the directory where the server certificates to be used when connecting to an SSL enabled
+// Aurora scheduler are stored.
 func Certspath(certspath string) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.certspath = certspath
 	}
 }
 
+// ClientCerts allows users to set client key and certificate when connecting to an SSL enabled
+// Aurora scheduler.
 func ClientCerts(clientKey, clientCert string) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.clientKey, config.clientCert = clientKey, clientCert
 	}
 }
 
-// Use this option if you'd like to override default settings for connecting to Zookeeper.
+// ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
 // See zk.go for what is possible to set as an option.
 func ZookeeperOptions(opts ...ZKOpt) ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.zkOptions = opts
 	}
 }
 
-// Using the word set to avoid name collision with Interface.
-func SetLogger(l Logger) ClientOption {
-	return func(config *RealisConfig) {
-		config.logger = &LevelLogger{Logger: l}
+// SetLogger allows the user to attach a logger that implements the logger interface in logger.go
+// to the client.
+func SetLogger(l logger) ClientOption {
+	return func(config *config) {
+		config.logger = &LevelLogger{logger: l}
 	}
 }
 
-// Enable debug statements.
+// Debug enables debug statements in the client.
 func Debug() ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.debug = true
 	}
 }
 
-// Enable debug statements.
+// Trace enables trace statements in the client.
 func Trace() ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.trace = true
 	}
 }
 
-// FailOnPermanentErrors - If the client encounters a connection error the standard library
-// considers permanent, stop retrying and return an error to the user.
+// FailOnPermanentErrors allows the client to stop upon encountering a connection error the standard library
+// considers permanent and to return an error to the user.
 func FailOnPermanentErrors() ClientOption {
-	return func(config *RealisConfig) {
+	return func(config *config) {
 		config.failOnPermanentErrors = true
 	}
 }
 
-func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
+func newTJSONTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
@@ -266,11 +278,11 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT
 	}
 
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
-	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+version)
 	return trans, err
 }
 
-func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
+func newTBinTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create transport")
@@ -283,22 +295,22 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
 	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+version)
 
 	return trans, err
 }
 
-// This client implementation of the realis interface uses a retry mechanism for all Thrift Calls.
+// NewRealisClient creates a client implementation of the realis interface that uses a retry mechanism for all Thrift Calls.
 // It will retry all calls which result in a temporary failure as well as calls that fail due to an EOF
 // being returned by the http client. Most permanent failures are now being caught by the thriftCallWithRetries
 // function and not being retried but there may be corner cases not yet handled.
 func NewRealisClient(options ...ClientOption) (Realis, error) {
-	config := &RealisConfig{}
+	config := &config{}
 
 	// Default configs
 	config.timeoutms = 10000
 	config.backoff = defaultBackoff
-	config.logger = &LevelLogger{Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
+	config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
 
 	// Save options to recreate client if a connection error happens
 	config.options = options
@@ -312,13 +324,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 
 	// Turn off all logging (including debug)
 	if config.logger == nil {
-		config.logger = &LevelLogger{Logger: NoopLogger{}}
+		config.logger = &LevelLogger{logger: NoopLogger{}}
 	}
 
 	// Set a logger if debug has been set to true but no logger has been set
 	if config.logger == nil && config.debug {
 		config.logger = &LevelLogger{
-			Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
+			logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
 			debug:  true,
 		}
 	}
@@ -330,7 +342,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	config.logger.EnableDebug(config.debug)
 	config.logger.EnableTrace(config.trace)
 
-	config.logger.DebugPrintln("Number of options applied to config: ", len(options))
+	config.logger.debugPrintln("Number of options applied to config: ", len(options))
 
 	// Set default Transport to JSON if needed.
 	if !config.jsonTransport && !config.binTransport {
@@ -403,11 +415,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		client:         aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
 		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
 		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
-		logger:         LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace},
+		logger:         LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
 		lock:           &sync.Mutex{},
 		transport:      config.transport}, nil
 }
 
+// GetDefaultClusterFromZKUrl creates a cluster object from a Zookeeper URL. This is deprecated in favor of using
+// Zookeeper options.
 func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	return &Cluster{
 		Name:          "defaultCluster",
@@ -419,7 +433,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	}
 }
 
-func GetCerts(certPath string) (*x509.CertPool, error) {
+func createCertPool(certPath string) (*x509.CertPool, error) {
 	globalRootCAs := x509.NewCertPool()
 	caFiles, err := ioutil.ReadDir(certPath)
 	if err != nil {
@@ -437,13 +451,13 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
 }
 
 // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
-func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
+func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
 	var transport http.Transport
 	if config != nil {
 		tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}
 
 		if config.certspath != "" {
-			rootCAs, err := GetCerts(config.certspath)
+			rootCAs, err := createCertPool(config.certspath)
 			if err != nil {
 				config.logger.Println("error occurred couldn't fetch certs")
 				return nil, err
@@ -536,7 +550,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
 		Statuses: states,
 	}
 
-	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
+	r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
 
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
@@ -561,7 +575,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
 
 func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
 
-	r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
+	r.logger.debugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
 
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
@@ -599,7 +613,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 // Kill specific instances of a job.
 func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
 
-	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
+	r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
 
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
@@ -613,14 +627,14 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 	return resp, nil
 }
 
-func (r *realisClient) RealisConfig() *RealisConfig {
+func (r *realisClient) RealisConfig() *config {
 	return r.config
 }
 
 // Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { - r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) + r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries( false, @@ -641,7 +655,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Use this API to create ad-hoc jobs. func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { - r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries( false, @@ -680,7 +694,7 @@ func (r *realisClient) CreateService( } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { - r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + r.logger.debugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries( false, @@ -696,7 +710,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { - r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) + r.logger.debugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries( false, @@ -714,7 +728,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { - r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) + r.logger.debugPrintf("StartCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries( false, @@ -731,7 +745,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error // Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { - r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) + r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) resp, retryErr := r.thriftCallWithRetries( false, @@ -753,7 +767,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } - r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) + r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries( @@ -767,15 +781,15 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) } return resp, nil - } else { - return nil, errors.New("No tasks in the Active state") } + + return nil, errors.New("No tasks in the Active state") } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. 
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { - r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) + r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := r.thriftCallWithRetries( true, @@ -787,9 +801,9 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // A timeout took place when attempting this call, attempt to recover if IsTimeout(retryErr) { return resp, retryErr - } else { - return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } + + return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } return resp, nil } @@ -799,7 +813,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au // However, if the job update does not transition to the ABORT state an error will be returned. func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.debugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries( false, @@ -825,7 +839,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str // Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries( false, @@ -843,7 +857,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st // Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) + r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries( false, @@ -861,7 +875,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s // Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { - r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) + r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) resp, retryErr := r.thriftCallWithRetries( false, @@ -880,7 +894,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R // instance to scale up. 
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { - r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) + r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) resp, retryErr := r.thriftCallWithRetries( false, @@ -919,7 +933,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora // Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { - r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) + r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries( false, @@ -937,7 +951,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul // Get pending reason func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { - r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query) + r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries( false, @@ -961,7 +975,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend // Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { - r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) + r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries( false, @@ -987,7 +1001,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task Statuses: aurora.ACTIVE_STATES, } - r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) + r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries( false, @@ -1015,7 +1029,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) { - r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) + r.logger.debugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) resp, retryErr := r.thriftCallWithRetries( false, @@ -1032,7 +1046,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { - r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) + r.logger.debugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) resp, retryErr := r.thriftCallWithRetries( false, diff --git a/realis_admin.go b/realis_admin.go index 0461d90..184ae55 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -24,7 +24,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr drainList := aurora.NewHosts() drainList.HostNames = hosts - r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) + r.logger.debugPrintf("DrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries( false, @@ -59,7 +59,7 @@ func (r *realisClient) SLADrainHosts( drainList := aurora.NewHosts() drainList.HostNames = hosts - r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) + 
r.logger.debugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries( false, @@ -89,7 +89,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur hostList := aurora.NewHosts() hostList.HostNames = hosts - r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) + r.logger.debugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries( false, @@ -119,7 +119,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror hostList := aurora.NewHosts() hostList.HostNames = hosts - r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) + r.logger.debugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries( false, @@ -149,7 +149,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au hostList := aurora.NewHosts() hostList.HostNames = hosts - r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) + r.logger.debugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 2d0251f..96e9ee0 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -77,17 +77,17 @@ func TestNonExistentEndpoint(t *testing.T) { t.Run("WithRetries", func(t *testing.T) { // Attempt to connect to a bad endpoint - r, err := realis.NewRealisClient( + badClient, err := realis.NewRealisClient( realis.SchedulerUrl(badEndpoint), realis.TimeoutMS(200000), realis.BackOff(backoff), ) require.NoError(t, err) - require.NotNil(t, r) - defer r.Close() + require.NotNil(t, badClient) + defer badClient.Close() - _, err = r.GetTasksWithoutConfigs(taskQ) + _, err = badClient.GetTasksWithoutConfigs(taskQ) // Check that we do error out of retrying require.Error(t, err) @@ -101,7 +101,7 @@ func TestNonExistentEndpoint(t *testing.T) { t.Run("FailOnLookup", func(t *testing.T) { // Attempt to connect to a bad endpoint - r, err := realis.NewRealisClient( + badClient, err := realis.NewRealisClient( realis.SchedulerUrl(badEndpoint), realis.TimeoutMS(200000), realis.BackOff(backoff), @@ -109,10 +109,10 @@ func TestNonExistentEndpoint(t *testing.T) { ) require.NoError(t, err) - require.NotNil(t, r) - defer r.Close() + require.NotNil(t, badClient) + defer badClient.Close() - _, err = r.GetTasksWithoutConfigs(taskQ) + _, err = badClient.GetTasksWithoutConfigs(taskQ) // Check that we do error out of retrying require.Error(t, err) @@ -195,12 +195,6 @@ func TestRealisClient_ReestablishConn(t *testing.T) { assert.NoError(t, err) } -func TestGetCACerts(t *testing.T) { - certs, err := realis.GetCerts("./examples/certs") - require.NoError(t, err) - assert.Equal(t, len(certs.Subjects()), 2) -} - func TestRealisClient_CreateJob_Thermos(t *testing.T) { role := "vagrant" diff --git a/retry.go b/retry.go index ce8e4cc..77b02d5 100644 --- a/retry.go +++ b/retry.go @@ -26,6 +26,8 @@ import ( "github.com/pkg/errors" ) +// Backoff determines how the retry mechanism should react after each failure and how many failures it should +// tolerate. type Backoff struct { Duration time.Duration // the base duration Factor float64 // Duration is multiplied by a factor each iteration @@ -50,19 +52,16 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { // if the loop should be aborted. 
 type ConditionFunc func() (done bool, err error)
 
-// Modified version of the Kubernetes exponential-backoff code.
-// ExponentialBackoff repeats a condition check with exponential backoff.
-//
-// It checks the condition up to Steps times, increasing the wait by multiplying
-// the previous duration by Factor.
+// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code.
+// It repeats a condition check with exponential backoff and checks the condition up to
+// Steps times, increasing the wait by multiplying the previous duration by Factor.
 //
 // If Jitter is greater than zero, a random amount of each duration is added
 // (between duration and duration*(1+jitter)).
 //
 // If the condition never returns true, ErrWaitTimeout is returned. Errors
 // do not cause the function to return.
-
-func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
+func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) error {
 	var err error
 	var ok bool
 	var curStep int
@@ -95,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 			// If the error is temporary, continue retrying.
 			if !IsTemporary(err) {
 				return err
-			} else {
-				// Print out the temporary error we experienced.
-				logger.Println(err)
 			}
+			// Print out the temporary error we experienced.
+			logger.Println(err)
 		}
 	}
 
@@ -109,9 +107,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
 	// Provide more information to the user wherever possible
 	if err != nil {
 		return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
-	} else {
-		return newRetryError(errors.New("ran out of retries"), curStep)
 	}
+
+	return newRetryError(errors.New("ran out of retries"), curStep)
 }
 
 type auroraThriftCall func() (resp *aurora.Response, err error)
@@ -156,7 +154,7 @@ func (r *realisClient) thriftCallWithRetries(
 
 			resp, clientErr = thriftCall()
 
-			r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
+			r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
 		}()
 
 		// Check if our thrift call is returning an error. This is a retryable event as we don't know
@@ -169,7 +167,7 @@ func (r *realisClient) thriftCallWithRetries(
 			// Determine if error is a temporary URL error by going up the stack
 			e, ok := clientErr.(thrift.TTransportException)
 			if ok {
-				r.logger.DebugPrint("Encountered a transport exception")
+				r.logger.debugPrint("Encountered a transport exception")
 
 				e, ok := e.Err().(*url.Error)
 				if ok {
@@ -186,7 +184,7 @@ func (r *realisClient) thriftCallWithRetries(
 					// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
 					if e.Timeout() {
 						timeouts++
-						r.logger.DebugPrintf(
+						r.logger.debugPrintf(
 							"Client closed connection (timedout) %d times before server responded, "+
 								"consider increasing connection timeout",
 							timeouts)
@@ -200,8 +198,10 @@ func (r *realisClient) thriftCallWithRetries(
 			// In the future, reestablish connection should be able to check if it is actually possible
 			// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
 			// Ignoring error due to the fact that an error should be retried regardless
-			_ = r.ReestablishConn()
-
+			reestablishErr := r.ReestablishConn()
+			if reestablishErr != nil {
+				r.logger.debugPrintf("error re-establishing connection: %v\n", reestablishErr)
+			}
 		} else {
 
 			// If there was no client error, but the response is nil, something went wrong.
@@ -233,14 +233,14 @@ func (r *realisClient) thriftCallWithRetries(
 		// The only case that should fall down to here is a WARNING response code.
 		// It is currently not used as a response in the scheduler so it is unknown how to handle it.
 		default:
-			r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+			r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
 			return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
 		}
 	}

-	r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
+	r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)

 	if curStep > 1 {
 		r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
@@ -249,7 +249,7 @@
 	// Provide more information to the user wherever possible.
 	if clientErr != nil {
 		return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
-	} else {
-		return nil, newRetryError(errors.New("ran out of retries"), curStep)
 	}
+
+	return nil, newRetryError(errors.New("ran out of retries"), curStep)
 }
diff --git a/updatejob.go b/updatejob.go
index fd075dc..0537c65 100644
--- a/updatejob.go
+++ b/updatejob.go
@@ -18,13 +18,13 @@ import (
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 )

-// Structure to collect all information required to create job update
+// UpdateJob is a structure to collect all information required to create a job update.
 type UpdateJob struct {
 	Job // SetInstanceCount for job is hidden, access via full qualifier
 	req *aurora.JobUpdateRequest
 }

-// Create a default UpdateJob object.
+// NewDefaultUpdateJob creates an UpdateJob object with opinionated default settings.
 func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {

 	req := aurora.NewJobUpdateRequest()
@@ -74,6 +74,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	return &UpdateJob{Job: job, req: req}
 }

+// NewUpdateJob creates an UpdateJob object without default settings.
 func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) *UpdateJob {

 	req := aurora.NewJobUpdateRequest()
@@ -115,50 +116,50 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
 	return &UpdateJob{Job: job, req: req}
 }

-// Set instance count the job will have after the update.
+// InstanceCount sets the instance count the job will have after the update.
 func (u *UpdateJob) InstanceCount(inst int32) *UpdateJob {
 	u.req.InstanceCount = inst
 	return u
 }

-// Max number of instances being updated at any given moment.
+// BatchSize sets the max number of instances being updated at any given moment.
 func (u *UpdateJob) BatchSize(size int32) *UpdateJob {
 	u.req.Settings.UpdateGroupSize = size
 	return u
 }

-// Minimum number of seconds a shard must remain in RUNNING state before considered a success.
+// WatchTime sets the minimum time in milliseconds a shard must remain in RUNNING state to be considered a success.
 func (u *UpdateJob) WatchTime(ms int32) *UpdateJob {
 	u.req.Settings.MinWaitInInstanceRunningMs = ms
 	return u
 }

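For context, a sketch of how these chainable setters compose (see also the remaining setters just below); the function name, taskConfig argument, and all values are placeholders assumed for illustration:

package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// buildUpdate sketches a chained UpdateJob configuration; taskConfig would
// normally come from an existing job's configuration.
func buildUpdate(taskConfig *aurora.TaskConfig) *realis.UpdateJob {
	return realis.NewDefaultUpdateJob(taskConfig).
		InstanceCount(6).      // the job will have six instances once updated
		BatchSize(2).          // update two instances at a time
		WatchTime(30 * 1000).  // a shard must stay RUNNING for 30s (value is in ms)
		MaxFailedInstances(1). // abort the update after one FAILED instance
		RollbackOnFail(true)   // roll back automatically if the update fails
}
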
-// Wait for all instances in a group to be done before moving on.
+// WaitForBatchCompletion configures the job update to wait for all instances in a group to be done before moving on.
 func (u *UpdateJob) WaitForBatchCompletion(batchWait bool) *UpdateJob {
 	u.req.Settings.WaitForBatchCompletion = batchWait
 	return u
 }

-// Max number of instance failures to tolerate before marking instance as FAILED.
+// MaxPerInstanceFailures sets the max number of instance failures to tolerate before marking an instance as FAILED.
 func (u *UpdateJob) MaxPerInstanceFailures(inst int32) *UpdateJob {
 	u.req.Settings.MaxPerInstanceFailures = inst
 	return u
 }

-// Max number of FAILED instances to tolerate before terminating the update.
+// MaxFailedInstances sets the max number of FAILED instances to tolerate before terminating the update.
 func (u *UpdateJob) MaxFailedInstances(inst int32) *UpdateJob {
 	u.req.Settings.MaxFailedInstances = inst
 	return u
 }

-// When False, prevents auto rollback of a failed update.
+// RollbackOnFail configures whether a failed update should be rolled back automatically.
 func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
 	u.req.Settings.RollbackOnFailure = rollback
 	return u
 }

+// NewUpdateSettings returns an opinionated set of job update settings.
 func NewUpdateSettings() *aurora.JobUpdateSettings {
-
 	us := new(aurora.JobUpdateSettings)
 	// Mirrors defaults set by Pystachio
 	us.UpdateGroupSize = 1
diff --git a/util.go b/util.go
index f554b90..8431347 100644
--- a/util.go
+++ b/util.go
@@ -10,11 +10,22 @@ import (

 const apiPath = "/api"

+// ActiveStates - States a task may be in when active.
 var ActiveStates = make(map[aurora.ScheduleStatus]bool)
+
+// SlaveAssignedStates - States a task may be in when it has already been assigned to a Mesos agent.
 var SlaveAssignedStates = make(map[aurora.ScheduleStatus]bool)
+
+// LiveStates - States a task may be in when it is live (e.g., able to take traffic).
 var LiveStates = make(map[aurora.ScheduleStatus]bool)
+
+// TerminalStates - Set of states a task may not transition away from.
 var TerminalStates = make(map[aurora.ScheduleStatus]bool)
+
+// ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
 var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
+
+// AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse.
 var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)

 func init() {
diff --git a/zk.go b/zk.go
index 51b4d6e..4bc9ede 100644
--- a/zk.go
+++ b/zk.go
@@ -24,14 +24,14 @@ import (
 	"github.com/samuel/go-zookeeper/zk"
 )

-type Endpoint struct {
+type endpoint struct {
 	Host string `json:"host"`
 	Port int    `json:"port"`
 }

-type ServiceInstance struct {
-	Service             Endpoint            `json:"serviceEndpoint"`
-	AdditionalEndpoints map[string]Endpoint `json:"additionalEndpoints"`
+type serviceInstance struct {
+	Service             endpoint            `json:"serviceEndpoint"`
+	AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
 	Status              string              `json:"status"`
 }

@@ -40,47 +40,54 @@ type zkConfig struct {
 	path      string
 	backoff   Backoff
 	timeout   time.Duration
-	logger    Logger
+	logger    logger
 }

+// ZKOpt - Configuration option for the Zookeeper client used.
 type ZKOpt func(z *zkConfig)

+// ZKEndpoints - Endpoints on which a Zookeeper instance is running, to be used by the client.
 func ZKEndpoints(endpoints ...string) ZKOpt {
 	return func(z *zkConfig) {
 		z.endpoints = endpoints
 	}
 }

+// ZKPath - Path to look for information in when connected to Zookeeper.
 func ZKPath(path string) ZKOpt {
 	return func(z *zkConfig) {
 		z.path = path
 	}
 }

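For context, a sketch of a leader lookup using these options together with the LeaderFromZKOpts function defined further below; the ensemble address is a placeholder, and /aurora/scheduler is simply the conventional Aurora path, so substitute your own values:

package main

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Placeholder ensemble address and the conventional Aurora scheduler path.
	url, err := realis.LeaderFromZKOpts(
		realis.ZKEndpoints("192.168.33.7:2181"),
		realis.ZKPath("/aurora/scheduler"),
	)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("current Aurora leader:", url)
}
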
+// ZKBackoff - Configuration for the retry mechanism used when connecting to Zookeeper.
+// TODO(rdelvalle): Determine if this is really necessary as the ZK library already has a retry built in.
 func ZKBackoff(b Backoff) ZKOpt {
 	return func(z *zkConfig) {
 		z.backoff = b
 	}
 }

+// ZKTimeout - How long to wait on a response from the Zookeeper instance before considering it dead.
 func ZKTimeout(d time.Duration) ZKOpt {
 	return func(z *zkConfig) {
 		z.timeout = d
 	}
 }

-func ZKLogger(l Logger) ZKOpt {
+// ZKLogger - Attach a logger to the Zookeeper client in order to debug issues.
+func ZKLogger(l logger) ZKOpt {
 	return func(z *zkConfig) {
 		z.logger = l
 	}
 }

-// Retrieves current Aurora leader from ZK.
+// LeaderFromZK - Retrieves the current Aurora leader from ZK.
 func LeaderFromZK(cluster Cluster) (string, error) {
 	return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
 }

-// Retrieves current Aurora leader from ZK with a custom configuration.
+// LeaderFromZKOpts - Retrieves the current Aurora leader from ZK with a custom configuration.
 func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
 	var leaderURL string

@@ -121,7 +128,6 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
 			}

 			// Search for the leader through all the children in the given path
-			serviceInst := new(ServiceInstance)
 			for _, child := range children {

 				// Only the leader will start with member_
@@ -137,7 +143,8 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
 					return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader"))
 				}

-				err = json.Unmarshal([]byte(data), serviceInst)
+				var serviceInst serviceInstance
+				err = json.Unmarshal([]byte(data), &serviceInst)
 				if err != nil {
 					return false, NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
 				}
diff --git a/zk_test.go b/zk_test.go
index c23ff75..b4e5d22 100644
--- a/zk_test.go
+++ b/zk_test.go
@@ -24,11 +24,12 @@ import (
 	"github.com/stretchr/testify/assert"
 )

-var backoff realis.Backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
+var backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
 	Steps:    5,
 	Duration: 1 * time.Second,
 	Factor:   1.0,
-	Jitter:   0.1}
+	Jitter:   0.1,
+}

 // Test for behavior when no endpoints are given to the ZK leader finding function.
 func TestZKNoEndpoints(t *testing.T) {