Added documentation to all exported functions and structs. Rewrote some of the documentation to point to more up-to-date sources. Unexported some structures and functions that were needlessly exported.

Renan DelValle 2019-06-11 15:19:53 -07:00
parent b62ff3e182
commit 587e017a9a
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
15 changed files with 256 additions and 183 deletions


@ -21,6 +21,8 @@ import (
"github.com/pkg/errors"
)
// Cluster contains the definition of the clusters.json file used by the default Aurora
// client for configuration
type Cluster struct {
Name string `json:"name"`
AgentRoot string `json:"slave_root"`
@ -33,7 +35,8 @@ type Cluster struct {
AuthMechanism string `json:"auth_mechanism"`
}
// Loads clusters.json file traditionally located at /etc/aurora/clusters.json
// LoadClusters loads a clusters.json file, traditionally located at /etc/aurora/clusters.json,
// for use with a gorealis client
func LoadClusters(config string) (map[string]Cluster, error) {
file, err := os.Open(config)
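
For context, a minimal sketch of how LoadClusters is typically consumed; the file path and the "devcluster" key are placeholders, not part of this change:

```go
package main

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Path and cluster name are illustrative only.
	clusters, err := realis.LoadClusters("/etc/aurora/clusters.json")
	if err != nil {
		log.Fatal(err)
	}
	if cluster, ok := clusters["devcluster"]; ok {
		fmt.Println("ZK endpoints for cluster:", cluster.ZK)
	}
}
```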


@ -18,31 +18,40 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// Container is an interface that defines a single function needed to create
// an Aurora container type. It exists because the code must support both Mesos
// and Docker containers.
type Container interface {
Build() *aurora.Container
}
// MesosContainer is a Mesos style container that can be used by Aurora Jobs.
type MesosContainer struct {
container *aurora.MesosContainer
}
// DockerContainer is a vanilla Docker style container that can be used by Aurora Jobs.
type DockerContainer struct {
container *aurora.DockerContainer
}
// NewDockerContainer creates a new Aurora compatible Docker container configuration.
func NewDockerContainer() DockerContainer {
return DockerContainer{container: aurora.NewDockerContainer()}
}
// Build creates an Aurora container based upon the configuration provided.
func (c DockerContainer) Build() *aurora.Container {
return &aurora.Container{Docker: c.container}
}
// Image adds the name of a Docker image to be used by the Job when running.
func (c DockerContainer) Image(image string) DockerContainer {
c.container.Image = image
return c
}
// AddParameter adds a parameter to be passed to Docker when the container is run.
func (c DockerContainer) AddParameter(name, value string) DockerContainer {
c.container.Parameters = append(c.container.Parameters, &aurora.DockerParameter{
Name: name,
@ -51,14 +60,17 @@ func (c DockerContainer) AddParameter(name, value string) DockerContainer {
return c
}
// NewMesosContainer creates a Mesos style container to be configured and built for use by an Aurora Job.
func NewMesosContainer() MesosContainer {
return MesosContainer{container: aurora.NewMesosContainer()}
}
// Build creates a Mesos style Aurora container configuration to be passed on to the Aurora Job.
func (c MesosContainer) Build() *aurora.Container {
return &aurora.Container{Mesos: c.container}
}
// DockerImage configures the Mesos container to use a specific Docker image when being run.
func (c MesosContainer) DockerImage(name, tag string) MesosContainer {
if c.container.Image == nil {
c.container.Image = aurora.NewImage()
@ -68,11 +80,12 @@ func (c MesosContainer) DockerImage(name, tag string) MesosContainer {
return c
}
func (c MesosContainer) AppcImage(name, imageId string) MesosContainer {
// AppcImage configures the Mesos container to use an image in the Appc format to run the container.
func (c MesosContainer) AppcImage(name, imageID string) MesosContainer {
if c.container.Image == nil {
c.container.Image = aurora.NewImage()
}
c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageId}
c.container.Image.Appc = &aurora.AppcImage{Name: name, ImageId: imageID}
return c
}
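
A minimal sketch of how the two builders compose; the image names and the Docker parameter are placeholders:

```go
package main

import realis "github.com/paypal/gorealis"

func main() {
	// Image names and the Docker parameter are illustrative only.
	docker := realis.NewDockerContainer().
		Image("nginx:latest").
		AddParameter("network", "host")

	mesos := realis.NewMesosContainer().
		DockerImage("nginx", "latest")

	// Both satisfy the Container interface, so either can back a Job.
	_ = realis.NewJob().Container(docker)
	_ = realis.NewJob().Container(mesos)
}
```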


@ -1,6 +1,6 @@
# Using the Sample client
## Usage:
## Usage:
```
Usage of ./client:
-cluster string
```


@ -23,6 +23,8 @@ type timeout interface {
Timedout() bool
}
// IsTimeout returns true if the error passed in implements the timeout interface
// and its Timedout function returns true.
func IsTimeout(err error) bool {
temp, ok := err.(timeout)
return ok && temp.Timedout()
@ -61,41 +63,42 @@ func (r *retryErr) RetryCount() int {
return r.retryCount
}
// Helper function for testing verification to avoid whitebox testing
// ToRetryCount is a helper function for test verification that avoids whitebox testing
// as well as keeping retryErr private.
// It should NOT be used in any other context.
func ToRetryCount(err error) *retryErr {
if retryErr, ok := err.(*retryErr); ok {
return retryErr
} else {
return nil
}
return nil
}
func newRetryError(err error, retryCount int) *retryErr {
return &retryErr{error: err, timedout: true, retryCount: retryCount}
}
// Temporary errors indicate that the action may and should be retried.
// Temporary errors indicate that the action may or should be retried.
type temporary interface {
Temporary() bool
}
// IsTemporary indicates whether the error passed in implements the temporary interface
// and its Temporary function returns true.
func IsTemporary(err error) bool {
temp, ok := err.(temporary)
return ok && temp.Temporary()
}
type TemporaryErr struct {
type temporaryErr struct {
error
temporary bool
}
func (t *TemporaryErr) Temporary() bool {
func (t *temporaryErr) Temporary() bool {
return t.temporary
}
// Retrying after receiving this error is advised
func NewTemporaryError(err error) *TemporaryErr {
return &TemporaryErr{error: err, temporary: true}
// NewTemporaryError creates a new error which satisfies the Temporary interface.
func NewTemporaryError(err error) *temporaryErr {
return &temporaryErr{error: err, temporary: true}
}
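
A hedged sketch of how the error helpers fit together from calling code:

```go
package main

import (
	"errors"
	"fmt"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Wrap an arbitrary error so retry logic treats it as retryable.
	err := realis.NewTemporaryError(errors.New("connection reset"))

	if realis.IsTemporary(err) {
		fmt.Println("temporary: worth retrying")
	}
	// A temporary error does not implement the timeout interface,
	// so this check is false here.
	if realis.IsTimeout(err) {
		fmt.Println("timed out")
	}
}
```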

job.go

@ -20,6 +20,9 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// The Job interface defines the set of functions an Aurora Job object
// must implement.
// TODO(rdelvalle): Consider getting rid of the Job interface
type Job interface {
// Set Job Key environment.
Environment(env string) Job
@ -61,24 +64,24 @@ type Job interface {
SlaPolicy(policy *aurora.SlaPolicy) Job
}
type ResourceType int
type resourceType int
const (
CPU ResourceType = iota
CPU resourceType = iota
RAM
DISK
GPU
)
// Structure to collect all information pertaining to an Aurora job.
// AuroraJob is a structure to collect all information pertaining to an Aurora job.
type AuroraJob struct {
jobConfig *aurora.JobConfiguration
resources map[ResourceType]*aurora.Resource
resources map[resourceType]*aurora.Resource
metadata map[string]*aurora.Metadata
portCount int
}
// Create a Job object with everything initialized.
// NewJob is used to create a Job object with everything initialized.
func NewJob() Job {
jobConfig := aurora.NewJobConfiguration()
taskConfig := aurora.NewTaskConfig()
@ -98,7 +101,7 @@ func NewJob() Job {
ramMb := aurora.NewResource()
diskMb := aurora.NewResource()
resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
resources := map[resourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
numCpus.NumCpus = new(float64)
@ -113,13 +116,13 @@ func NewJob() Job {
}
}
// Set Job Key environment.
// Environment sets the Job Key environment.
func (j *AuroraJob) Environment(env string) Job {
j.jobConfig.Key.Environment = env
return j
}
// Set Job Key Role.
// Role sets the Job Key role.
func (j *AuroraJob) Role(role string) Job {
j.jobConfig.Key.Role = role
@ -130,13 +133,13 @@ func (j *AuroraJob) Role(role string) Job {
return j
}
// Set Job Key Name.
// Name sets the Job Key Name.
func (j *AuroraJob) Name(name string) Job {
j.jobConfig.Key.Name = name
return j
}
// Set name of the executor that will the task will be configured to.
// ExecutorName sets the name of the executor the task will be configured to use.
func (j *AuroraJob) ExecutorName(name string) Job {
if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@ -147,7 +150,7 @@ func (j *AuroraJob) ExecutorName(name string) Job {
return j
}
// Will be included as part of entire task inside the scheduler that will be serialized.
// ExecutorData sets the data blob that will be passed to the Mesos executor.
func (j *AuroraJob) ExecutorData(data string) Job {
if j.jobConfig.TaskConfig.ExecutorConfig == nil {
@ -158,21 +161,25 @@ func (j *AuroraJob) ExecutorData(data string) Job {
return j
}
// CPU sets the amount of CPU each task will use in an Aurora Job.
func (j *AuroraJob) CPU(cpus float64) Job {
*j.resources[CPU].NumCpus = cpus
return j
}
// RAM sets the amount of RAM each task will use in an Aurora Job.
func (j *AuroraJob) RAM(ram int64) Job {
*j.resources[RAM].RamMb = ram
return j
}
// Disk sets the amount of Disk each task will use in an Aurora Job.
func (j *AuroraJob) Disk(disk int64) Job {
*j.resources[DISK].DiskMb = disk
return j
}
// GPU sets the amount of GPU each task will use in an Aurora Job.
func (j *AuroraJob) GPU(gpu int64) Job {
// GPU resource must be set explicitly since the scheduler by default
// rejects jobs with GPU resources attached to it.
@ -187,54 +194,58 @@ func (j *AuroraJob) GPU(gpu int64) Job {
return j
}
// How many failures to tolerate before giving up.
// MaxFailure sets how many failures to tolerate before giving up per Job.
func (j *AuroraJob) MaxFailure(maxFail int32) Job {
j.jobConfig.TaskConfig.MaxTaskFailures = maxFail
return j
}
// How many instances of the job to run
// InstanceCount sets how many instances of the task to run for this Job.
func (j *AuroraJob) InstanceCount(instCount int32) Job {
j.jobConfig.InstanceCount = instCount
return j
}
// CronSchedule allows the user to configure a cron schedule for this job to run in.
func (j *AuroraJob) CronSchedule(cron string) Job {
j.jobConfig.CronSchedule = &cron
return j
}
// CronCollisionPolicy allows the user to decide what happens if two or more instances
// of the same Cron job need to run.
func (j *AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job {
j.jobConfig.CronCollisionPolicy = policy
return j
}
// How many instances of the job to run
// GetInstanceCount returns how many tasks this Job contains.
func (j *AuroraJob) GetInstanceCount() int32 {
return j.jobConfig.InstanceCount
}
// Restart the job's tasks if they fail
// IsService configures the job as a long-running service when true, or as an ad-hoc job when false.
func (j *AuroraJob) IsService(isService bool) Job {
j.jobConfig.TaskConfig.IsService = isService
return j
}
// Get the current job configurations key to use for some realis calls.
// JobKey returns the job's configuration key.
func (j *AuroraJob) JobKey() *aurora.JobKey {
return j.jobConfig.Key
}
// Get the current job configurations key to use for some realis calls.
// JobConfig returns the job's configuration.
func (j *AuroraJob) JobConfig() *aurora.JobConfiguration {
return j.jobConfig
}
// TaskConfig returns the job's task(shard) configuration.
func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
return j.jobConfig.TaskConfig
}
// Add a list of URIs with the same extract and cache configuration. Scheduler must have
// AddURIs adds a list of URIs with the same extract and cache configuration. Scheduler must have
// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
for _, value := range values {
@ -244,7 +255,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
return j
}
// Adds a Mesos label to the job. Note that Aurora will add the
// AddLabel adds a Mesos label to the job. Note that Aurora will add the
// prefix "org.apache.aurora.metadata." to the beginning of each key.
func (j *AuroraJob) AddLabel(key string, value string) Job {
if _, ok := j.metadata[key]; ok {
@ -256,7 +267,7 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
return j
}
// Add a named port to the job configuration These are random ports as it's
// AddNamedPorts adds named ports to the job configuration. These are random ports, as it's
// not currently possible to request specific ports using Aurora.
func (j *AuroraJob) AddNamedPorts(names ...string) Job {
j.portCount += len(names)
@ -269,7 +280,7 @@ func (j *AuroraJob) AddNamedPorts(names ...string) Job {
return j
}
// Adds a request for a number of ports to the job configuration. The names chosen for these ports
// AddPorts adds a request for a number of ports to the job configuration. The names chosen for these ports
// will be org.apache.aurora.port.X, where X is the current port count for the job configuration
// starting at 0. These are random ports as it's not currently possible to request
// specific ports using Aurora.
@ -286,6 +297,8 @@ func (j *AuroraJob) AddPorts(num int) Job {
return j
}
// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
// tasks can run on.
// From Aurora Docs:
// Add a Value constraint
// name - Mesos slave attribute that the constraint is matched against.
@ -307,6 +320,7 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
return j
}
// AddLimitConstraint allows the user to limit how many tasks from the same Job run on a single host.
// From Aurora Docs:
// A constraint that specifies the maximum number of active tasks on a host with
// a matching attribute that may be scheduled simultaneously.
@ -323,33 +337,34 @@ func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
return j
}
// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
j.AddValueConstraint("dedicated", false, role+"/"+name)
return j
}
// Set a container to run for the job configuration to run.
// Container sets the container for the job configuration's tasks to run in.
func (j *AuroraJob) Container(container Container) Job {
j.jobConfig.TaskConfig.Container = container.Build()
return j
}
// Set a partition policy for the job configuration to implement.
// PartitionPolicy sets a partition policy for the job configuration to implement.
func (j *AuroraJob) PartitionPolicy(policy *aurora.PartitionPolicy) Job {
j.jobConfig.TaskConfig.PartitionPolicy = policy
return j
}
// Set the Tier for the Job.
// Tier sets the Tier for the Job.
func (j *AuroraJob) Tier(tier string) Job {
j.jobConfig.TaskConfig.Tier = &tier
return j
}
// Set an SlaPolicy for the Job.
// SlaPolicy sets an SlaPolicy for the Job.
func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
j.jobConfig.TaskConfig.SlaPolicy = policy
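
For reference, a sketch of the fluent builder these setters form; every value below is a placeholder:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world").
		ExecutorName("thermos").
		CPU(0.25).
		RAM(64).
		Disk(128).
		IsService(true).
		InstanceCount(2).
		AddPorts(1)

	log.Printf("job key: %+v", job.JobKey())
}
```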


@ -14,65 +14,73 @@
package realis
type Logger interface {
type logger interface {
Println(v ...interface{})
Printf(format string, v ...interface{})
Print(v ...interface{})
}
// NoopLogger is a logger that can be attached to the client which will not print anything.
type NoopLogger struct{}
// Printf is a NOOP function here.
func (NoopLogger) Printf(format string, a ...interface{}) {}
// Print is a NOOP function here.
func (NoopLogger) Print(a ...interface{}) {}
// Println is a NOOP function here.
func (NoopLogger) Println(a ...interface{}) {}
// LevelLogger is a logger that can be configured to output different levels of information: Debug and Trace.
// Trace should only be enabled when very detailed information about the sequence of events a function took is needed.
type LevelLogger struct {
Logger
logger
debug bool
trace bool
}
// EnableDebug enables debug level logging for the LevelLogger
func (l *LevelLogger) EnableDebug(enable bool) {
l.debug = enable
}
// EnableTrace enables trace level logging for the LevelLogger
func (l *LevelLogger) EnableTrace(enable bool) {
l.trace = enable
}
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
func (l LevelLogger) debugPrintf(format string, a ...interface{}) {
if l.debug {
l.Printf("[DEBUG] "+format, a...)
}
}
func (l LevelLogger) DebugPrint(a ...interface{}) {
func (l LevelLogger) debugPrint(a ...interface{}) {
if l.debug {
l.Print(append([]interface{}{"[DEBUG] "}, a...)...)
}
}
func (l LevelLogger) DebugPrintln(a ...interface{}) {
func (l LevelLogger) debugPrintln(a ...interface{}) {
if l.debug {
l.Println(append([]interface{}{"[DEBUG] "}, a...)...)
}
}
func (l LevelLogger) TracePrintf(format string, a ...interface{}) {
func (l LevelLogger) tracePrintf(format string, a ...interface{}) {
if l.trace {
l.Printf("[TRACE] "+format, a...)
}
}
func (l LevelLogger) TracePrint(a ...interface{}) {
func (l LevelLogger) tracePrint(a ...interface{}) {
if l.trace {
l.Print(append([]interface{}{"[TRACE] "}, a...)...)
}
}
func (l LevelLogger) TracePrintln(a ...interface{}) {
func (l LevelLogger) tracePrintln(a ...interface{}) {
if l.trace {
l.Println(append([]interface{}{"[TRACE] "}, a...)...)
}
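
Because Logger is now the unexported logger interface, any value with Print, Printf, and Println still satisfies it. A sketch using the standard library's *log.Logger; the scheduler URL is a placeholder:

```go
package main

import (
	"log"
	"os"

	realis "github.com/paypal/gorealis"
)

func main() {
	// *log.Logger provides Print/Printf/Println, satisfying the interface.
	l := log.New(os.Stderr, "aurora: ", log.LstdFlags)

	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://192.168.33.7:8081"),
		realis.SetLogger(l),
		realis.Debug(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}
```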


@ -12,7 +12,6 @@
* limitations under the License.
*/
// Collection of monitors to create synchronicity
package realis
import (
@ -22,11 +21,13 @@ import (
"github.com/pkg/errors"
)
// Monitor is a wrapper for the Realis client which allows us to have functions with the same name for Monitoring purposes.
// TODO(rdelvalle): Deprecate monitors and instead add prefix Monitor to all functions in this file like it is done in V2.
type Monitor struct {
Client Realis
}
// Polls the scheduler every certain amount of time to see if the update has succeeded
// JobUpdate polls the scheduler at regular intervals to see if the update has entered a terminal state.
func (m *Monitor) JobUpdate(
updateKey aurora.JobUpdateKey,
interval int,
@ -71,6 +72,7 @@ func (m *Monitor) JobUpdate(
}
}
// JobUpdateStatus polls the scheduler at regular intervals to see if the update has entered a specified state.
func (m *Monitor) JobUpdateStatus(
updateKey aurora.JobUpdateKey,
desiredStatuses map[aurora.JobUpdateStatus]bool,
@ -93,6 +95,7 @@ func (m *Monitor) JobUpdateStatus(
return summary[0].State.Status, err
}
// JobUpdateQuery polls the scheduler at regular intervals to see if the query call returns any results.
func (m *Monitor) JobUpdateQuery(
updateQuery aurora.JobUpdateQuery,
interval time.Duration,
@ -124,7 +127,7 @@ func (m *Monitor) JobUpdateQuery(
}
}
// Monitor a Job until all instances enter one of the LIVE_STATES
// Instances will monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(
key *aurora.JobKey,
instances int32,
@ -132,7 +135,7 @@ func (m *Monitor) Instances(
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
}
// Monitor a Job until all instances enter a desired status.
// ScheduleStatus will monitor a Job until all instances enter a desired status.
// Defaults sets of desired statuses provided by the thrift API include:
// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
func (m *Monitor) ScheduleStatus(
@ -173,7 +176,7 @@ func (m *Monitor) ScheduleStatus(
}
}
// Monitor host status until all hosts match the status provided.
// HostMaintenance will monitor host status until all hosts match the status provided.
// Returns a map where the value is true if the host
// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
func (m *Monitor) HostMaintenance(
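
A sketch of driving a monitor after job creation; the scheduler URL is a placeholder, and the interval and timeout arguments are assumed here to be seconds:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	monitor := &realis.Monitor{Client: r}
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world").
		InstanceCount(1)

	// Poll every 5, give up after 50 (assumed to be seconds).
	ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50)
	if err != nil || !ok {
		log.Printf("instances never reached a live state: %v", err)
	}
}
```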

realis.go

@ -38,8 +38,10 @@ import (
"github.com/paypal/gorealis/response"
)
const VERSION = "1.21.1"
const version = "1.21.1"
// Realis is an interface that defines the various APIs that may be used to communicate with
// the Apache Aurora scheduler.
// TODO(rdelvalle): Move documentation to the interface in order to make the godoc more accessible
// Or get rid of the interface
type Realis interface {
@ -73,7 +75,7 @@ type Realis interface {
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
// TODO: Remove this method and make it private to avoid race conditions
ReestablishConn() error
RealisConfig() *RealisConfig
RealisConfig() *config
Close()
// Admin functions
@ -93,7 +95,7 @@ type Realis interface {
}
type realisClient struct {
config *RealisConfig
config *config
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
@ -103,7 +105,7 @@ type realisClient struct {
transport thrift.TTransport
}
type RealisConfig struct {
type config struct {
username, password string
url string
timeoutms int
@ -130,43 +132,43 @@ var defaultBackoff = Backoff{
Jitter: 0.1,
}
// ClientOption - An alias for a function that modifies the realis config object
type ClientOption func(*RealisConfig)
// ClientOption is an alias for a function that modifies the realis config object
type ClientOption func(*config)
// BasicAuth - Set authentication used against Apache Shiro in the Aurora scheduler
// BasicAuth sets authentication used against Apache Shiro in the Aurora scheduler
func BasicAuth(username, password string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.username = username
config.password = password
}
}
// SchedulerUrl - Set the immediate location of the current Aurora scheduler leader
// SchedulerUrl sets the immediate location of the current Aurora scheduler leader
func SchedulerUrl(url string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.url = url
}
}
// TimeoutMS - Set the connection timeout for an HTTP post request in Miliseconds
// TimeoutMS sets the connection timeout for an HTTP post request in milliseconds
func TimeoutMS(timeout int) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.timeoutms = timeout
}
}
// ZKCluster - Set a clusters.json provided cluster configuration to the client
// ZKCluster sets a clusters.json provided cluster configuration to the client
func ZKCluster(cluster *Cluster) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.cluster = cluster
}
}
// ZKUrl - Set the direct location of a Zookeeper node on which the Aurora leader registers itself
// ZKUrl sets the direct location of a Zookeeper node on which the Aurora leader registers itself
func ZKUrl(url string) ClientOption {
opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")}
return func(config *RealisConfig) {
return func(config *config) {
if config.zkOptions == nil {
config.zkOptions = opts
} else {
@ -175,87 +177,97 @@ func ZKUrl(url string) ClientOption {
}
}
// Retries - Configure the retry mechanism for the client
// Retries configures the retry mechanism for the client
func Retries(backoff Backoff) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.backoff = backoff
}
}
// ThriftJSON configures the client to use the Thrift JSON protocol.
func ThriftJSON() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.jsonTransport = true
}
}
// ThriftBinary configures the client to use the Thrift Binary protocol.
func ThriftBinary() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.binTransport = true
}
}
// BackOff is an alternative name for the Retry mechanism configuration.
func BackOff(b Backoff) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.backoff = b
}
}
// InsecureSkipVerify configures the client to not check for matching hosts names on certificates
// when using an SSL enabled Aurora scheduler.
func InsecureSkipVerify(insecureSkipVerify bool) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.insecureSkipVerify = insecureSkipVerify
}
}
// Certspath sets the directory where the server certificates to be used when connecting to an SSL enabled
// Aurora scheduler are stored.
func Certspath(certspath string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.certspath = certspath
}
}
// ClientCerts allows users to set client key and certificate when connecting to an SSL enabled
// Aurora scheduler.
func ClientCerts(clientKey, clientCert string) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.clientKey, config.clientCert = clientKey, clientCert
}
}
// Use this option if you'd like to override default settings for connecting to Zookeeper.
// ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
// See zk.go for what is possible to set as an option.
func ZookeeperOptions(opts ...ZKOpt) ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.zkOptions = opts
}
}
// Using the word set to avoid name collision with Interface.
func SetLogger(l Logger) ClientOption {
return func(config *RealisConfig) {
config.logger = &LevelLogger{Logger: l}
// SetLogger allows the user to attach a logger that implements the logger interface in logger.go
// to the client.
func SetLogger(l logger) ClientOption {
return func(config *config) {
config.logger = &LevelLogger{logger: l}
}
}
// Enable debug statements.
// Debug enables debug statements in the client.
func Debug() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.debug = true
}
}
// Enable debug statements.
// Trace enables trace statements in the client.
func Trace() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.trace = true
}
}
// FailOnPermanentErrors - If the client encounters a connection error the standard library
// considers permanent, stop retrying and return an error to the user.
// FailOnPermanentErrors allows the client to stop upon encountering a connection error the standard library
// considers permanent and return an error to the user.
func FailOnPermanentErrors() ClientOption {
return func(config *RealisConfig) {
return func(config *config) {
config.failOnPermanentErrors = true
}
}
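
Putting the options together, a hedged construction sketch; the endpoint, credentials, and cert path are placeholders:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("https://192.168.33.7:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.TimeoutMS(20000),
		realis.ThriftJSON(),
		realis.Certspath("/etc/aurora/certs"),
		realis.InsecureSkipVerify(false),
		realis.FailOnPermanentErrors(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}
```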
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
func newTJSONTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config)
if err != nil {
return nil, errors.Wrap(err, "unable to create transport")
@ -266,11 +278,11 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT
}
httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
httpTrans.SetHeader("User-Agent", "gorealis v"+version)
return trans, err
}
func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
func newTBinTransport(url string, timeout int, config *config) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config)
if err != nil {
return nil, errors.Wrap(err, "unable to create transport")
@ -283,22 +295,22 @@ func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTr
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
httpTrans.SetHeader("User-Agent", "gorealis v"+version)
return trans, err
}
// This client implementation of the realis interface uses a retry mechanism for all Thrift Calls.
// NewRealisClient creates a client implementation of the Realis interface that uses a retry mechanism for all Thrift calls.
// It will retry all calls which result in a temporary failure as well as calls that fail due to an EOF
// being returned by the http client. Most permanent failures are now being caught by the thriftCallWithRetries
// function and not being retried but there may be corner cases not yet handled.
func NewRealisClient(options ...ClientOption) (Realis, error) {
config := &RealisConfig{}
config := &config{}
// Default configs
config.timeoutms = 10000
config.backoff = defaultBackoff
config.logger = &LevelLogger{Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
// Save options to recreate client if a connection error happens
config.options = options
@ -312,13 +324,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
// Turn off all logging (including debug)
if config.logger == nil {
config.logger = &LevelLogger{Logger: NoopLogger{}}
config.logger = &LevelLogger{logger: NoopLogger{}}
}
// Set a logger if debug has been set to true but no logger has been set
if config.logger == nil && config.debug {
config.logger = &LevelLogger{
Logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC),
debug: true,
}
}
@ -330,7 +342,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
config.logger.EnableDebug(config.debug)
config.logger.EnableTrace(config.trace)
config.logger.DebugPrintln("Number of options applied to config: ", len(options))
config.logger.debugPrintln("Number of options applied to config: ", len(options))
// Set default Transport to JSON if needed.
if !config.jsonTransport && !config.binTransport {
@ -403,11 +415,13 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace},
logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{},
transport: config.transport}, nil
}
// GetDefaultClusterFromZKUrl creates a cluster object from a Zookeeper URL. This is deprecated in favor of using
// Zookeeper options.
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{
Name: "defaultCluster",
@ -419,7 +433,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
func GetCerts(certPath string) (*x509.CertPool, error) {
func createCertPool(certPath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certPath)
if err != nil {
@ -437,13 +451,13 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
var transport http.Transport
if config != nil {
tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}
if config.certspath != "" {
rootCAs, err := GetCerts(config.certspath)
rootCAs, err := createCertPool(config.certspath)
if err != nil {
config.logger.Println("error occurred couldn't fetch certs")
return nil, err
@ -536,7 +550,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
Statuses: states,
}
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -561,7 +575,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
r.logger.debugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -599,7 +613,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
// Kill specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -613,14 +627,14 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
return resp, nil
}
func (r *realisClient) RealisConfig() *RealisConfig {
func (r *realisClient) RealisConfig() *config {
return r.config
}
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -641,7 +655,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(
false,
@ -680,7 +694,7 @@ func (r *realisClient) CreateService(
}
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
r.logger.debugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(
false,
@ -696,7 +710,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
r.logger.debugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -714,7 +728,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
r.logger.debugPrintf("StartCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -731,7 +745,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
// Restarts specific instances specified
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -753,7 +767,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
}
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
if len(instanceIds) > 0 {
resp, retryErr := r.thriftCallWithRetries(
@ -767,15 +781,15 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
}
return resp, nil
} else {
return nil, errors.New("No tasks in the Active state")
}
return nil, errors.New("No tasks in the Active state")
}
// Update all tasks under a job configuration. Currently gorealis doesn't support canary deployments.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := r.thriftCallWithRetries(
true,
@ -787,9 +801,9 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// A timeout took place when attempting this call, attempt to recover
if IsTimeout(retryErr) {
return resp, retryErr
} else {
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
return resp, nil
}
@ -799,7 +813,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
// However, if the job update does not transition to the ABORT state an error will be returned.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -825,7 +839,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -843,7 +857,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -861,7 +875,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -880,7 +894,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -919,7 +933,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
// Get information about task including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -937,7 +951,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
// Get pending reason
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -961,7 +975,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
// Get information about task including without a task configuration object
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -987,7 +1001,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
Statuses: aurora.ACTIVE_STATES,
}
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -1015,7 +1029,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
r.logger.debugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -1032,7 +1046,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
r.logger.debugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
resp, retryErr := r.thriftCallWithRetries(
false,


@ -24,7 +24,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
drainList := aurora.NewHosts()
drainList.HostNames = hosts
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
r.logger.debugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -59,7 +59,7 @@ func (r *realisClient) SLADrainHosts(
drainList := aurora.NewHosts()
drainList.HostNames = hosts
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
r.logger.debugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -89,7 +89,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -119,7 +119,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(
false,
@ -149,7 +149,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
hostList := aurora.NewHosts()
hostList.HostNames = hosts
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
r.logger.debugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.


@ -195,12 +195,6 @@ func TestRealisClient_ReestablishConn(t *testing.T) {
assert.NoError(t, err)
}
func TestGetCACerts(t *testing.T) {
certs, err := realis.GetCerts("./examples/certs")
require.NoError(t, err)
assert.Equal(t, len(certs.Subjects()), 2)
}
func TestRealisClient_CreateJob_Thermos(t *testing.T) {
role := "vagrant"


@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
)
// Backoff determines how the retry mechanism should react after each failure and how many failures it should
// tolerate.
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multiplied by a factor each iteration
@ -50,19 +52,16 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// Modified version of the Kubernetes exponential-backoff code.
// ExponentialBackoff repeats a condition check with exponential backoff.
//
// It checks the condition up to Steps times, increasing the wait by multiplying
// the previous duration by Factor.
// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code.
// It repeats a condition check with exponential backoff and checks the condition up to
// Steps times, increasing the wait by multiplying the previous duration by Factor.
//
// If Jitter is greater than zero, a random amount of each duration is added
// (between duration and duration*(1+jitter)).
//
// If the condition never returns true, ErrWaitTimeout is returned. Errors
// do not cause the function to return.
func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) error {
var err error
var ok bool
var curStep int
@ -95,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// If the error is temporary, continue retrying.
if !IsTemporary(err) {
return err
} else {
// Print out the temporary error we experienced.
logger.Println(err)
}
// Print out the temporary error we experienced.
logger.Println(err)
}
}
@ -109,9 +107,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// Provide more information to the user wherever possible
if err != nil {
return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
} else {
return newRetryError(errors.New("ran out of retries"), curStep)
}
return newRetryError(errors.New("ran out of retries"), curStep)
}
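
A small sketch of driving ExponentialBackoff directly; the condition below is a stand-in for any retryable operation:

```go
package main

import (
	"log"
	"os"
	"time"

	realis "github.com/paypal/gorealis"
)

func main() {
	backoff := realis.Backoff{
		Steps:    3,
		Duration: 500 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.1,
	}

	attempts := 0
	l := log.New(os.Stdout, "retry: ", log.LstdFlags)

	// Succeeds on the second attempt; a real condition would wrap an RPC.
	err := realis.ExponentialBackoff(backoff, l, func() (bool, error) {
		attempts++
		return attempts >= 2, nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```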
type auroraThriftCall func() (resp *aurora.Response, err error)
@ -156,7 +154,7 @@ func (r *realisClient) thriftCallWithRetries(
resp, clientErr = thriftCall()
r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retryable event as we don't know
@ -169,7 +167,7 @@ func (r *realisClient) thriftCallWithRetries(
// Determine if error is a temporary URL error by going up the stack
e, ok := clientErr.(thrift.TTransportException)
if ok {
r.logger.DebugPrint("Encountered a transport exception")
r.logger.debugPrint("Encountered a transport exception")
e, ok := e.Err().(*url.Error)
if ok {
@ -186,7 +184,7 @@ func (r *realisClient) thriftCallWithRetries(
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
if e.Timeout() {
timeouts++
r.logger.DebugPrintf(
r.logger.debugPrintf(
"Client closed connection (timedout) %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
@ -200,8 +198,10 @@ func (r *realisClient) thriftCallWithRetries(
// In the future, reestablish connection should be able to check if it is actually possible
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
// Ignoring error due to the fact that an error should be retried regardless
_ = r.ReestablishConn()
reestablishErr := r.ReestablishConn()
if reestablishErr != nil {
r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
}
} else {
// If there was no client error, but the response is nil, something went wrong.
@ -233,14 +233,14 @@ func (r *realisClient) thriftCallWithRetries(
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
}
}
}
r.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 {
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
@ -249,7 +249,7 @@ func (r *realisClient) thriftCallWithRetries(
// Provide more information to the user wherever possible.
if clientErr != nil {
return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
} else {
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}


@ -18,13 +18,13 @@ import (
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
// Structure to collect all information required to create job update
// UpdateJob is a structure to collect all information required to create a job update.
type UpdateJob struct {
Job // SetInstanceCount for job is hidden, access via full qualifier
req *aurora.JobUpdateRequest
}
// Create a default UpdateJob object.
// NewDefaultUpdateJob creates an UpdateJob object with opinionated default settings.
func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
req := aurora.NewJobUpdateRequest()
@ -74,6 +74,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
return &UpdateJob{Job: job, req: req}
}
// NewUpdateJob creates an UpdateJob object without default settings.
func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) *UpdateJob {
req := aurora.NewJobUpdateRequest()
@ -115,50 +116,50 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
return &UpdateJob{Job: job, req: req}
}
// Set instance count the job will have after the update.
// InstanceCount sets instance count the job will have after the update.
func (u *UpdateJob) InstanceCount(inst int32) *UpdateJob {
u.req.InstanceCount = inst
return u
}
// Max number of instances being updated at any given moment.
// BatchSize sets the max number of instances being updated at any given moment.
func (u *UpdateJob) BatchSize(size int32) *UpdateJob {
u.req.Settings.UpdateGroupSize = size
return u
}
// Minimum number of seconds a shard must remain in RUNNING state before considered a success.
// WatchTime sets the minimum number of milliseconds a shard must remain in RUNNING state before it is considered a success.
func (u *UpdateJob) WatchTime(ms int32) *UpdateJob {
u.req.Settings.MinWaitInInstanceRunningMs = ms
return u
}
// Wait for all instances in a group to be done before moving on.
// WaitForBatchCompletion configures the job update to wait for all instances in a group to be done before moving on.
func (u *UpdateJob) WaitForBatchCompletion(batchWait bool) *UpdateJob {
u.req.Settings.WaitForBatchCompletion = batchWait
return u
}
// Max number of instance failures to tolerate before marking instance as FAILED.
// MaxPerInstanceFailures sets the max number of instance failures to tolerate before marking instance as FAILED.
func (u *UpdateJob) MaxPerInstanceFailures(inst int32) *UpdateJob {
u.req.Settings.MaxPerInstanceFailures = inst
return u
}
// Max number of FAILED instances to tolerate before terminating the update.
// MaxFailedInstances sets the max number of FAILED instances to tolerate before terminating the update.
func (u *UpdateJob) MaxFailedInstances(inst int32) *UpdateJob {
u.req.Settings.MaxFailedInstances = inst
return u
}
// When False, prevents auto rollback of a failed update.
// RollbackOnFail configures whether the job update is rolled back automatically if it fails.
func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
u.req.Settings.RollbackOnFailure = rollback
return u
}
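
A sketch of composing an update request with these setters; all numbers are illustrative only:

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world").
		CPU(0.5).
		RAM(128).
		Disk(256)

	update := realis.NewDefaultUpdateJob(job.TaskConfig()).
		InstanceCount(3).
		BatchSize(1).
		WatchTime(30000). // milliseconds, per MinWaitInInstanceRunningMs
		MaxPerInstanceFailures(1).
		MaxFailedInstances(0).
		RollbackOnFail(true)

	// A live client would pass this to StartJobUpdate(update, "message").
	_ = update
	log.Println("update request built")
}
```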
// NewUpdateSettings returns an opinionated set of job update settings.
func NewUpdateSettings() *aurora.JobUpdateSettings {
us := new(aurora.JobUpdateSettings)
// Mirrors defaults set by Pystachio
us.UpdateGroupSize = 1

util.go

@ -10,11 +10,22 @@ import (
const apiPath = "/api"
// ActiveStates - States a task may be in when active.
var ActiveStates = make(map[aurora.ScheduleStatus]bool)
// SlaveAssignedStates - States a task may be in when it has already been assigned to a Mesos agent.
var SlaveAssignedStates = make(map[aurora.ScheduleStatus]bool)
// LiveStates - States a task may be in when it is live (e.g. able to take traffic)
var LiveStates = make(map[aurora.ScheduleStatus]bool)
// TerminalStates - Set of states a task may not transition away from.
var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
// AwaitingPulseJobUpdateStates - States a job update may be in where it is waiting for a pulse.
var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
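
A sketch of classifying a task status with these exported state sets:

```go
package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	status := aurora.ScheduleStatus_RUNNING

	switch {
	case realis.LiveStates[status]:
		fmt.Println("task is live")
	case realis.TerminalStates[status]:
		fmt.Println("task has finished")
	case realis.ActiveStates[status]:
		fmt.Println("task is active but not yet live")
	}
}
```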
func init() {

zk.go

@ -24,14 +24,14 @@ import (
"github.com/samuel/go-zookeeper/zk"
)
type Endpoint struct {
type endpoint struct {
Host string `json:"host"`
Port int `json:"port"`
}
type ServiceInstance struct {
Service Endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]Endpoint `json:"additionalEndpoints"`
type serviceInstance struct {
Service endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
Status string `json:"status"`
}
@ -40,47 +40,54 @@ type zkConfig struct {
path string
backoff Backoff
timeout time.Duration
logger Logger
logger logger
}
// ZKOpt - Configuration option for the Zookeeper client used.
type ZKOpt func(z *zkConfig)
// ZKEndpoints - Endpoints on which a Zookeeper instance is running to be used by the client.
func ZKEndpoints(endpoints ...string) ZKOpt {
return func(z *zkConfig) {
z.endpoints = endpoints
}
}
// ZKPath - Path to look for information in when connected to Zookeeper.
func ZKPath(path string) ZKOpt {
return func(z *zkConfig) {
z.path = path
}
}
// ZKBackoff - Configuration for Retry mechanism used when connecting to Zookeeper.
// TODO(rdelvalle): Determine if this is really necessary as the ZK library already has a retry built in.
func ZKBackoff(b Backoff) ZKOpt {
return func(z *zkConfig) {
z.backoff = b
}
}
// ZKTimeout - How long to wait on a response from the Zookeeper instance before considering it dead.
func ZKTimeout(d time.Duration) ZKOpt {
return func(z *zkConfig) {
z.timeout = d
}
}
func ZKLogger(l Logger) ZKOpt {
// ZKLogger - Attach a logger to the Zookeeper client in order to debug issues.
func ZKLogger(l logger) ZKOpt {
return func(z *zkConfig) {
z.logger = l
}
}
// Retrieves current Aurora leader from ZK.
// LeaderFromZK - Retrieves current Aurora leader from ZK.
func LeaderFromZK(cluster Cluster) (string, error) {
return LeaderFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
}
// Retrieves current Aurora leader from ZK with a custom configuration.
// LeaderFromZKOpts - Retrieves current Aurora leader from ZK with a custom configuration.
func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
var leaderURL string
@ -121,7 +128,6 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
}
// Search for the leader through all the children in the given path
serviceInst := new(ServiceInstance)
for _, child := range children {
// Only the leader will start with member_
@ -137,7 +143,8 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader"))
}
err = json.Unmarshal([]byte(data), serviceInst)
var serviceInst serviceInstance
err = json.Unmarshal([]byte(data), &serviceInst)
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
}
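
A hedged sketch of leader discovery through the options API; the endpoint and path are placeholders:

```go
package main

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	url, err := realis.LeaderFromZKOpts(
		realis.ZKEndpoints("192.168.33.7:2181"),
		realis.ZKPath("/aurora/scheduler"),
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("current Aurora leader:", url)
}
```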


@ -24,11 +24,12 @@ import (
"github.com/stretchr/testify/assert"
)
var backoff realis.Backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
var backoff = realis.Backoff{ // Reduce penalties for this test to make it quick
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1}
Jitter: 0.1,
}
// Test for behavior when no endpoints are given to the ZK leader finding function.
func TestZKNoEndpoints(t *testing.T) {