Documentation and linting improvements (#108)

* Simplified the getting started documentation: removed outdated instructions for installing Go on different platforms and instead linked to the official Go website, which has more up-to-date information. Instructions for installing docker-compose have also been added.

* Added documentation to all exported functions and structs.

* Unexported some structures and functions that were needlessly exported.

* Added a default golangci-lint configuration, which can be useful while developing and may be turned on later in the CI.

* Moved the CI build process to Xenial.

* Reduced line length in some files and fixed variable shadowing in some test cases.
Renan DelValle 2019-06-12 11:22:59 -07:00 committed by GitHub
parent 6dc4bf93b9
commit df8fc2fba1
18 changed files with 347 additions and 274 deletions

.golangci.yml (new file)

@ -0,0 +1,71 @@
# This file contains all available configuration options
# with their default values.
# options for analysis running
run:
# default concurrency is the available CPU count
concurrency: 4
# timeout for analysis, e.g. 30s, 5m, default is 1m
deadline: 1m
# exit code when at least one issue was found, default is 1
issues-exit-code: 1
# include test files or not, default is true
tests: true
skip-dirs:
- gen-go/
# output configuration options
output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
format: colored-line-number
# print lines of code with issue, default is true
print-issued-lines: true
# print linter name at the end of issue text, default is true
print-linter-name: true
# all available settings of specific linters
linters-settings:
errcheck:
# report unchecked errors in type assertions: `a := b.(MyStruct)`;
# default is false: such cases aren't reported by default.
check-type-assertions: true
# report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`;
# default is false: such cases aren't reported by default.
check-blank: true
govet:
# report about shadowed variables
check-shadowing: true
goconst:
# minimal length of string constant, 3 by default
min-len: 3
# minimal occurrences count to trigger, 3 by default
min-occurrences: 2
misspell:
# Correct spellings using locale preferences for US or UK.
# Default is to use a neutral variety of English.
# Setting locale to US will correct the British spelling of 'colour' to 'color'.
locale: US
lll:
# max line length, lines longer will be reported. Default is 120.
# '\t' is counted as 1 character by default, and can be changed with the tab-width option
line-length: 120
# tab width in spaces. Default to 1.
tab-width: 4
linters:
enable:
- govet
- goimports
- golint
- lll
- goconst
enable-all: false
fast: false
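The commit message mentions fixing variable shadowing in some test cases; below is a minimal, illustrative sketch (all names made up) of the pattern govet's check-shadowing reports:

```go
package main

import (
	"errors"
	"fmt"
)

func lookup(key string) (string, error) {
	if key == "" {
		return "", errors.New("empty key")
	}
	return "value-for-" + key, nil
}

func main() {
	value, err := lookup("a")
	if err == nil {
		// check-shadowing flags this: ":=" declares new "value" and "err"
		// variables that shadow the outer ones instead of reusing them.
		value, err := lookup("b")
		fmt.Println(value, err)
	}
	// The outer variables still hold the results of the first call here.
	fmt.Println(value, err)
}
```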


@ -1,5 +1,6 @@
sudo: required
dist: xenial
language: go
branches:


@ -21,6 +21,8 @@ import (
"github.com/pkg/errors"
)
// Cluster contains the definition of the clusters.json file used by the default Aurora
// client for configuration
type Cluster struct {
Name string `json:"name"`
AgentRoot string `json:"slave_root"`
@ -33,7 +35,8 @@ type Cluster struct {
AuthMechanism string `json:"auth_mechanism"`
}
// Loads clusters.json file traditionally located at /etc/aurora/clusters.json
// LoadClusters loads the clusters.json file traditionally located at /etc/aurora/clusters.json
// for use with a gorealis client
func LoadClusters(config string) (map[string]Cluster, error) {
file, err := os.Open(config)
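A hedged usage sketch for LoadClusters; the path is the conventional location named above, and the cluster name is an assumption:

```go
package main

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Parse the same clusters.json file the default Aurora client reads.
	clusters, err := realis.LoadClusters("/etc/aurora/clusters.json")
	if err != nil {
		log.Fatal(err)
	}

	// "devcluster" is a hypothetical entry; use a name defined in your file.
	if cluster, ok := clusters["devcluster"]; ok {
		fmt.Println("found cluster:", cluster.Name)
	}
}
```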


@ -18,31 +18,40 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// Container is an interface that defines a single function needed to create
// an Aurora container type. It exists because the code must support both Mesos
// and Docker containers.
type Container interface {
Build() *aurora.Container
}
// MesosContainer is a Mesos style container that can be used by Aurora Jobs.
type MesosContainer struct {
container *aurora.MesosContainer
}
// DockerContainer is a vanilla Docker style container that can be used by Aurora Jobs.
type DockerContainer struct {
container *aurora.DockerContainer
}
// NewDockerContainer creates a new Aurora compatible Docker container configuration.
func NewDockerContainer() DockerContainer {
return DockerContainer{container: aurora.NewDockerContainer()}
}
// Build creates an Aurora container based upon the configuration provided.
func (c DockerContainer) Build() *aurora.Container {
return &aurora.Container{Docker: c.container}
}
// Image adds the name of a Docker image to be used by the Job when running.
func (c DockerContainer) Image(image string) DockerContainer {
c.container.Image = image
return c
}
// AddParameter adds a parameter to be passed to Docker when the container is run.
func (c DockerContainer) AddParameter(name, value string) DockerContainer {
c.container.Parameters = append(c.container.Parameters, &aurora.DockerParameter{
Name: name,
@ -51,14 +60,17 @@ func (c DockerContainer) AddParameter(name, value string) DockerContainer {
return c
}
// NewMesosContainer creates a Mesos style container to be configured and built for use by an Aurora Job.
func NewMesosContainer() MesosContainer {
return MesosContainer{container: aurora.NewMesosContainer()}
}
// Build creates a Mesos style Aurora container configuration to be passed on to the Aurora Job.
func (c MesosContainer) Build() *aurora.Container {
return &aurora.Container{Mesos: c.container}
}
// DockerImage configures the Mesos container to use a specific Docker image when being run.
func (c MesosContainer) DockerImage(name, tag string) MesosContainer {
if c.container.Image == nil {
c.container.Image = aurora.NewImage()
@ -68,11 +80,12 @@ func (c MesosContainer) DockerImage(name, tag string) MesosContainer {
return c
}
func (c MesosContainer) AppcImage(name, imageId string) MesosContainer {
// AppcImage configures the Mesos container to use an image in the Appc format.
func (c MesosContainer) AppcImage(name, imageID string) MesosContainer {
if c.container.Image == nil {
c.container.Image = aurora.NewImage()
}
c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageId}
c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageID}
return c
}
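For context, a short sketch of how these builders chain; the image names and the docker parameter are placeholders:

```go
package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
)

func main() {
	// A vanilla Docker container with one parameter passed to docker run.
	docker := realis.NewDockerContainer().
		Image("nginx:latest").
		AddParameter("network", "host")

	// A Mesos container backed by a Docker image.
	mesos := realis.NewMesosContainer().
		DockerImage("nginx", "latest")

	// Build produces the aurora.Container handed to a Job via Job.Container.
	fmt.Printf("%+v\n%+v\n", docker.Build(), mesos.Build())
}
```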


@ -88,12 +88,6 @@ On Ubuntu, restarting the aurora-scheduler can be achieved by running the follow
$ sudo service aurora-scheduler restart
```
### Using a custom client
Pystachio does not yet support launching tasks using custom executors. Therefore, a custom
client must be used in order to launch tasks using a custom executor. In this case,
we will be using [gorealis](https://github.com/paypal/gorealis) to launch a task with
the compose executor on Aurora.
## Using [dce-go](https://github.com/paypal/dce-go)
Instead of manually configuring Aurora to run the docker-compose executor, one can follow the instructions provided [here](https://github.com/paypal/dce-go/blob/develop/docs/environment.md) to quickly create a DCE environment with Mesos, Aurora, Go 1.7, Docker, docker-compose, and DCE installed.
@ -107,80 +101,12 @@ Mesos endpoint --> http://192.168.33.8:5050
### Installing Go
#### Linux
Follow the instructions at the official golang website: [golang.org/doc/install](https://golang.org/doc/install)
##### Ubuntu
### Installing docker-compose
###### Adding a PPA and install via apt-get
```
$ sudo add-apt-repository ppa:ubuntu-lxc/lxd-stable
$ sudo apt-get update
$ sudo apt-get install golang
```
###### Configuring the GOPATH
Configure the environment to be able to compile and run Go code.
```
$ mkdir $HOME/go
$ echo export GOPATH=$HOME/go >> $HOME/.bashrc
$ echo export GOROOT=/usr/lib/go >> $HOME/.bashrc
$ echo export PATH=$PATH:$GOPATH/bin >> $HOME/.bashrc
$ echo export PATH=$PATH:$GOROOT/bin >> $HOME/.bashrc
```
Finally we must reload the .bashrc configuration:
```
$ source $HOME/.bashrc
```
#### OS X
One way to install go on OS X is by using [Homebrew](http://brew.sh/)
##### Installing Homebrew
Run the following command from the terminal to install Hombrew:
```
$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
```
##### Installing Go using Hombrew
Run the following command from the terminal to install Go:
```
$ brew install go
```
##### Configuring the GOPATH
Configure the environment to be able to compile and run Go code.
```
$ mkdir $HOME/go
$ echo export GOPATH=$HOME/go >> $HOME/.profile
$ echo export GOROOT=/usr/local/opt/go/libexec >> $HOME/.profile
$ echo export PATH=$PATH:$GOPATH/bin >> $HOME/.profile
$ echo export PATH=$PATH:$GOROOT/bin >> $HOME/.profile
```
Finally we must reload the .profile configuration:
```
$ source $HOME/.profile
```
#### Windows
Download and run the msi installer from https://golang.org/dl/
## Installing Docker Compose (if manually configured Aurora)
To show Aurora's new multi executor feature, we need to use at least one custom executor.
In this case we will be using the [docker-compose-executor](https://github.com/mesos/docker-compose-executor).
In order to run the docker-compose executor, each agent must have docker-compose installed on it.
This can be done using pip:
```
$ sudo pip install docker-compose
```
Agents which will run dce-go will need docker-compose in order to successfully run the executor.
Instructions for installing docker-compose on various platforms may be found on Docker's website: [docs.docker.com/compose/install/](https://docs.docker.com/compose/install/)
## Downloading gorealis
Finally, we must get `gorealis` using the `go get` command:
@ -192,7 +118,7 @@ go get github.com/paypal/gorealis
# Creating Aurora Jobs
## Creating a thermos job
To demonstrate that we are able to run jobs using different executors on the
To demonstrate that we are able to run jobs using different executors on the
same scheduler, we'll first launch a thermos job using the default Aurora Client.
We can use a sample job for this:
@ -259,8 +185,8 @@ go run $GOPATH/src/github.com/paypal/gorealis/examples/client.go -executor=compo
```
If everything went according to plan, a new job will be shown in the Aurora UI.
We can further investigate inside the Mesos task sandbox. Inside the sandbox, under
the sample-app folder, we can find a docker-compose.yml-generated.yml. If we inspect this file,
We can further investigate inside the Mesos task sandbox. Inside the sandbox, under
the sample-app folder, we can find a docker-compose.yml-generated.yml. If we inspect this file,
we can find the port at which we can find the web server we launched.
Under Web->Ports, we find the port Mesos allocated. We can then navigate to:
@ -269,10 +195,10 @@ Under Web->Ports, we find the port Mesos allocated. We can then navigate to:
A message from the executor should greet us.
## Creating a Thermos job using gorealis
It is also possible to create a thermos job using gorealis. To do this, however,
It is also possible to create a thermos job using gorealis. To do this, however,
a thermos payload is required. A thermos payload consists of a JSON blob that details
the entire task as it exists inside the Aurora Scheduler. *Creating the blob is unfortunately
out of the scope of what gorealis does*, so a thermos payload must be generated beforehand or
out of the scope of what gorealis does*, so a thermos payload must be generated beforehand or
retrieved from the structdump of an existing task for testing purposes.
A sample thermos JSON payload may be found [here](../examples/thermos_payload.json) in the examples folder.
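Assuming such a payload is available, a hedged sketch of launching a Thermos job through gorealis (the scheduler URL and job values are placeholders modeled on the examples):

```go
package main

import (
	"io/ioutil"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// A pre-generated Thermos payload, e.g. the sample from the examples folder.
	payload, err := ioutil.ReadFile("examples/thermos_payload.json")
	if err != nil {
		log.Fatal(err)
	}

	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://aurora.example.com:8081"))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world_from_gorealis").
		ExecutorName("AuroraExecutor").
		ExecutorData(string(payload)).
		CPU(0.25).
		RAM(64).
		Disk(100).
		IsService(false).
		InstanceCount(1).
		AddPorts(1)

	if _, err := r.CreateJob(job); err != nil {
		log.Fatal(err)
	}
}
```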


@ -1,6 +1,6 @@
# Using the Sample client
## Usage:
## Usage:
```
Usage of ./client:
-cluster string


@ -23,6 +23,8 @@ type timeout interface {
Timedout() bool
}
// IsTimeout returns true if the error being passed as an argument implements the Timeout interface
// and the Timedout function returns true.
func IsTimeout(err error) bool {
temp, ok := err.(timeout)
return ok && temp.Timedout()
@ -61,41 +63,42 @@ func (r *retryErr) RetryCount() int {
return r.retryCount
}
// Helper function for testing verification to avoid whitebox testing
// ToRetryCount is a helper function for testing verification to avoid whitebox testing
// as well as keeping retryErr private.
// It should NOT be used in any other context.
func ToRetryCount(err error) *retryErr {
if retryErr, ok := err.(*retryErr); ok {
return retryErr
} else {
return nil
}
return nil
}
func newRetryError(err error, retryCount int) *retryErr {
return &retryErr{error: err, timedout: true, retryCount: retryCount}
}
// Temporary errors indicate that the action may and should be retried.
// Temporary errors indicate that the action may or should be retried.
type temporary interface {
Temporary() bool
}
// IsTemporary indicates whether the error passed in as an argument implements the temporary interface
// and if the Temporary function returns true.
func IsTemporary(err error) bool {
temp, ok := err.(temporary)
return ok && temp.Temporary()
}
type TemporaryErr struct {
type temporaryErr struct {
error
temporary bool
}
func (t *TemporaryErr) Temporary() bool {
func (t *temporaryErr) Temporary() bool {
return t.temporary
}
// Retrying after receiving this error is advised
func NewTemporaryError(err error) *TemporaryErr {
return &TemporaryErr{error: err, temporary: true}
// NewTemporaryError creates a new error which satisfies the Temporary interface.
func NewTemporaryError(err error) *temporaryErr {
return &temporaryErr{error: err, temporary: true}
}
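A small sketch of these helpers from a caller's perspective; the wrapped error text is illustrative:

```go
package main

import (
	"errors"
	"fmt"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Wrap an error so the retry machinery treats it as retryable.
	err := realis.NewTemporaryError(errors.New("scheduler not ready"))

	fmt.Println(realis.IsTemporary(err)) // true: the wrapper reports Temporary()
	fmt.Println(realis.IsTimeout(err))   // false: this type has no Timedout()
}
```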

job.go

@ -20,6 +20,9 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// Job is an interface that defines the set of functions an Aurora Job object
// must implement.
// TODO(rdelvalle): Consider getting rid of the Job interface
type Job interface {
// Set Job Key environment.
Environment(env string) Job
@ -61,24 +64,24 @@ type Job interface {
SlaPolicy(policy *aurora.SlaPolicy) Job
}
type ResourceType int
type resourceType int
const (
CPU ResourceType = iota
CPU resourceType = iota
RAM
DISK
GPU
)
// Structure to collect all information pertaining to an Aurora job.
// AuroraJob is a structure to collect all information pertaining to an Aurora job.
type AuroraJob struct {
jobConfig *aurora.JobConfiguration
resources map[ResourceType]*aurora.Resource
resources map[resourceType]*aurora.Resource
metadata map[string]*aurora.Metadata
portCount int
}
// Create a Job object with everything initialized.
// NewJob is used to create a Job object with everything initialized.
func NewJob() Job {
jobConfig := aurora.NewJobConfiguration()
taskConfig := aurora.NewTaskConfig()
@ -98,7 +101,7 @@ func NewJob() Job {
ramMb := aurora.NewResource()
diskMb := aurora.NewResource()
resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
resources := map[resourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
numCpus.NumCpus = new(float64)
@ -113,13 +116,13 @@ func NewJob() Job {
}
}
// Set Job Key environment.
// Environment sets the Job Key environment.
func (j *AuroraJob) Environment(env string) Job {
j.jobConfig.Key.Environment = env
return j
}
// Set Job Key Role.
// Role sets the Job Key role.
func (j *AuroraJob) Role(role string) Job {
j.jobConfig.Key.Role = role
@ -130,13 +133,13 @@ func (j *AuroraJob) Role(role string) Job {
return j
}
// Set Job Key Name.
// Name sets the Job Key Name.
func (j *AuroraJob) Name(name string) Job {
j.jobConfig.Key.Name = name
return j
}
// Set name of the executor that will the task will be configured to.
// ExecutorName sets the name of the executor the task will be configured to use.
func (j *AuroraJob) ExecutorName(name string) Job {
if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@ -147,7 +150,7 @@ func (j *AuroraJob) ExecutorName(name string) Job {
return j
}
// Will be included as part of entire task inside the scheduler that will be serialized.
// ExecutorData sets the data blob that will be passed to the Mesos executor.
func (j *AuroraJob) ExecutorData(data string) Job {
if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@ -158,21 +161,25 @@ func (j *AuroraJob) ExecutorData(data string) Job {
return j
}
// CPU sets the amount of CPU each task will use in an Aurora Job.
func (j *AuroraJob) CPU(cpus float64) Job {
*j.resources[CPU].NumCpus = cpus
return j
}
// RAM sets the amount of RAM each task will use in an Aurora Job.
func (j *AuroraJob) RAM(ram int64) Job {
*j.resources[RAM].RamMb = ram
return j
}
// Disk sets the amount of Disk each task will use in an Aurora Job.
func (j *AuroraJob) Disk(disk int64) Job {
*j.resources[DISK].DiskMb = disk
return j
}
// GPU sets the amount of GPU each task will use in an Aurora Job.
func (j *AuroraJob) GPU(gpu int64) Job {
// GPU resource must be set explicitly since the scheduler by default
// rejects jobs with GPU resources attached to it.
@ -187,54 +194,58 @@ func (j *AuroraJob) GPU(gpu int64) Job {
return j
}
// How many failures to tolerate before giving up.
// MaxFailure sets how many failures to tolerate before giving up per Job.
func (j *AuroraJob) MaxFailure(maxFail int32) Job {
j.jobConfig.TaskConfig.MaxTaskFailures = maxFail
return j
}
// How many instances of the job to run
// InstanceCount sets how many instances of the task to run for this Job.
func (j *AuroraJob) InstanceCount(instCount int32) Job {
j.jobConfig.InstanceCount = instCount
return j
}
// CronSchedule allows the user to configure a cron schedule for this job to run in.
func (j *AuroraJob) CronSchedule(cron string) Job {
j.jobConfig.CronSchedule = &cron
return j
}
// CronCollisionPolicy allows the user to decide what happens if two or more instances
// of the same Cron job need to run.
func (j *AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job {
j.jobConfig.CronCollisionPolicy = policy
return j
}
// How many instances of the job to run
// GetInstanceCount returns how many tasks this Job contains.
func (j *AuroraJob) GetInstanceCount() int32 {
return j.jobConfig.InstanceCount
}
// Restart the job's tasks if they fail
// IsService configures the job to run as a long-running service (true) or as an ad-hoc job (false).
func (j *AuroraJob) IsService(isService bool) Job {
j.jobConfig.TaskConfig.IsService = isService
return j
}
// Get the current job configurations key to use for some realis calls.
// JobKey returns the job's configuration key.
func (j *AuroraJob) JobKey() *aurora.JobKey {
return j.jobConfig.Key
}
// Get the current job configurations key to use for some realis calls.
// JobConfig returns the job's configuration.
func (j *AuroraJob) JobConfig() *aurora.JobConfiguration {
return j.jobConfig
}
// TaskConfig returns the job's task (shard) configuration.
func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
return j.jobConfig.TaskConfig
}
// Add a list of URIs with the same extract and cache configuration. Scheduler must have
// AddURIs adds a list of URIs with the same extract and cache configuration. Scheduler must have
// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
for _, value := range values {
@ -244,7 +255,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
return j
}
// Adds a Mesos label to the job. Note that Aurora will add the
// AddLabel adds a Mesos label to the job. Note that Aurora will add the
// prefix "org.apache.aurora.metadata." to the beginning of each key.
func (j *AuroraJob) AddLabel(key string, value string) Job {
if _, ok := j.metadata[key]; ok {
@ -256,7 +267,7 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
return j
}
// Add a named port to the job configuration These are random ports as it's
// AddNamedPorts adds a named port to the job configuration. These are random ports, as it's
// not currently possible to request specific ports using Aurora.
func (j *AuroraJob) AddNamedPorts(names ...string) Job {
j.portCount += len(names)
@ -269,7 +280,7 @@ func (j *AuroraJob) AddNamedPorts(names ...string) Job {
return j
}
// Adds a request for a number of ports to the job configuration. The names chosen for these ports
// AddPorts adds a request for a number of ports to the job configuration. The names chosen for these ports
// will be org.apache.aurora.port.X, where X is the current port count for the job configuration
// starting at 0. These are random ports as it's not currently possible to request
// specific ports using Aurora.
@ -286,6 +297,8 @@ func (j *AuroraJob) AddPorts(num int) Job {
return j
}
// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
// tasks can run on.
// From Aurora Docs:
// Add a Value constraint
// name - Mesos slave attribute that the constraint is matched against.
@ -307,6 +320,7 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
return j
}
// AddLimitConstraint allows the user to limit how many tasks from the same Job are run on a single host.
// From Aurora Docs:
// A constraint that specifies the maximum number of active tasks on a host with
// a matching attribute that may be scheduled simultaneously.
@ -323,33 +337,34 @@ func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
return j
}
// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
j.AddValueConstraint("dedicated", false, role+"/"+name)
return j
}
// Set a container to run for the job configuration to run.
// Container sets the container the job configuration will run.
func (j *AuroraJob) Container(container Container) Job {
j.jobConfig.TaskConfig.Container = container.Build()
return j
}
// Set a partition policy for the job configuration to implement.
// PartitionPolicy sets a partition policy for the job configuration to implement.
func (j *AuroraJob) PartitionPolicy(policy *aurora.PartitionPolicy) Job {
j.jobConfig.TaskConfig.PartitionPolicy = policy
return j
}
// Set the Tier for the Job.
// Tier sets the Tier for the Job.
func (j *AuroraJob) Tier(tier string) Job {
j.jobConfig.TaskConfig.Tier = &tier
return j
}
// Set an SlaPolicy for the Job.
// SlaPolicy sets an SlaPolicy for the Job.
func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
j.jobConfig.TaskConfig.SlaPolicy = policy
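A hedged sketch of the constraint and label setters above in use; the attribute names and values are illustrative:

```go
package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
)

func main() {
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("constrained_job").
		CPU(0.5).
		RAM(128).
		Disk(256).
		// Only schedule on agents whose "zone" attribute matches one of these values.
		AddValueConstraint("zone", false, "east", "west").
		// Allow at most one active task per host.
		AddLimitConstraint("host", 1).
		AddLabel("owner", "team-infra")

	fmt.Printf("%+v\n", job.JobConfig())
}
```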


@ -14,65 +14,73 @@
package realis
type Logger interface {
type logger interface {
Println(v ...interface{})
Printf(format string, v ...interface{})
Print(v ...interface{})
}
// NoopLogger is a logger that can be attached to the client which will not print anything.
type NoopLogger struct{}
// Printf is a NOOP function here.
func (NoopLogger) Printf(format string, a ...interface{}) {}
// Print is a NOOP function here.
func (NoopLogger) Print(a ...interface{}) {}
// Println is a NOOP function here.
func (NoopLogger) Println(a ...interface{}) {}
// LevelLogger is a logger that can be configured to output different levels of information: Debug and Trace.
// Trace should only be enabled when very detailed information about the sequence of events a function took is needed.
type LevelLogger struct {
Logger
logger
debug bool
trace bool
}
// EnableDebug enables debug level logging for the LevelLogger
func (l *LevelLogger) EnableDebug(enable bool) {
l.debug = enable
}
// EnableTrace enables trace level logging for the LevelLogger
func (l *LevelLogger) EnableTrace(enable bool) {
l.trace = enable
}
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
func (l LevelLogger) debugPrintf(format string, a ...interface{}) {
if l.debug {
l.Printf("[DEBUG] "+format, a...)
}
}
func (l LevelLogger) DebugPrint(a ...interface{}) {
func (l LevelLogger) debugPrint(a ...interface{}) {
if l.debug {
l.Print(append([]interface{}{"[DEBUG] "}, a...)...)
}
}
func (l LevelLogger) DebugPrintln(a ...interface{}) {
func (l LevelLogger) debugPrintln(a ...interface{}) {
if l.debug {
l.Println(append([]interface{}{"[DEBUG] "}, a...)...)
}
}
func (l LevelLogger) TracePrintf(format string, a ...interface{}) {
func (l LevelLogger) tracePrintf(format string, a ...interface{}) {
if l.trace {
l.Printf("[TRACE] "+format, a...)
}
}
func (l LevelLogger) TracePrint(a ...interface{}) {
func (l LevelLogger) tracePrint(a ...interface{}) {
if l.trace {
l.Print(append([]interface{}{"[TRACE] "}, a...)...)
}
}
func (l LevelLogger) TracePrintln(a ...interface{}) {
func (l LevelLogger) tracePrintln(a ...interface{}) {
if l.trace {
l.Println(append([]interface{}{"[TRACE] "}, a...)...)
}
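A sketch of attaching a standard library logger, which works because *log.Logger happens to satisfy the unexported logger interface; the scheduler URL is a placeholder:

```go
package main

import (
	"log"
	"os"

	realis "github.com/paypal/gorealis"
)

func main() {
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://aurora.example.com:8081"), // placeholder URL
		// Any value with Print, Printf, and Println can be attached.
		realis.SetLogger(log.New(os.Stderr, "gorealis: ", log.LstdFlags)),
		// Debug routes the client's debugPrint* output through the logger above.
		realis.Debug(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}
```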


@ -12,7 +12,6 @@
* limitations under the License.
*/
// Collection of monitors to create synchronicity
package realis
import (
@ -22,11 +21,15 @@ import (
"github.com/pkg/errors"
)
// Monitor is a wrapper for the Realis client which allows us to have functions
// with the same name for Monitoring purposes.
// TODO(rdelvalle): Deprecate monitors and instead add prefix Monitor to
// all functions in this file like it is done in V2.
type Monitor struct {
Client Realis
}
// Polls the scheduler every certain amount of time to see if the update has succeeded
// JobUpdate polls the scheduler at a set interval to see if the update has entered a terminal state.
func (m *Monitor) JobUpdate(
updateKey aurora.JobUpdateKey,
interval int,
@ -71,6 +74,7 @@ func (m *Monitor) JobUpdate(
}
}
// JobUpdateStatus polls the scheduler at a set interval to see if the update has entered a specified state.
func (m *Monitor) JobUpdateStatus(
updateKey aurora.JobUpdateKey,
desiredStatuses map[aurora.JobUpdateStatus]bool,
@ -93,6 +97,7 @@ func (m *Monitor) JobUpdateStatus(
return summary[0].State.Status, err
}
// JobUpdateQuery polls the scheduler at a set interval to see if the query call returns any results.
func (m *Monitor) JobUpdateQuery(
updateQuery aurora.JobUpdateQuery,
interval time.Duration,
@ -124,7 +129,7 @@ func (m *Monitor) JobUpdateQuery(
}
}
// Monitor a Job until all instances enter one of the LIVE_STATES
// Instances will monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(
key *aurora.JobKey,
instances int32,
@ -132,7 +137,7 @@ func (m *Monitor) Instances(
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
}
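A hedged sketch of waiting on a job with the monitor; the scheduler URL is a placeholder and the interval and timeout arguments are assumed to be in seconds:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://aurora.example.com:8081"), // placeholder URL
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	monitor := &realis.Monitor{Client: r}
	key := &aurora.JobKey{Role: "vagrant", Environment: "prod", Name: "hello_world"}

	// Poll every 5 (assumed seconds), up to 50, until one instance goes LIVE.
	live, err := monitor.Instances(key, 1, 5, 50)
	if err != nil || !live {
		log.Fatalf("job did not reach a live state: %v", err)
	}
}
```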
// Monitor a Job until all instances enter a desired status.
// ScheduleStatus will monitor a Job until all instances enter a desired status.
// Defaults sets of desired statuses provided by the thrift API include:
// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
func (m *Monitor) ScheduleStatus(
@ -173,7 +178,7 @@ func (m *Monitor) ScheduleStatus(
}
}
// Monitor host status until all hosts match the status provided.
// HostMaintenance will monitor host status until all hosts match the status provided.
// Returns a map where the value is true if the host
// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
func (m *Monitor) HostMaintenance(

realis.go

@ -38,8 +38,10 @@ import (
"github.com/paypal/gorealis/response"
)
const VERSION = "1.21.1"
const version = "1.21.1"
// Realis is an interface that defines the various APIs that may be used to communicate with
// the Apache Aurora scheduler.
// TODO(rdelvalle): Move documentation to the interface in order to make the godoc more accessible
// Or get rid of the interface
type Realis interface {
@ -73,7 +75,7 @@ type Realis interface {
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
// TODO: Remove this method and make it private to avoid race conditions
ReestablishConn() error
RealisConfig() *RealisConfig
RealisConfig() *config
Close()
// Admin functions
@ -93,7 +95,7 @@ type Realis interface {
}
type realisClient struct {
config *RealisConfig
config *config
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
@ -103,7 +105,7 @@ type realisClient struct {
transport thrift.TTransport
}
type RealisConfig struct {
type config struct {
username, password string
url string
timeoutms int
@ -130,43 +132,43 @@ var defaultBackoff = Backoff{
Jitter: 0.1,
}
// ClientOption - An alias for a function that modifies the realis config object
type ClientOption func(*RealisConfig)
// ClientOption is an alias for a function that modifies the realis config object
type ClientOption func(*config)
// BasicAuth - Set authentication used against Apache Shiro in the Aurora scheduler
// BasicAuth sets authentication used against Apache Shiro in the Aurora scheduler
func BasicAuth(username, password string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.username = username
config.password = password
}
}
// SchedulerUrl - Set the immediate location of the current Aurora scheduler leader
// SchedulerUrl sets the immediate location of the current Aurora scheduler leader
func SchedulerUrl(url string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.url = url
}
}
// TimeoutMS - Set the connection timeout for an HTTP post request in Miliseconds
// TimeoutMS sets the connection timeout for an HTTP post request in milliseconds
func TimeoutMS(timeout int) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.timeoutms = timeout
}
}
// ZKCluster - Set a clusters.json provided cluster configuration to the client
// ZKCluster sets a clusters.json provided cluster configuration to the client
func ZKCluster(cluster *Cluster) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.cluster = cluster
}
}
// ZKUrl - Set the direct location of a Zookeeper node on which the Aurora leader registers itself
// ZKUrl sets the direct location of a Zookeeper node on which the Aurora leader registers itself
func ZKUrl(url string) ClientOption {
opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")}
return func(config *RealisConfig) {
return func(config *config) {
if config.zkOptions == nil {
config.zkOptions = opts
} else {
@ -175,87 +177,97 @@ func ZKUrl(url string) ClientOption {
}
}
// Retries - Configure the retry mechanism for the client
// Retries configures the retry mechanism for the client
func Retries(backoff Backoff) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.backoff = backoff
}
}
// ThriftJSON configures the client to use the Thrift JSON protocol.
func ThriftJSON() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.jsonTransport = true
}
}
// ThriftBinary configures the client to use the Thrift Binary protocol.
func ThriftBinary() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.binTransport = true
}
}
// BackOff is an alternative name for the Retry mechanism configuration.
func BackOff(b Backoff) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.backoff = b
}
}
// InsecureSkipVerify configures the client to not check for matching host names on certificates
// when using an SSL enabled Aurora scheduler.
func InsecureSkipVerify(insecureSkipVerify bool) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.insecureSkipVerify = insecureSkipVerify
}
}
// Certspath sets the directory where the server certificates used when connecting to an SSL enabled
// Aurora scheduler are stored.
func Certspath(certspath string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.certspath = certspath
}
}
// ClientCerts allows users to set client key and certificate when connecting to an SSL enabled
// Aurora scheduler.
func ClientCerts(clientKey, clientCert string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.clientKey, config.clientCert = clientKey, clientCert
}
}
// Use this option if you'd like to override default settings for connecting to Zookeeper.
// ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
// See zk.go for what is possible to set as an option.
func ZookeeperOptions(opts ...ZKOpt) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.zkOptions = opts
}
}
// Using the word set to avoid name collision with Interface.
func SetLogger(l Logger) ClientOption {
return func(config *RealisConfig) {
config.logger = &LevelLogger{Logger: l}
// SetLogger allows the user to attach a logger that implements the logger interface in logger.go
// to the client.
func SetLogger(l logger) ClientOption {
return func(config *config) {
config.logger = &LevelLogger{logger: l}
}
}
// Enable debug statements.
// Debug enables debug statements in the client.
func Debug() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.debug = true
}
}
// Enable debug statements.
// Trace enables trace statements in the client.
func Trace() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.trace = true
}
}
// FailOnPermanentErrors - If the client encounters a connection error the standard library
// considers permanent, stop retrying and return an error to the user.
// FailOnPermanentErrors configures the client to stop retrying and return an error to the user upon
// encountering a connection error the standard library considers permanent.
func FailOnPermanentErrors() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.failOnPermanentErrors = true
}
}
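A hedged sketch of composing these options when building a client; the URL and credentials are placeholders:

```go
package main

import (
	"log"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Each option mutates the private config before the client is built.
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://aurora.example.com:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.TimeoutMS(20000),
		realis.BackOff(realis.Backoff{
			Steps:    3,
			Duration: 10 * time.Second,
			Factor:   2.0,
			Jitter:   0.1,
		}),
		realis.FailOnPermanentErrors(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}
```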
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
func newTJSONTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config)
if err != nil {
return nil, errors.Wrap(err, "unable to create transport")
@ -266,11 +278,11 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT
}
httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
httpTrans.SetHeader("User-Agent", "gorealis v"+version)
return trans, err
}
func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
func newTBinTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config)
if err != nil {
return nil, errors.Wrap(err, "unable to create transport")
@ -283,22 +295,22 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
httpTrans.SetHeader("User-Agent", "gorealis v"+version)
return trans, err
}
// This client implementation of the realis interface uses a retry mechanism for all Thrift Calls.
// NewRealisClient creates a client implementation of the realis interface that uses a retry mechanism for all Thrift calls.
// It will retry all calls which result in a temporary failure as well as calls that fail due to an EOF
// being returned by the http client. Most permanent failures are now being caught by the thriftCallWithRetries
// function and not being retried but there may be corner cases not yet handled.
func NewRealisClient(options ...ClientOption) (Realis, error) {
config := &RealisConfig{}
config := &config{}
// Default configs
config.timeoutms = 10000
config.backoff = defaultBackoff
config.logger = &LevelLogger{Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
// Save options to recreate client if a connection error happens
config.options = options
@ -312,13 +324,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
// Turn off all logging (including debug)
if config.logger == nil {
config.logger = &LevelLogger{Logger: NoopLogger{}}
config.logger = &LevelLogger{logger: NoopLogger{}}
}
// Set a logger if debug has been set to true but no logger has been set
if config.logger == nil && config.debug {
config.logger = &LevelLogger{
Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
debug: true,
}
}
@ -330,7 +342,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
config.logger.EnableDebug(config.debug)
config.logger.EnableTrace(config.trace)
config.logger.DebugPrintln("Number of options applied to config: ", len(options))
config.logger.debugPrintln("Number of options applied to config: ", len(options))
// Set default Transport to JSON if needed.
if !config.jsonTransport && !config.binTransport {
@ -403,11 +415,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace},
logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{},
transport: config.transport}, nil
}
// GetDefaultClusterFromZKUrl creates a cluster object from a Zookeeper URL. This is deprecated in favor of using
// Zookeeper options.
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{
Name: "defaultCluster",
@ -419,7 +433,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
func GetCerts(certPath string) (*x509.CertPool, error) {
func createCertPool(certPath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certPath)
if err != nil {
@ -437,13 +451,13 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
var transport http.Transport
if config != nil {
tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}
if config.certspath != "" {
rootCAs, err := GetCerts(config.certspath)
rootCAs, err := createCertPool(config.certspath)
if err != nil {
config.logger.Println("error occurred couldn't fetch certs")
return nil, err
@ -536,7 +550,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
Statuses: states,
}
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -561,7 +575,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
r.logger.debugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -599,7 +613,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
// Kill specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -613,14 +627,14 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
return resp, nil
}
func (r *realisClient) RealisConfig() *RealisConfig {
func (r *realisClient) RealisConfig() *config {
return r.config
}
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -641,7 +655,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(
false,
@ -680,7 +694,7 @@ func (r *realisClient) CreateService(
}
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
r.logger.debugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(
false,
@ -696,7 +710,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
r.logger.debugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -714,7 +728,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
r.logger.debugPrintf("StartCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -731,7 +745,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
// Restarts specific instances specified
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -753,7 +767,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
}
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
if len(instanceIds) > 0 {
resp, retryErr := r.thriftCallWithRetries(
@ -767,15 +781,15 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
}
return resp, nil
} else {
return nil, errors.New("No tasks in the Active state")
}
return nil, errors.New("No tasks in the Active state")
}
// Update all tasks under a job configuration. Currently gorealis doesn't support canary deployments.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := r.thriftCallWithRetries(
true,
@ -787,9 +801,9 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// A timeout took place when attempting this call, attempt to recover
if IsTimeout(retryErr) {
return resp, retryErr
} else {
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
return resp, nil
}
@ -799,7 +813,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// However, if the job update does not transition to the ABORT state an error will be returned.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -825,7 +839,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -843,7 +857,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -861,7 +875,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -880,7 +894,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -919,7 +933,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
// Get information about task including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -937,7 +951,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
// Get pending reason
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -961,7 +975,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
// Get information about task without a task configuration object
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -987,7 +1001,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
Statuses: aurora.ACTIVE_STATES,
}
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -1015,7 +1029,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
r.logger.debugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -1032,7 +1046,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
r.logger.debugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
resp, retryErr := r.thriftCallWithRetries(
false,


@ -24,7 +24,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
drainList := aurora.NewHosts()
drainList.HostNames = hosts
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
r.logger.debugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -59,7 +59,7 @@ func (r *realisClient) SLADrainHosts(
drainList := aurora.NewHosts()
drainList.HostNames = hosts
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
r.logger.debugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -89,7 +89,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -119,7 +119,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -149,7 +149,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.


@ -77,17 +77,17 @@ func TestNonExistentEndpoint(t *testing.T) {
t.Run("WithRetries", func(t *testing.T) {
// Attempt to connect to a bad endpoint
r, err := realis.NewRealisClient(
badClient, err := realis.NewRealisClient(
realis.SchedulerUrl(badEndpoint),
realis.TimeoutMS(200000),
realis.BackOff(backoff),
)
require.NoError(t, err)
require.NotNil(t, r)
defer r.Close()
require.NotNil(t, badClient)
defer badClient.Close()
_, err = r.GetTasksWithoutConfigs(taskQ)
_, err = badClient.GetTasksWithoutConfigs(taskQ)
// Check that we do error out of retrying
require.Error(t, err)
@ -101,7 +101,7 @@ func TestNonExistentEndpoint(t *testing.T) {
t.Run("FailOnLookup", func(t *testing.T) {
// Attempt to connect to a bad endpoint
r, err := realis.NewRealisClient(
badClient, err := realis.NewRealisClient(
realis.SchedulerUrl(badEndpoint),
realis.TimeoutMS(200000),
realis.BackOff(backoff),
@ -109,10 +109,10 @@ func TestNonExistentEndpoint(t *testing.T) {
)
require.NoError(t, err)
require.NotNil(t, r)
defer r.Close()
require.NotNil(t, badClient)
defer badClient.Close()
_, err = r.GetTasksWithoutConfigs(taskQ)
_, err = badClient.GetTasksWithoutConfigs(taskQ)
// Check that we do error out of retrying
require.Error(t, err)
@ -195,12 +195,6 @@ func TestRealisClient_ReestablishConn(t *testing.T) {
assert.NoError(t, err)
}
func TestGetCACerts(t *testing.T) {
certs, err := realis.GetCerts("./examples/certs")
require.NoError(t, err)
assert.Equal(t, len(certs.Subjects()), 2)
}
func TestRealisClient_CreateJob_Thermos(t *testing.T) {
role := "vagrant"


@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
)
// Backoff determines how the retry mechanism should react after each failure and how many failures it should
// tolerate.
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multiplied by a factor each iteration
@ -50,19 +52,16 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// Modified version of the Kubernetes exponential-backoff code.
// ExponentialBackoff repeats a condition check with exponential backoff.
//
// It checks the condition up to Steps times, increasing the wait by multiplying
// the previous duration by Factor.
// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code.
// It repeats a condition check with exponential backoff and checks the condition up to
// Steps times, increasing the wait by multiplying the previous duration by Factor.
//
// If Jitter is greater than zero, a random amount of each duration is added
// (between duration and duration*(1+jitter)).
//
// If the condition never returns true, ErrWaitTimeout is returned. Errors
// do not cause the function to return.
func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) error {
var err error
var ok bool
var curStep int
@ -95,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// If the error is temporary, continue retrying.
if !IsTemporary(err) {
return err
} else {
// Print out the temporary error we experienced.
logger.Println(err)
}
// Print out the temporary error we experienced.
logger.Println(err)
}
}
@ -109,9 +107,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// Provide more information to the user wherever possible
if err != nil {
return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
} else {
return newRetryError(errors.New("ran out of retries"), curStep)
}
return newRetryError(errors.New("ran out of retries"), curStep)
}
type auroraThriftCall func() (resp *aurora.Response, err error)
@ -156,7 +154,7 @@ func (r *realisClient) thriftCallWithRetries(
resp, clientErr = thriftCall()
r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retryable event as we don't know
@ -169,7 +167,7 @@ func (r *realisClient) thriftCallWithRetries(
// Determine if error is a temporary URL error by going up the stack
e, ok := clientErr.(thrift.TTransportException)
if ok {
r.logger.DebugPrint("Encountered a transport exception")
r.logger.debugPrint("Encountered a transport exception")
e, ok := e.Err().(*url.Error)
if ok {
@ -186,7 +184,7 @@ func (r *realisClient) thriftCallWithRetries(
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
if e.Timeout() {
timeouts++
r.logger.DebugPrintf(
r.logger.debugPrintf(
"Client closed connection (timedout) %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
@ -200,8 +198,10 @@ func (r *realisClient) thriftCallWithRetries(
// In the future, reestablish connection should be able to check if it is actually possible
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
// Ignoring error due to the fact that an error should be retried regardless
_ = r.ReestablishConn()
reestablishErr := r.ReestablishConn()
if reestablishErr != nil {
r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
}
} else {
// If there was no client error, but the response is nil, something went wrong.
@ -233,14 +233,14 @@ func (r *realisClient) thriftCallWithRetries(
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
}
}
}
r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 {
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
@ -249,7 +249,7 @@ func (r *realisClient) thriftCallWithRetries(
// Provide more information to the user wherever possible.
if clientErr != nil {
return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
}
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}

updater.go

@ -18,13 +18,13 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// UpdateJob is a structure to collect all information required to create job update.
type UpdateJob struct {
Job // SetInstanceCount for job is hidden, access via full qualifier
req *aurora.JobUpdateRequest
}
// NewDefaultUpdateJob creates an UpdateJob object with opinionated default settings.
func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
req := aurora.NewJobUpdateRequest()
@ -74,6 +74,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
return &UpdateJob{Job: job, req: req}
}
// NewUpdateJob creates an UpdateJob object without default settings.
func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) *UpdateJob {
req := aurora.NewJobUpdateRequest()
@ -115,50 +116,50 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
return &UpdateJob{Job: job, req: req}
}
// InstanceCount sets instance count the job will have after the update.
func (u *UpdateJob) InstanceCount(inst int32) *UpdateJob {
u.req.InstanceCount = inst
return u
}
// BatchSize sets the max number of instances being updated at any given moment.
func (u *UpdateJob) BatchSize(size int32) *UpdateJob {
u.req.Settings.UpdateGroupSize = size
return u
}
// WatchTime sets the minimum number of milliseconds a shard must remain in RUNNING state before it is considered a success.
func (u *UpdateJob) WatchTime(ms int32) *UpdateJob {
u.req.Settings.MinWaitInInstanceRunningMs = ms
return u
}
// WaitForBatchCompletion configures the job update to wait for all instances in a group to be done before moving on.
func (u *UpdateJob) WaitForBatchCompletion(batchWait bool) *UpdateJob {
u.req.Settings.WaitForBatchCompletion = batchWait
return u
}
// MaxPerInstanceFailures sets the max number of instance failures to tolerate before marking instance as FAILED.
func (u *UpdateJob) MaxPerInstanceFailures(inst int32) *UpdateJob {
u.req.Settings.MaxPerInstanceFailures = inst
return u
}
// MaxFailedInstances sets the max number of FAILED instances to tolerate before terminating the update.
func (u *UpdateJob) MaxFailedInstances(inst int32) *UpdateJob {
u.req.Settings.MaxFailedInstances = inst
return u
}
// RollbackOnFail configures the job update to roll back automatically if it fails.
func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
u.req.Settings.RollbackOnFailure = rollback
return u
}
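
Since every setter returns the receiver, these methods chain into a fluent builder. A hedged usage sketch: buildUpdate is a hypothetical helper, and the TaskConfig is assumed to come from an existing job.

package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// buildUpdate chains the setters above into a single update request.
func buildUpdate(config *aurora.TaskConfig) *realis.UpdateJob {
	return realis.NewDefaultUpdateJob(config).
		InstanceCount(3).          // run three instances once the update lands
		BatchSize(1).              // update one instance at a time
		WatchTime(30000).          // stay RUNNING for 30s before success (value in ms)
		MaxPerInstanceFailures(1). // tolerate one failure per instance
		MaxFailedInstances(0).     // abort the update on any FAILED instance
		RollbackOnFail(true)       // roll back automatically on failure
}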
// NewUpdateSettings returns an opinionated set of job update settings.
func NewUpdateSettings() *aurora.JobUpdateSettings {
us := new(aurora.JobUpdateSettings)
// Mirrors defaults set by Pystachio
us.UpdateGroupSize = 1

util.go

@ -10,11 +10,22 @@ import (
const apiPath = "/api"
// ActiveStates - States a task may be in when active.
var ActiveStates = make(map[aurora.ScheduleStatus]bool)
// SlaveAssignedStates - States a task may be in when it has already been assigned to a Mesos agent.
var SlaveAssignedStates = make(map[aurora.ScheduleStatus]bool)
// LiveStates - States a task may be in when it is live (e.g. able to take traffic).
var LiveStates = make(map[aurora.ScheduleStatus]bool)
// TerminalStates - Set of states a task may not transition away from.
var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
// AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse.
var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
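
Since these sets are plain map lookups, callers can use them to filter task query results. A hedged sketch: splitByState is a hypothetical helper, and the task slice is assumed to come from a prior status query.

package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// splitByState buckets tasks using the exported state sets.
func splitByState(tasks []*aurora.ScheduledTask) (live, terminal []*aurora.ScheduledTask) {
	for _, t := range tasks {
		switch {
		case realis.LiveStates[t.Status]:
			live = append(live, t)
		case realis.TerminalStates[t.Status]:
			terminal = append(terminal, t)
		}
	}
	return live, terminal
}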
func init() {

zk.go

@ -24,14 +24,14 @@ import (
"github.com/samuel/go-zookeeper/zk"
)
type endpoint struct {
Host string `json:"host"`
Port int `json:"port"`
}
type serviceInstance struct {
Service endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
Status string `json:"status"`
}
@ -40,47 +40,54 @@ type zkConfig struct {
path string
backoff Backoff
timeout time.Duration
logger logger
}
// ZKOpt - Configuration option for the Zookeeper client used.
type ZKOpt func(z *zkConfig)
// ZKEndpoints - Endpoints on which a Zookeeper instance is running to be used by the client.
func ZKEndpoints(endpoints ...string) ZKOpt {
return func(z *zkConfig) {
z.endpoints = endpoints
}
}
// ZKPath - Path to look for information in when connected to Zookeeper.
func ZKPath(path string) ZKOpt {
return func(z *zkConfig) {
z.path = path
}
}
// ZKBackoff - Configuration for Retry mechanism used when connecting to Zookeeper.
// TODO(rdelvalle): Determine if this is really necessary as the ZK library already has a retry built in.
func ZKBackoff(b Backoff) ZKOpt {
return func(z *zkConfig) {
z.backoff = b
}
}
// ZKTimeout - How long to wait on a response from the Zookeeper instance before considering it dead.
func ZKTimeout(d time.Duration) ZKOpt {
return func(z *zkConfig) {
z.timeout = d
}
}
// ZKLogger - Attach a logger to the Zookeeper client in order to debug issues.
func ZKLogger(l logger) ZKOpt {
return func(z *zkConfig) {
z.logger = l
}
}
// LeaderFromZK - Retrieves current Aurora leader from ZK.
func LeaderFromZK(cluster Cluster) (string, error) {
return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
}
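
A hedged sketch of this function and of LeaderFromZKOpts below; the addresses, path, and timeout are illustrative values, not defaults.

package main

import (
	"fmt"
	"log"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	// From a Cluster definition, e.g. one loaded with LoadClusters.
	cluster := realis.Cluster{
		Name:        "devcluster",
		ZK:          "192.168.33.7:2181",
		SchedZKPath: "/aurora/scheduler",
	}
	leader, err := realis.LeaderFromZK(cluster)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("leader:", leader)

	// Or with explicit options when more control is needed.
	leader, err = realis.LeaderFromZKOpts(
		realis.ZKEndpoints("192.168.33.7:2181"),
		realis.ZKPath("/aurora/scheduler"),
		realis.ZKTimeout(5*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("leader:", leader)
}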
// LeaderFromZKOpts - Retrieves current Aurora leader from ZK with a custom configuration.
func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
var leaderURL string
@ -121,7 +128,6 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
}
// Search for the leader through all the children in the given path
for _, child := range children {
// Only the leader will start with member_
@ -137,7 +143,8 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader"))
}
err = json.Unmarshal([]byte(data), serviceInst)
var serviceInst serviceInstance
err = json.Unmarshal([]byte(data), &serviceInst)
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
}

zk_test.go

@ -24,11 +24,12 @@ import (
"github.com/stretchr/testify/assert"
)
var backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
}
// Test for behavior when no endpoints are given to the ZK leader finding function.
func TestZKNoEndpoints(t *testing.T) {