diff --git a/.github/main.yml b/.github/main.yml deleted file mode 100644 index 0445266..0000000 --- a/.github/main.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: CI - -on: [push] - -jobs: - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Setup Go for use with actions - uses: actions/setup-go@v2 - with: - go-version: 1.16 - - name: Install goimports - run: go get golang.org/x/tools/cmd/goimports - - name: Set env with list of directories in repo containin go code - run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV - - name: Run goimports check - run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" - - name: Create aurora/mesos docker cluster - run: docker-compose up -d - - name: Run tests - run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml deleted file mode 100644 index 6c1890f..0000000 --- a/.github/workflows/codeql-analysis.yml +++ /dev/null @@ -1,57 +0,0 @@ -# For most projects, this workflow file will not need changing; you simply need -# to commit it to your repository. -# -# You may wish to alter this file to override the set of languages analyzed, -# or to provide custom queries or build logic. -# -# ******** NOTE ******** -# We have attempted to detect the languages in your repository. Please check -# the `language` matrix defined below to confirm you have the correct set of -# supported CodeQL languages. -# -name: "CodeQL" - -on: - push: - branches: [ main ] - pull_request: - # The branches below must be a subset of the branches above - branches: [ main ] - schedule: - - cron: '34 4 * * 3' - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - strategy: - fail-fast: false - matrix: - language: [ 'go' ] - # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] - # Learn more: - # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed - - steps: - - name: Checkout repository - uses: actions/checkout@v2 - - # Initializes the CodeQL tools for scanning. - - name: Initialize CodeQL - uses: github/codeql-action/init@v1 - with: - languages: ${{ matrix.language }} - # If you wish to specify custom queries, you can do so here or in a config file. - # By default, queries listed here will override any specified in a config file. - # Prefix the list here with "+" to use these queries and those in the config file. 
- # queries: ./path/to/local/query, your-org/your-repo/queries@main - - - run: go build examples/client.go - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml deleted file mode 100644 index 7edfc51..0000000 --- a/.github/workflows/main.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: CI - -on: - push: - branches: - - main - pull_request: - branches: - - main -jobs: - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Setup Go for use with actions - uses: actions/setup-go@v2 - with: - go-version: 1.16 - - name: Install goimports - run: go get golang.org/x/tools/cmd/goimports - - name: Set env with list of directories in repo containin go code - run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV - - name: Run goimports check - run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" - - name: Create aurora/mesos docker cluster - run: docker-compose up -d - - name: Run tests - run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f9afb52 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,33 @@ + +os: linux +dist: xenial +language: go + +branches: + only: + - main + - future + +go: + - "1.15.x" + +env: + global: + - GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") + +services: + - docker + +before_install: + - go get golang.org/x/tools/cmd/goimports + - test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" + +install: + - go mod download + - docker-compose up -d + +script: + - go test -timeout 30m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cfbaef..3e95ba0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,20 +1,3 @@ -1.25.1 (unreleased) - -1.25.0 - -* Add priority api - -1.24.0 - -* enable default sla for slaDrain -* Changes Travis CI badge to Github Actions badge -* Bug fix for auto paused update monitor -* Adds support for running CI on github actions - -1.23.0 - -* First release tested against Aurora Scheduler 0.23.0 - 1.22.5 * Upgrading to thrift 0.14.0 diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..1994a30 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,64 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. 
+ + +[[projects]] + digest = "1:89696c38cec777120b8b1bb5e2d363d655cf2e1e7d8c851919aaa0fd576d9b86" + name = "github.com/apache/thrift" + packages = ["lib/go/thrift"] + pruneopts = "" + revision = "384647d290e2e4a55a14b1b7ef1b7e66293a2c33" + version = "v0.12.0" + +[[projects]] + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" + name = "github.com/davecgh/go-spew" + packages = ["spew"] + pruneopts = "" + revision = "346938d642f2ec3594ed81d874461961cd0faa76" + version = "v1.1.0" + +[[projects]] + digest = "1:df48fb76fb2a40edea0c9b3d960bc95e326660d82ff1114e1f88001f7a236b40" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "" + revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc" + +[[projects]] + digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" + name = "github.com/pmezard/go-difflib" + packages = ["difflib"] + pruneopts = "" + revision = "792786c7400a136282c1664665ae0a8db921c6c2" + version = "v1.0.0" + +[[projects]] + digest = "1:78bea5e26e82826dacc5fd64a1013a6711b7075ec8072819b89e6ad76cb8196d" + name = "github.com/samuel/go-zookeeper" + packages = ["zk"] + pruneopts = "" + revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5" + +[[projects]] + digest = "1:381bcbeb112a51493d9d998bbba207a529c73dbb49b3fd789e48c63fac1f192c" + name = "github.com/stretchr/testify" + packages = [ + "assert", + "require", + ] + pruneopts = "" + revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053" + version = "v1.3.0" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + input-imports = [ + "github.com/apache/thrift/lib/go/thrift", + "github.com/pkg/errors", + "github.com/samuel/go-zookeeper/zk", + "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/require", + ] + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..5d6c9f2 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,16 @@ +[[constraint]] + name = "github.com/apache/thrift" + version = "0.12.0" + +[[constraint]] + name = "github.com/pkg/errors" + revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc" + +[[constraint]] + name = "github.com/samuel/go-zookeeper" + revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5" + +[[constraint]] + name = "github.com/stretchr/testify" + version = "1.3.0" + diff --git a/README.md b/README.md index 0ce8bc1..3d33bd8 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) ![CI Build Status](https://github.com/paypal/gorealis/actions/workflows/main.yml/badge.svg) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis) +# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) [![Build Status](https://travis-ci.org/paypal/gorealis.svg?branch=main)](https://travis-ci.org/paypal/gorealis) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis) Version 1 of Go library for interacting with [Aurora Scheduler](https://github.com/aurora-scheduler/aurora). 
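For readers landing on this revert without context: gorealis is consumed as an ordinary Go client library for the Aurora Scheduler. Below is a minimal, hedged construction sketch; the option constructors (`SchedulerUrl`, `TimeoutMS`) are assumed from gorealis v1 and do not appear in this diff, so treat them as illustrative only.

```go
package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// SchedulerUrl and TimeoutMS are assumed ClientOption constructors;
	// substitute whatever options your gorealis v1 checkout actually exposes.
	client, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://localhost:8081"),
		realis.TimeoutMS(20000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```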
diff --git a/docker-compose.yml b/docker-compose.yml index 9c6169f..932053d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: ipv4_address: 192.168.33.2 master: - image: aurorascheduler/mesos-master:1.7.2 + image: rdelvalle/mesos-master:1.6.2 restart: on-failure ports: - "5050:5050" @@ -32,7 +32,7 @@ services: - zk agent-one: - image: aurorascheduler/mesos-agent:1.7.2 + image: rdelvalle/mesos-agent:1.6.2 pid: host restart: on-failure ports: @@ -57,7 +57,7 @@ services: - zk agent-two: - image: aurorascheduler/mesos-agent:1.7.2 + image: rdelvalle/mesos-agent:1.6.2 pid: host restart: on-failure ports: @@ -82,7 +82,7 @@ services: - zk aurora-one: - image: aurorascheduler/scheduler:0.23.0 + image: rdelvalle/aurora:0.22.0 pid: host ports: - "8081:8081" diff --git a/go.mod b/go.mod index 9497185..506556b 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,5 @@ require ( github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.2.0 ) diff --git a/go.sum b/go.sum deleted file mode 100644 index f3f5eb6..0000000 --- a/go.sum +++ /dev/null @@ -1,30 +0,0 @@ -github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= -github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= -github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs= -github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs= -github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/ridv/thrift v0.12.1 h1:b80V1Oa2Mbd++jrlJZbJsIybO5/MCfbXKzd1A5v4aSo= -github.com/ridv/thrift v0.12.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= -github.com/ridv/thrift v0.13.1 h1:/8XnTRUqJJeiuqoL7mfnJQmXQa4GJn9tUCiP7+i6Y9o= -github.com/ridv/thrift v0.13.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= -github.com/ridv/thrift v0.13.2 h1:Q3Smr8poXd7VkWZPHvdJZzlQCJO+b5W37ECfoUL4qHc= -github.com/ridv/thrift v0.13.2/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= -github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= -github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg= -github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go deleted file mode 100644 index 56821c1..0000000 --- a/helpers.go +++ /dev/null @@ -1,21 +0,0 @@ -package realis - -import ( - "context" - - "github.com/paypal/gorealis/gen-go/apache/aurora" -) - -func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) { - resp, err := r.client.GetConfigSummary(context.TODO(), &key) - if err != nil { - return false, err - } - - return resp == nil || - resp.GetResult_() == nil || - resp.GetResult_().GetConfigSummaryResult_() == nil || - resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || - resp.GetResponseCode() != aurora.ResponseCode_OK, - nil -} diff --git a/job.go b/job.go index 8470546..1d85f33 100644 --- a/job.go +++ b/job.go @@ -62,7 +62,6 @@ type Job interface { PartitionPolicy(policy *aurora.PartitionPolicy) Job Tier(tier string) Job SlaPolicy(policy *aurora.SlaPolicy) Job - Priority(priority int32) Job } type resourceType int @@ -384,8 +383,3 @@ func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job { return j } - -func (j *AuroraJob) Priority(priority int32) Job { - j.jobConfig.TaskConfig.Priority = priority - return j -} diff --git a/monitors.go b/monitors.go index edbe4e4..d3de726 100644 --- a/monitors.go +++ b/monitors.go @@ -117,7 +117,7 @@ func (m *Monitor) JobUpdateQuery( } } -// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update +// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information, // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch // the update is in using information from the update configuration. 
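The auto-pause monitor described above pairs with `ResumeJobUpdate` on the caller's side. A hedged sketch of that loop follows; the `JobUpdateQuery` field names mirror Aurora's thrift definitions and the five-second poll interval is arbitrary.

```go
package example

import (
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
	"github.com/paypal/gorealis/response"
)

// driveAutoPausedUpdate resumes an auto-pausing batch update each time the
// scheduler parks it in ROLL_FORWARD_PAUSED, and returns once the update
// leaves the active update states.
func driveAutoPausedUpdate(client realis.Realis, key aurora.JobUpdateKey) error {
	for {
		resp, err := client.GetJobUpdateSummaries(&aurora.JobUpdateQuery{
			JobKey:         key.Job,
			UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
			Limit:          1,
		})
		if err != nil {
			return err
		}

		summaries := response.JobUpdateSummaries(resp)
		if len(summaries) == 0 {
			return nil // update is no longer active
		}

		if summaries[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
			if _, err := client.ResumeJobUpdate(&key, "resuming next batch"); err != nil {
				return err
			}
		}

		time.Sleep(5 * time.Second)
	}
}
```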
@@ -168,8 +168,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim return -1, err } - if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED || - summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) { + if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED { return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status) } @@ -184,7 +183,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil } -// Instances will monitor a Job until all instances enter one of the LIVE_STATES +// Monitor a Job until all instances enter one of the LIVE_STATES func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { return m.ScheduleStatus(key, instances, LiveStates, interval, timeout) } diff --git a/realis.go b/realis.go index 0464908..eb298e1 100644 --- a/realis.go +++ b/realis.go @@ -35,7 +35,7 @@ import ( "github.com/paypal/gorealis/response" ) -const version = "1.24.1" +const version = "1.22.5" // Realis is an interface that defines the various APIs that may be used to communicate with // the Apache Aurora scheduler. @@ -65,6 +65,7 @@ type Realis interface { RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) + PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) @@ -129,15 +130,6 @@ var defaultBackoff = Backoff{ Jitter: 0.1, } -var defaultSlaPolicy = aurora.SlaPolicy{ - PercentageSlaPolicy: &aurora.PercentageSlaPolicy{ - Percentage: 66, - DurationSecs: 300, - }, -} - -const defaultSlaDrainTimeoutSecs = 900 - // ClientOption is an alias for a function that modifies the realis config object type ClientOption func(*config) @@ -394,7 +386,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } - config.logger.Println("Address obtained: ", url) + config.logger.Println("Addresss obtained: ", url) url, err = validateAuroraURL(url) if err != nil { return nil, errors.Wrap(err, "invalid Aurora url") @@ -436,8 +428,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace}, lock: &sync.Mutex{}, - transport: config.transport, - }, nil + transport: config.transport}, nil } // GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. 
This is deprecated in favor of using @@ -494,11 +485,11 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans }) if err != nil { - return nil, errors.Wrap(err, "error creating transport") + return nil, errors.Wrap(err, "Error creating transport") } if err := trans.Open(); err != nil { - return nil, errors.Wrapf(err, "error opening connection to %s", url) + return nil, errors.Wrapf(err, "Error opening connection to %s", url) } return trans, nil @@ -541,17 +532,16 @@ func (r *realisClient) ReestablishConn() error { return nil } -// Close releases resources associated with the realis client. +// Releases resources associated with the realis client. func (r *realisClient) Close() { r.lock.Lock() defer r.lock.Unlock() - // The return value of Close here is ignored on purpose because there's nothing that can be done if it fails. - _ = r.transport.Close() + r.transport.Close() } -// GetInstanceIds uses a predefined set of states to retrieve a set of active jobs in the Aurora Scheduler. +// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { taskQ := &aurora.TaskQuery{ JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}}, @@ -564,9 +554,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu false, func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }, - nil, - ) + }) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -591,16 +579,10 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }, - nil, - ) + }) if retryErr != nil { - return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") - } - - if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } return resp, nil @@ -614,9 +596,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobs(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") @@ -629,7 +609,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe return resp, result, nil } -// KillInstances kills specific instances of a job. +// Kill specific instances of a job. func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) @@ -637,9 +617,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a false, func() (*aurora.Response, error) { return r.client.KillTasks(context.TODO(), key, instances, "") - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -651,7 +629,7 @@ func (r *realisClient) RealisConfig() *config { return r.config } -// KillJob kills all instances of a job. 
+// Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key) @@ -661,9 +639,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(context.TODO(), key, nil, "") - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -671,7 +647,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { return resp, nil } -// CreateJob sends a create job message to the scheduler with a specific job configuration. +// Sends a create job message to the scheduler with a specific job configuration. // Although this API is able to create service jobs, it is better to use CreateService instead // as that API uses the update thrift call which has a few extra features available. // Use this API to create ad-hoc jobs. @@ -679,36 +655,19 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) - // Response is checked by the thrift retry code resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }, - // On a client timeout, attempt to verify that payload made to the Scheduler by - // trying to get the config summary for the job key - func() (*aurora.Response, bool) { - exists, err := r.jobExists(*auroraJob.JobKey()) - if err != nil { - r.logger.Print("verification failed ", err) - } - - if exists { - return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true - } - - return nil, false - }, - ) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") } - return resp, nil } -// CreateService uses the scheduler's updating mechanism to create a job. +// This API uses an update thrift call to create the services giving a few more robust features. 
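From the caller's side, the change in this hunk means the `StartJobUpdateResult_` can come back unset even when no error is returned. A small hedged sketch of consuming `CreateService` with that guard; the job and update settings are assumed to be built elsewhere.

```go
package example

import (
	"errors"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// startService creates a service and returns the update key the scheduler
// assigned to it, guarding against a nil result object.
func startService(
	client realis.Realis,
	job realis.Job,
	settings *aurora.JobUpdateSettings) (aurora.JobUpdateKey, error) {

	_, result, err := client.CreateService(job, settings)
	if err != nil {
		return aurora.JobUpdateKey{}, err
	}

	if result == nil || result.GetKey() == nil {
		return aurora.JobUpdateKey{}, errors.New("scheduler returned no StartJobUpdateResult_")
	}

	return *result.GetKey(), nil
}
```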
func (r *realisClient) CreateService( auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { @@ -719,12 +678,17 @@ func (r *realisClient) CreateService( resp, err := r.StartJobUpdate(update, "") if err != nil { if IsTimeout(err) { - return nil, nil, err + return resp, nil, err } + return resp, nil, errors.Wrap(err, "unable to create service") } - return resp, resp.GetResult_().StartJobUpdateResult_, nil + if resp.GetResult_() != nil { + return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil + } + + return resp, nil, errors.New("results object is nil") } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { @@ -734,9 +698,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") @@ -752,9 +714,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, false, func() (*aurora.Response, error) { return r.client.DescheduleCronJob(context.TODO(), key) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") @@ -772,9 +732,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error false, func() (*aurora.Response, error) { return r.client.StartCronJob(context.TODO(), key) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") @@ -783,7 +741,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error } -// RestartInstances restarts the specified instances of a Job. +// Restarts specific instances specified func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) @@ -791,9 +749,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instances) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -801,12 +757,12 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) return resp, nil } -// RestartJob restarts all active instances of a Job. +// Restarts all active tasks under a job configuration. 
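As a usage note for the two restart APIs in this file: `RestartInstances` targets explicit shard IDs, while `RestartJob` (below) first resolves every instance in `ACTIVE_STATES`. A minimal sketch:

```go
package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// restartExamples restarts shards 0 and 1 of a job, then every active shard.
func restartExamples(client realis.Realis, key *aurora.JobKey) error {
	if _, err := client.RestartInstances(key, 0, 1); err != nil {
		return err
	}

	_, err := client.RestartJob(key)
	return err
}
```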
func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES) if err1 != nil { - return nil, errors.Wrap(err1, "could not retrieve relevant task instance IDs") + return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") } r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) @@ -816,9 +772,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instanceIds) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -830,7 +784,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.New("No tasks in the Active state") } -// StartJobUpdate updates all instances under a job configuration. +// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) @@ -839,56 +793,20 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au true, func() (*aurora.Response, error) { return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) - }, - func() (*aurora.Response, bool) { - summariesResp, err := r.readonlyClient.GetJobUpdateSummaries( - context.TODO(), - &aurora.JobUpdateQuery{ - JobKey: updateJob.JobKey(), - UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, - Limit: 1, - }) - - if err != nil { - r.logger.Print("verification failed ", err) - return nil, false - } - - summaries := response.JobUpdateSummaries(summariesResp) - if len(summaries) == 0 { - return nil, false - } - - return &aurora.Response{ - ResponseCode: aurora.ResponseCode_OK, - Result_: &aurora.Result_{ - StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ - UpdateSummary: summaries[0], - Key: summaries[0].Key, - }, - }, - }, true - }, - ) + }) if retryErr != nil { // A timeout took place when attempting this call, attempt to recover if IsTimeout(retryErr) { - return nil, retryErr + return resp, retryErr } return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - - if resp.GetResult_() == nil { - return resp, errors.New("no result in response") - } - return resp, nil } -// AbortJobUpdate terminates a job update in the scheduler. -// It requires the updateId which can be obtained on the Aurora web UI. +// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. // This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. // However, if the job update does not transition to the ABORT state an error will be returned. 
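The comment above notes that `AbortJobUpdate` is synchronous: it waits for the update to reach the aborted state and errors out otherwise. A short hedged sketch pairing it with `PauseJobUpdate`:

```go
package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// pauseThenAbort pauses an in-flight update for inspection and then abandons
// it. AbortJobUpdate blocks until the scheduler reports the update as aborted
// or returns an error if it never gets there.
func pauseThenAbort(client realis.Realis, key aurora.JobUpdateKey) error {
	if _, err := client.PauseJobUpdate(&key, "pausing for inspection"); err != nil {
		return err
	}

	_, err := client.AbortJobUpdate(key, "abandoning update")
	return err
}
```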
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { @@ -899,9 +817,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str false, func() (*aurora.Response, error) { return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") @@ -918,8 +834,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str return resp, err } -// PauseJobUpdate pauses the progress of an ongoing update. -// The UpdateID value needed for this function is returned from StartJobUpdate or can be obtained from the Aurora web UI. +// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -928,9 +843,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st false, func() (*aurora.Response, error) { return r.client.PauseJobUpdate(context.TODO(), updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -939,7 +852,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st return resp, nil } -// ResumeJobUpdate resumes a previously Paused Job update. +// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -948,9 +861,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s false, func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -959,7 +870,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s return resp, nil } -// PulseJobUpdate sends a pulse to an ongoing Job update. +// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) @@ -968,9 +879,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R false, func() (*aurora.Response, error) { return r.client.PulseJobUpdate(context.TODO(), updateKey) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") @@ -979,7 +888,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R return resp, nil } -// AddInstances scales up the number of instances for a Job. +// Scale up the number of instances under a job configuration using the configuration for specific +// instance to scale up. 
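For the scaling pair touched here: `AddInstances` clones the task configuration of the instance named in the `InstanceKey`, while `RemoveInstances` (further below) kills enough active instances to shrink the job. A hedged sketch:

```go
package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// scaleUpThenDown grows a job by two instances using instance 0 as the
// template, then shrinks it back by two.
func scaleUpThenDown(client realis.Realis, key *aurora.JobKey) error {
	instKey := aurora.InstanceKey{JobKey: key, InstanceId: 0}

	if _, err := client.AddInstances(instKey, 2); err != nil {
		return err
	}

	_, err := client.RemoveInstances(key, 2)
	return err
}
```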
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) @@ -988,9 +898,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a false, func() (*aurora.Response, error) { return r.client.AddInstances(context.TODO(), &instKey, count) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -999,15 +907,15 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a } -// RemoveInstances scales down the number of instances for a Job. +// Scale down the number of instances under a job configuration using the configuration of a specific instance func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) { instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES) if err != nil { - return nil, errors.Wrap(err, "could not retrieve relevant instance IDs") + return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs") } if len(instanceIds) < int(count) { - return nil, errors.Errorf("insufficient active instances available for killing: "+ + return nil, errors.Errorf("Insufficient active instances available for killing: "+ " Instances to be killed %d Active instances %d", count, len(instanceIds)) } @@ -1020,7 +928,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora return r.KillInstances(key, instanceIds[:count]...) } -// GetTaskStatus gets information about task including a fully hydrated task configuration object. +// Get information about task including a fully hydrated task configuration object func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) @@ -1029,9 +937,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") @@ -1040,7 +946,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul return response.ScheduleStatusResult(resp).GetTasks(), nil } -// GetPendingReason returns the reason why the an instance of a Job has not been scheduled. +// Get pending reason func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query) @@ -1049,9 +955,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend false, func() (*aurora.Response, error) { return r.client.GetPendingReason(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") @@ -1066,8 +970,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend return pendingReasons, nil } -// GetTasksWithoutConfigs gets information about task including without a task configuration object. -// This is a more lightweight version of GetTaskStatus but contains less information as a result. 
+// Get information about task including without a task configuration object func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) @@ -1076,9 +979,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror false, func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -1088,7 +989,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror } -// FetchTaskConfig gets the task configuration from the aurora scheduler for a job. +// Get the task configuration from the aurora scheduler for a job func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { taskQ := &aurora.TaskQuery{ Role: &instKey.JobKey.Role, @@ -1104,9 +1005,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), taskQ) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -1115,7 +1014,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task tasks := response.ScheduleStatusResult(resp).GetTasks() if len(tasks) == 0 { - return nil, errors.Errorf("instance %d for jobkey %s/%s/%s doesn't exist", + return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", instKey.InstanceId, instKey.JobKey.Environment, instKey.JobKey.Role, @@ -1134,12 +1033,10 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur false, func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }, - nil, - ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to get job update details") + return nil, errors.Wrap(retryErr, "Unable to get job update details") } return resp, nil @@ -1153,9 +1050,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string false, func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(context.TODO(), &key, message) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index 9c58081..184ae55 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -30,9 +30,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr false, func() (*aurora.Response, error) { return r.adminClient.DrainHosts(context.TODO(), drainList) - }, - nil, - ) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -58,18 +56,6 @@ func (r *realisClient) SLADrainHosts( return nil, errors.New("no hosts provided to drain") } - if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 { - policy = &defaultSlaPolicy - r.logger.Printf("Warning: start draining with default sla policy %v", policy) - } - - if timeout < 0 { - r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs", - timeout, - defaultSlaDrainTimeoutSecs) - timeout = defaultSlaDrainTimeoutSecs - } - drainList := aurora.NewHosts() drainList.HostNames = hosts @@ -79,9 +65,7 @@ func (r *realisClient) SLADrainHosts( false, func() 
(*aurora.Response, error) { return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }, - nil, - ) + }) if retryErr != nil { return result, errors.Wrap(retryErr, "Unable to recover connection") @@ -111,9 +95,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur false, func() (*aurora.Response, error) { return r.adminClient.StartMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -143,9 +125,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror false, func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -177,9 +157,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au false, func() (*aurora.Response, error) { return r.adminClient.MaintenanceStatus(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -204,9 +182,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb false, func() (*aurora.Response, error) { return r.adminClient.SetQuota(context.TODO(), role, quota) - }, - nil, - ) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to set role quota") @@ -222,9 +198,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { false, func() (*aurora.Response, error) { return r.adminClient.GetQuota(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to get role quota") @@ -239,9 +213,7 @@ func (r *realisClient) Snapshot() error { false, func() (*aurora.Response, error) { return r.adminClient.Snapshot(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -257,9 +229,7 @@ func (r *realisClient) PerformBackup() error { false, func() (*aurora.Response, error) { return r.adminClient.PerformBackup(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -274,9 +244,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error { false, func() (*aurora.Response, error) { return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -297,9 +265,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := r.thriftCallWithRetries(false, func() (*aurora.Response, error) { return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 78d8f0e..27e8b03 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -750,53 +750,6 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { 5, 10) assert.NoError(t, err) - - // slaDrainHosts goes with default policy if no policy is specified - _, err = r.SLADrainHosts(nil, 30, hosts...) 
- require.NoError(t, err, "unable to drain host with SLA policy") - - // Monitor change to DRAINING and DRAINED mode - hostResults, err = monitor.HostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, - 1, - 50) - assert.NoError(t, err) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) - - _, _, err = r.EndMaintenance(hosts...) - require.NoError(t, err) - - // Monitor change to DRAINING and DRAINED mode - _, err = monitor.HostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, - 5, - 10) - assert.NoError(t, err) - - _, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...) - require.NoError(t, err, "unable to drain host with SLA policy") - - // Monitor change to DRAINING and DRAINED mode - hostResults, err = monitor.HostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, - 1, - 50) - assert.NoError(t, err) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) - - _, _, err = r.EndMaintenance(hosts...) - require.NoError(t, err) - - // Monitor change to DRAINING and DRAINED mode - _, err = monitor.HostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, - 5, - 10) - assert.NoError(t, err) } // Test multiple go routines using a single connection @@ -1106,10 +1059,8 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { assert.Equal(t, i, curStep) - if i != len(updateGroups)-1 { - _, err = r.ResumeJobUpdate(&key, "auto resuming test") - require.NoError(t, err) - } + _, err = r.ResumeJobUpdate(&key, "auto resuming test") + require.NoError(t, err) } _, err = r.KillJob(job.JobKey()) diff --git a/response/response.go b/response/response.go index 4a67ca0..b77348d 100644 --- a/response/response.go +++ b/response/response.go @@ -36,10 +36,6 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { - if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil - } - return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index eefcf2a..dff5658 100644 --- a/retry.go +++ b/retry.go @@ -114,19 +114,10 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) -// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler -// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler -// through the very same Thrift API which previously encountered a time out from the client. -// This means that the functions themselves should be kept to a minimum number of Thrift calls. -// It should also be noted that this is a best effort mechanism and -// is likely to fail for the same reasons that the original call failed. -type verifyOnTimeout func() (*aurora.Response, bool) - // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. 
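For orientation before the retry-loop changes below: on either side of this diff, the loop classifies Aurora response codes the same way. `OK` succeeds, `ERROR_TRANSIENT` is retried, and the remaining codes are terminal. A standalone sketch of that triage, not part of the library:

```go
package example

import "github.com/paypal/gorealis/gen-go/apache/aurora"

// classify mirrors the response-code triage inside thriftCallWithRetries:
// OK ends the loop successfully, ERROR_TRANSIENT is retried, and the other
// codes (INVALID_REQUEST, ERROR, AUTH_FAILED, JOB_UPDATING_ERROR, plus the
// unused WARNING) are terminal failures.
func classify(code aurora.ResponseCode) (success, retry bool) {
	switch code {
	case aurora.ResponseCode_OK:
		return true, false
	case aurora.ResponseCode_ERROR_TRANSIENT:
		return false, true
	default:
		return false, false
	}
}
```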
func (r *realisClient) thriftCallWithRetries( returnOnTimeout bool, - thriftCall auroraThriftCall, - verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { + thriftCall auroraThriftCall) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -166,22 +157,42 @@ func (r *realisClient) thriftCallWithRetries( r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) }() - // Check if our thrift call is returning an error. + // Check if our thrift call is returning an error. This is a retryable event as we don't know + // if it was caused by network issues. if clientErr != nil { + // Print out the error to the user r.logger.Printf("Client Error: %v", clientErr) - temporary, timedout := isConnectionError(clientErr) - if !temporary && r.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } + // Determine if error is a temporary URL error by going up the stack + e, ok := clientErr.(thrift.TTransportException) + if ok { + r.logger.debugPrint("Encountered a transport exception") - // There exists a corner case where thrift payload was received by Aurora but - // connection timed out before Aurora was able to reply. - // Users can take special action on a timeout by using IsTimedout and reacting accordingly - // if they have configured the client to return on a timeout. - if timedout && returnOnTimeout { - return resp, newTimedoutError(errors.New("client connection closed before server answer")) + e, ok := e.Err().(*url.Error) + if ok { + + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and should be retried. All other errors that are permanent + // will not be retried. + if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } + + // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was + // able to reply. In this case we will return whatever response was received and a TimedOut behaving + // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. + if e.Timeout() { + timeouts++ + r.logger.debugPrintf( + "Client closed connection (timedout) %d times before server responded, "+ + "consider increasing connection timeout", + timeouts) + if returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) + } + } + } } // In the future, reestablish connection should be able to check if it is actually possible @@ -191,71 +202,48 @@ func (r *realisClient) thriftCallWithRetries( if reestablishErr != nil { r.logger.debugPrintf("error re-establishing connection ", reestablishErr) } + } else { - // If users did not opt for a return on timeout in order to react to a timedout error, - // attempt to verify that the call made it to the scheduler after the connection was re-established. - if timedout { - timeouts++ - r.logger.debugPrintf( - "Client closed connection %d times before server responded, "+ - "consider increasing connection timeout", - timeouts) - - // Allow caller to provide a function which checks if the original call was successful before - // it timed out. 
- if verifyOnTimeout != nil { - if verifyResp, ok := verifyOnTimeout(); ok { - r.logger.Print("verified that the call went through successfully after a client timeout") - // Response here might be different than the original as it is no longer constructed - // by the scheduler but mimicked. - // This is OK since the scheduler is very unlikely to change responses at this point in its - // development cycle but we must be careful to not return an incorrectly constructed response. - return verifyResp, nil - } - } + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") } - // Retry the thrift payload - continue - } + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") - } + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.logger.Println("Aurora replied with Transient error code, retrying") + continue - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue - - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) - - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. 
+ default: + r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + } } } + r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { - r.config.logger.Printf("this thrift call was retried %d time(s)", curStep) + r.config.logger.Printf("retried this thrift call %d time(s)", curStep) } // Provide more information to the user wherever possible. @@ -265,30 +253,3 @@ func (r *realisClient) thriftCallWithRetries( return nil, newRetryError(errors.New("ran out of retries"), curStep) } - -// isConnectionError processes the error received by the client. -// The return values indicate weather this was determined to be a temporary error -// and weather it was determined to be a timeout error -func isConnectionError(err error) (bool, bool) { - - // Determine if error is a temporary URL error by going up the stack - transportException, ok := err.(thrift.TTransportException) - if !ok { - return false, false - } - - urlError, ok := transportException.Err().(*url.Error) - if !ok { - return false, false - } - - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and we consider it temporary. - // All other which are not temporary as per the member function Temporary(), - // are considered not temporary (permanent). - if urlError.Err != io.EOF && !urlError.Temporary() { - return false, false - } - - return true, urlError.Timeout() -} diff --git a/util.go b/util.go index 4307d1c..19930e2 100644 --- a/util.go +++ b/util.go @@ -29,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool) // ActiveJobUpdateStates - States a Job Update may be in where it is considered active. var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) -// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. +// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{