Compare commits

...

34 commits

Author SHA1 Message Date
Tan N. Le
db10285368
Update CHANGELOG.md 2022-10-12 21:47:07 -07:00
shivrsrivastava
8454a6ebf3
Adding priority to the task (#140) 2022-10-12 21:46:07 -07:00
Tan N. Le
c318042e96
release 1.24.0 (#139) 2021-11-09 09:00:35 -08:00
Tan N. Le
db9bebb802
enable default sla for slaDrain (#138) 2021-11-01 18:17:49 -07:00
Renán I. Del Valle
fff2c16751
Enabling code analysis 2021-09-01 10:09:11 -07:00
Renán I. Del Valle
c59d01ab51
Changes Travis CI badge to Github Actions badge (#137) 2021-08-06 18:16:06 -07:00
Renán I. Del Valle
62df98a3c8
Bug fix for auto paused update monitor (#136)
Returns success if the update has finished updating successfully.
2021-08-06 16:02:52 -07:00
Renán I. Del Valle
5c39a23eb2
Enable Github Actions for PRs
Run CI on pull requests and when the branch is pushed.
2021-08-06 15:52:46 -07:00
Renán I. Del Valle
dbc396b0db
Disables Travis CI (#135)
Travis CI is no longer needed as we have migrated to Github Actions
2021-08-06 10:58:34 -07:00
Renán I. Del Valle
86eb045808
Adds go.sum and removes dep files (#134) 2021-08-06 10:57:45 -07:00
Renán I. Del Valle
c7e309f421
Actions fix (#133)
* Moving main.yml to the right place.
2021-08-06 10:06:12 -07:00
Renán I. Del Valle
49877b7d41
Adds support for running CI on github actions. (#132) 2021-08-06 10:00:57 -07:00
Renán I. Del Valle
82b40a53f0
Add verification to retry mechanism (#131)
CreateJob, CreateService, and StartJobUpdate now include a rudimentary verification function to check if the call made it to the Aurora Scheduler when the client experiences a timeout.
2021-05-11 13:37:23 -07:00
Renán I. Del Valle
a9d99067ee
Documentation fix (#130)
Fixes documentation so that it is more compliant with godoc format.
2021-04-29 10:48:43 -07:00
Renán Del Valle
e7f9c0cba9 Bumping up version to 1.23.1 2021-02-25 17:59:02 -08:00
Renán Del Valle
fbaf218dfb Preparing release notes for 1.23.0 2021-02-25 17:59:02 -08:00
Renán I. Del Valle
6a1cf25c87
Upgrading Mesos to 1.7.2 and Aurora Scheduler to 0.23.0 (#128) 2021-02-25 17:55:42 -08:00
Renán Del Valle
4aaec87d32 Bumping up version to 1.23.0 2021-02-25 17:34:48 -08:00
Renán Del Valle
895d810b6c Releasing Version 1.22.5 2021-02-25 17:34:48 -08:00
Renán I. Del Valle
511e76a0ac
Upgrading to Thrift 0.14.0 (#126)
Upgrading thrift to 0.14.0 in order to pick up bug fixes, including the fix for trying to write to closed connections.
2021-02-25 16:37:46 -08:00
Renan DelValle
8be66da88a
Bumping up go version to 1.15 and removing v2 tests from Travis CI config file. 2020-09-28 11:13:29 -07:00
Renan DelValle
6d20f347f7
Update latest version gorealis has been tested against. 2020-09-28 10:32:36 -07:00
Renan DelValle
74b12c36b1
Adding version to README and changing badge to point to the right place. 2020-09-28 10:32:13 -07:00
Renan DelValle
269e0208c1
Change travis CI configuration to use main branch. 2020-09-28 10:30:48 -07:00
Suchith Arodi
4acb0d54a9
return response object in case of noop update (#125)
Return response object such that end user can look into what went wrong when the response is nil.

Cases like this occur when there is a no-op update.
2020-07-29 13:30:56 -07:00
Renán I. Del Valle
5f667555dc
Bumping up CI build to go 1.14 (#124)
Bumping up Travis CI build to go 1.14 as well as increasing the timeout for go tests.
2020-05-27 16:04:07 -07:00
Renán I. Del Valle
2b6025e67d
Checking previously ignored error which caused issues. (#123)
Error in monitor was going unchecked which caused some issues when the monitor timed out.
2020-05-27 12:45:53 -07:00
Renán I. Del Valle
5ec22fab98
Restoring location of r.Close() in retry mechanism since the move created a deadlock. (#122)
Moving the r.Close() call in the retry mechanism created a deadlock since r.Close() also uses the client lock to avoid multiple routines closing at the same time.

This commit reverts that change.
2020-05-27 12:36:52 -07:00
Renan DelValle
f196aa9ed7 Fixing some cosmetic issues and a potential race condition. 2020-05-26 20:40:09 -07:00
Renan DelValle
bb5408f5e2 Bumping up Thrift Version to v0.13.2 forked as v0.13.1 contains a bug. 2020-05-26 20:40:09 -07:00
Renán I. Del Valle
ea8e48f3b8
Allow users to define what extensions CA certs will have (#120)
* Allow users to define what extensions CA certs will have. Skip any files that don't have the right extension.
2020-02-26 08:24:41 -08:00
Renán I. Del Valle
3dc3b09a8e
Point to temporary Thrift fork while we wait for 0.14.0 to be released (#118)
* Updating readme to reflect changes made to the Aurora Scheduler project.

* Changing dependency of mod to point to forked version of the Thrift library while 0.14.0 is released.
2020-02-18 14:18:13 -08:00
Renan I. Del Valle
3fa2a20fe4
Thrift Upgrade to v0.13.0 (#117)
* Removing go.sum file as it's no longer required as of go1.13.

* Removing uncessary client command.

* Bumping up thrift version to v0.13.0
2020-02-12 12:31:56 -08:00
Renan I. Del Valle
c6a2a23ddb
Changing how constraints are handled internally (#115)
* Updating Changelog to reflect what's changing in 1.22.1

* Bug fix: Setting the same constraint multiple times is no longer allowed.

* Constraints map has been added to handle constraints being added to Aurora Jobs.

* Lowering timeout to avoid flaky test for bad payload timeout.

* Adding attributes to Mesos agents in order to test limits by constraint.

* Make two instances schedulable per zone in order to experience flaky behavior.
2020-01-15 08:21:12 -08:00
31 changed files with 11184 additions and 7704 deletions


@ -1 +1 @@
-0.22.0
+0.23.0

.github/main.yml (new file, 25 lines)

@ -0,0 +1,25 @@
name: CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containin go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis

.github/workflows/codeql-analysis.yml (new file, 57 lines)

@ -0,0 +1,57 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '34 4 * * 3'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
- run: go build examples/client.go
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

.github/workflows/main.yml (new file, 30 lines)

@ -0,0 +1,30 @@
name: CI
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containin go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis


@ -1,33 +0,0 @@
sudo: required
dist: xenial
language: go
branches:
only:
- master
- master-v2.0
- future
go:
- "1.10.x"
env:
global:
- GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/")
services:
- docker
before_install:
- go get golang.org/x/tools/cmd/goimports
- test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
install:
- docker-compose up -d
script:
- go test -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis
after_success:
- bash <(curl -s https://codecov.io/bash)


@ -1,4 +1,41 @@
-1.22.0 (unreleased)
+1.25.1 (unreleased)
+1.25.0
+* Add priority api
+1.24.0
+* enable default sla for slaDrain
+* Changes Travis CI badge to Github Actions badge
+* Bug fix for auto paused update monitor
+* Adds support for running CI on github actions
+1.23.0
+* First release tested against Aurora Scheduler 0.23.0
+1.22.5
+* Upgrading to thrift 0.14.0
+1.22.4
+* Updates which result in a no-op now return a response value so that the caller may analyze it to determine what happened
+1.22.3
+* Contains a monitor timeout fix. Previously an error was being left unchecked which made a specific monitor timining out not be handled properly.
+1.22.2
+* Bug fix: Change in retry mechanism created a deadlock. This release reverts that particular change.
+1.22.1
+* Adding safeguards against setting multiple constraints with the same name for a single task.
+1.22.0
 * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
 by the HTTP client. Instead they now return an error that conforms to the Timedout interface.
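
The 1.22.0 entry above notes that timed-out CreateService/StartJobUpdate calls return an error conforming to a Timedout interface. Below is a minimal, hedged sketch of acting on that from a caller; the realis.IsTimeout helper is assumed to exist in this version, and the scheduler URL and credentials are placeholders, not values taken from the diffs.

package main

import (
	"log"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Placeholder endpoint and credentials; option names follow the gorealis v1 README.
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://localhost:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.TimeoutMS(20000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("timeout_demo").
		CPU(0.1).
		RAM(32).
		Disk(64).
		InstanceCount(1).
		Priority(100) // Priority builder introduced by #140 in this comparison.

	if _, err := r.CreateJob(job); err != nil {
		// IsTimeout is assumed to report errors satisfying the Timedout interface.
		// On a timeout the request may or may not have reached the scheduler,
		// which is what the verification added in #131 checks for internally.
		if realis.IsTimeout(err) {
			log.Println("request timed out; verify job state before retrying:", err)
			return
		}
		log.Fatal(err)
	}
}

If IsTimeout is not present in a given version, the same decision can be made by asserting the returned error against the Timedout interface named in the changelog entry.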

Gopkg.lock (generated, 64 lines deleted)

@ -1,64 +0,0 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
digest = "1:89696c38cec777120b8b1bb5e2d363d655cf2e1e7d8c851919aaa0fd576d9b86"
name = "github.com/apache/thrift"
packages = ["lib/go/thrift"]
pruneopts = ""
revision = "384647d290e2e4a55a14b1b7ef1b7e66293a2c33"
version = "v0.12.0"
[[projects]]
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
digest = "1:df48fb76fb2a40edea0c9b3d960bc95e326660d82ff1114e1f88001f7a236b40"
name = "github.com/pkg/errors"
packages = ["."]
pruneopts = ""
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
pruneopts = ""
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
digest = "1:78bea5e26e82826dacc5fd64a1013a6711b7075ec8072819b89e6ad76cb8196d"
name = "github.com/samuel/go-zookeeper"
packages = ["zk"]
pruneopts = ""
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[projects]]
digest = "1:381bcbeb112a51493d9d998bbba207a529c73dbb49b3fd789e48c63fac1f192c"
name = "github.com/stretchr/testify"
packages = [
"assert",
"require",
]
pruneopts = ""
revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053"
version = "v1.3.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/apache/thrift/lib/go/thrift",
"github.com/pkg/errors",
"github.com/samuel/go-zookeeper/zk",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
]
solver-name = "gps-cdcl"
solver-version = 1


@ -1,16 +0,0 @@
[[constraint]]
name = "github.com/apache/thrift"
version = "0.12.0"
[[constraint]]
name = "github.com/pkg/errors"
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[constraint]]
name = "github.com/samuel/go-zookeeper"
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"


@ -1,6 +1,8 @@
-# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) [![Build Status](https://travis-ci.org/paypal/gorealis.svg?branch=master)](https://travis-ci.org/paypal/gorealis) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/master/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis)
+# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) ![CI Build Status](https://github.com/paypal/gorealis/actions/workflows/main.yml/badge.svg) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis)
-Go library for interacting with [Apache Aurora](https://github.com/apache/aurora).
+Version 1 of Go library for interacting with [Aurora Scheduler](https://github.com/aurora-scheduler/aurora).
+Version 2 of this library can be found [here](https://github.com/aurora-scheduler/gorealis).
 ### Aurora version compatibility
 Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this
@ -14,7 +16,7 @@ library has been tested.
 ## Projects using gorealis
-* [australis](https://github.com/rdelval/australis)
+* [australis](https://github.com/aurora-scheduler/australis)
 ## Contributions
 Contributions are always welcome. Please raise an issue to discuss a contribution before it is made.


@ -14,7 +14,7 @@ services:
 ipv4_address: 192.168.33.2
 master:
-image: rdelvalle/mesos-master:1.6.2
+image: aurorascheduler/mesos-master:1.7.2
 restart: on-failure
 ports:
 - "5050:5050"
@ -32,12 +32,13 @@ services:
 - zk
 agent-one:
-image: rdelvalle/mesos-agent:1.6.2
+image: aurorascheduler/mesos-agent:1.7.2
 pid: host
 restart: on-failure
 ports:
 - "5051:5051"
 environment:
+MESOS_ATTRIBUTES: 'zone:west'
 MESOS_MASTER: zk://192.168.33.2:2181/mesos
 MESOS_CONTAINERIZERS: docker,mesos
 MESOS_PORT: 5051
@ -56,12 +57,13 @@ services:
 - zk
 agent-two:
-image: rdelvalle/mesos-agent:1.6.2
+image: aurorascheduler/mesos-agent:1.7.2
 pid: host
 restart: on-failure
 ports:
 - "5061:5061"
 environment:
+MESOS_ATTRIBUTES: 'zone:east'
 MESOS_MASTER: zk://192.168.33.2:2181/mesos
 MESOS_CONTAINERIZERS: docker,mesos
 MESOS_HOSTNAME: localhost
@ -80,7 +82,7 @@ services:
 - zk
 aurora-one:
-image: rdelvalle/aurora:0.22.0
+image: aurorascheduler/scheduler:0.23.0
 pid: host
 ports:
 - "8081:8081"


@ -111,7 +111,7 @@ func main() {
 if err != nil {
 log.Fatalln(err)
 }
-monitor = &realis.Monitor{r}
+monitor = &realis.Monitor{Client: r}
 defer r.Close()
 switch executor {
@ -451,13 +451,6 @@ func main() {
 }
 fmt.Println(resp.String())
-case "variableBatchStep":
-step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
-if err != nil {
-log.Fatal(err)
-}
-fmt.Println(step)
 case "taskConfig":
 fmt.Println("Getting job info")
 live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)


@ -1,5 +1,4 @@
-// Autogenerated by Thrift Compiler (0.12.0)
-// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
 package aurora


@ -1,13 +1,12 @@
-// Autogenerated by Thrift Compiler (0.12.0)
-// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
 package aurora
-import (
+import(
 "bytes"
 "context"
-"reflect"
 "fmt"
+"time"
 "github.com/apache/thrift/lib/go/thrift"
 )
@ -15,7 +14,7 @@ import (
 var _ = thrift.ZERO
 var _ = fmt.Printf
 var _ = context.Background
-var _ = reflect.DeepEqual
+var _ = time.Now
 var _ = bytes.Equal
 const AURORA_EXECUTOR_NAME = "AuroraExecutor"

File diff suppressed because it is too large


@ -1,22 +1,22 @@
// Autogenerated by Thrift Compiler (0.12.0) // Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main package main
import ( import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"math" "math"
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
"apache/aurora" "apache/aurora"
) )
var _ = aurora.GoUnusedProtection__
func Usage() { func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -175,19 +175,19 @@ func main() {
fmt.Fprintln(os.Stderr, "CreateJob requires 1 args") fmt.Fprintln(os.Stderr, "CreateJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg163 := flag.Arg(1) arg213 := flag.Arg(1)
mbTrans164 := thrift.NewTMemoryBufferLen(len(arg163)) mbTrans214 := thrift.NewTMemoryBufferLen(len(arg213))
defer mbTrans164.Close() defer mbTrans214.Close()
_, err165 := mbTrans164.WriteString(arg163) _, err215 := mbTrans214.WriteString(arg213)
if err165 != nil { if err215 != nil {
Usage() Usage()
return return
} }
factory166 := thrift.NewTJSONProtocolFactory() factory216 := thrift.NewTJSONProtocolFactory()
jsProt167 := factory166.GetProtocol(mbTrans164) jsProt217 := factory216.GetProtocol(mbTrans214)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err168 := argvalue0.Read(jsProt167) err218 := argvalue0.Read(context.Background(), jsProt217)
if err168 != nil { if err218 != nil {
Usage() Usage()
return return
} }
@ -200,19 +200,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args") fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg169 := flag.Arg(1) arg219 := flag.Arg(1)
mbTrans170 := thrift.NewTMemoryBufferLen(len(arg169)) mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219))
defer mbTrans170.Close() defer mbTrans220.Close()
_, err171 := mbTrans170.WriteString(arg169) _, err221 := mbTrans220.WriteString(arg219)
if err171 != nil { if err221 != nil {
Usage() Usage()
return return
} }
factory172 := thrift.NewTJSONProtocolFactory() factory222 := thrift.NewTJSONProtocolFactory()
jsProt173 := factory172.GetProtocol(mbTrans170) jsProt223 := factory222.GetProtocol(mbTrans220)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err174 := argvalue0.Read(jsProt173) err224 := argvalue0.Read(context.Background(), jsProt223)
if err174 != nil { if err224 != nil {
Usage() Usage()
return return
} }
@ -225,19 +225,19 @@ func main() {
fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args") fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg175 := flag.Arg(1) arg225 := flag.Arg(1)
mbTrans176 := thrift.NewTMemoryBufferLen(len(arg175)) mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225))
defer mbTrans176.Close() defer mbTrans226.Close()
_, err177 := mbTrans176.WriteString(arg175) _, err227 := mbTrans226.WriteString(arg225)
if err177 != nil { if err227 != nil {
Usage() Usage()
return return
} }
factory178 := thrift.NewTJSONProtocolFactory() factory228 := thrift.NewTJSONProtocolFactory()
jsProt179 := factory178.GetProtocol(mbTrans176) jsProt229 := factory228.GetProtocol(mbTrans226)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err180 := argvalue0.Read(jsProt179) err230 := argvalue0.Read(context.Background(), jsProt229)
if err180 != nil { if err230 != nil {
Usage() Usage()
return return
} }
@ -250,19 +250,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args") fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg181 := flag.Arg(1) arg231 := flag.Arg(1)
mbTrans182 := thrift.NewTMemoryBufferLen(len(arg181)) mbTrans232 := thrift.NewTMemoryBufferLen(len(arg231))
defer mbTrans182.Close() defer mbTrans232.Close()
_, err183 := mbTrans182.WriteString(arg181) _, err233 := mbTrans232.WriteString(arg231)
if err183 != nil { if err233 != nil {
Usage() Usage()
return return
} }
factory184 := thrift.NewTJSONProtocolFactory() factory234 := thrift.NewTJSONProtocolFactory()
jsProt185 := factory184.GetProtocol(mbTrans182) jsProt235 := factory234.GetProtocol(mbTrans232)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err186 := argvalue0.Read(jsProt185) err236 := argvalue0.Read(context.Background(), jsProt235)
if err186 != nil { if err236 != nil {
Usage() Usage()
return return
} }
@ -275,36 +275,36 @@ func main() {
fmt.Fprintln(os.Stderr, "RestartShards requires 2 args") fmt.Fprintln(os.Stderr, "RestartShards requires 2 args")
flag.Usage() flag.Usage()
} }
arg187 := flag.Arg(1) arg237 := flag.Arg(1)
mbTrans188 := thrift.NewTMemoryBufferLen(len(arg187)) mbTrans238 := thrift.NewTMemoryBufferLen(len(arg237))
defer mbTrans188.Close() defer mbTrans238.Close()
_, err189 := mbTrans188.WriteString(arg187) _, err239 := mbTrans238.WriteString(arg237)
if err189 != nil { if err239 != nil {
Usage() Usage()
return return
} }
factory190 := thrift.NewTJSONProtocolFactory() factory240 := thrift.NewTJSONProtocolFactory()
jsProt191 := factory190.GetProtocol(mbTrans188) jsProt241 := factory240.GetProtocol(mbTrans238)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err192 := argvalue0.Read(jsProt191) err242 := argvalue0.Read(context.Background(), jsProt241)
if err192 != nil { if err242 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
arg193 := flag.Arg(2) arg243 := flag.Arg(2)
mbTrans194 := thrift.NewTMemoryBufferLen(len(arg193)) mbTrans244 := thrift.NewTMemoryBufferLen(len(arg243))
defer mbTrans194.Close() defer mbTrans244.Close()
_, err195 := mbTrans194.WriteString(arg193) _, err245 := mbTrans244.WriteString(arg243)
if err195 != nil { if err245 != nil {
Usage() Usage()
return return
} }
factory196 := thrift.NewTJSONProtocolFactory() factory246 := thrift.NewTJSONProtocolFactory()
jsProt197 := factory196.GetProtocol(mbTrans194) jsProt247 := factory246.GetProtocol(mbTrans244)
containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs() containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs()
err198 := containerStruct1.ReadField2(jsProt197) err248 := containerStruct1.ReadField2(context.Background(), jsProt247)
if err198 != nil { if err248 != nil {
Usage() Usage()
return return
} }
@ -318,36 +318,36 @@ func main() {
fmt.Fprintln(os.Stderr, "KillTasks requires 3 args") fmt.Fprintln(os.Stderr, "KillTasks requires 3 args")
flag.Usage() flag.Usage()
} }
arg199 := flag.Arg(1) arg249 := flag.Arg(1)
mbTrans200 := thrift.NewTMemoryBufferLen(len(arg199)) mbTrans250 := thrift.NewTMemoryBufferLen(len(arg249))
defer mbTrans200.Close() defer mbTrans250.Close()
_, err201 := mbTrans200.WriteString(arg199) _, err251 := mbTrans250.WriteString(arg249)
if err201 != nil { if err251 != nil {
Usage() Usage()
return return
} }
factory202 := thrift.NewTJSONProtocolFactory() factory252 := thrift.NewTJSONProtocolFactory()
jsProt203 := factory202.GetProtocol(mbTrans200) jsProt253 := factory252.GetProtocol(mbTrans250)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err204 := argvalue0.Read(jsProt203) err254 := argvalue0.Read(context.Background(), jsProt253)
if err204 != nil { if err254 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
arg205 := flag.Arg(2) arg255 := flag.Arg(2)
mbTrans206 := thrift.NewTMemoryBufferLen(len(arg205)) mbTrans256 := thrift.NewTMemoryBufferLen(len(arg255))
defer mbTrans206.Close() defer mbTrans256.Close()
_, err207 := mbTrans206.WriteString(arg205) _, err257 := mbTrans256.WriteString(arg255)
if err207 != nil { if err257 != nil {
Usage() Usage()
return return
} }
factory208 := thrift.NewTJSONProtocolFactory() factory258 := thrift.NewTJSONProtocolFactory()
jsProt209 := factory208.GetProtocol(mbTrans206) jsProt259 := factory258.GetProtocol(mbTrans256)
containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs() containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs()
err210 := containerStruct1.ReadField2(jsProt209) err260 := containerStruct1.ReadField2(context.Background(), jsProt259)
if err210 != nil { if err260 != nil {
Usage() Usage()
return return
} }
@ -363,25 +363,25 @@ func main() {
fmt.Fprintln(os.Stderr, "AddInstances requires 2 args") fmt.Fprintln(os.Stderr, "AddInstances requires 2 args")
flag.Usage() flag.Usage()
} }
arg212 := flag.Arg(1) arg262 := flag.Arg(1)
mbTrans213 := thrift.NewTMemoryBufferLen(len(arg212)) mbTrans263 := thrift.NewTMemoryBufferLen(len(arg262))
defer mbTrans213.Close() defer mbTrans263.Close()
_, err214 := mbTrans213.WriteString(arg212) _, err264 := mbTrans263.WriteString(arg262)
if err214 != nil { if err264 != nil {
Usage() Usage()
return return
} }
factory215 := thrift.NewTJSONProtocolFactory() factory265 := thrift.NewTJSONProtocolFactory()
jsProt216 := factory215.GetProtocol(mbTrans213) jsProt266 := factory265.GetProtocol(mbTrans263)
argvalue0 := aurora.NewInstanceKey() argvalue0 := aurora.NewInstanceKey()
err217 := argvalue0.Read(jsProt216) err267 := argvalue0.Read(context.Background(), jsProt266)
if err217 != nil { if err267 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
tmp1, err218 := (strconv.Atoi(flag.Arg(2))) tmp1, err268 := (strconv.Atoi(flag.Arg(2)))
if err218 != nil { if err268 != nil {
Usage() Usage()
return return
} }
@ -395,19 +395,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args") fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args")
flag.Usage() flag.Usage()
} }
arg219 := flag.Arg(1) arg269 := flag.Arg(1)
mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219)) mbTrans270 := thrift.NewTMemoryBufferLen(len(arg269))
defer mbTrans220.Close() defer mbTrans270.Close()
_, err221 := mbTrans220.WriteString(arg219) _, err271 := mbTrans270.WriteString(arg269)
if err221 != nil { if err271 != nil {
Usage() Usage()
return return
} }
factory222 := thrift.NewTJSONProtocolFactory() factory272 := thrift.NewTJSONProtocolFactory()
jsProt223 := factory222.GetProtocol(mbTrans220) jsProt273 := factory272.GetProtocol(mbTrans270)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err224 := argvalue0.Read(jsProt223) err274 := argvalue0.Read(context.Background(), jsProt273)
if err224 != nil { if err274 != nil {
Usage() Usage()
return return
} }
@ -420,19 +420,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg225 := flag.Arg(1) arg275 := flag.Arg(1)
mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225)) mbTrans276 := thrift.NewTMemoryBufferLen(len(arg275))
defer mbTrans226.Close() defer mbTrans276.Close()
_, err227 := mbTrans226.WriteString(arg225) _, err277 := mbTrans276.WriteString(arg275)
if err227 != nil { if err277 != nil {
Usage() Usage()
return return
} }
factory228 := thrift.NewTJSONProtocolFactory() factory278 := thrift.NewTJSONProtocolFactory()
jsProt229 := factory228.GetProtocol(mbTrans226) jsProt279 := factory278.GetProtocol(mbTrans276)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err230 := argvalue0.Read(jsProt229) err280 := argvalue0.Read(context.Background(), jsProt279)
if err230 != nil { if err280 != nil {
Usage() Usage()
return return
} }
@ -447,19 +447,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg232 := flag.Arg(1) arg282 := flag.Arg(1)
mbTrans233 := thrift.NewTMemoryBufferLen(len(arg232)) mbTrans283 := thrift.NewTMemoryBufferLen(len(arg282))
defer mbTrans233.Close() defer mbTrans283.Close()
_, err234 := mbTrans233.WriteString(arg232) _, err284 := mbTrans283.WriteString(arg282)
if err234 != nil { if err284 != nil {
Usage() Usage()
return return
} }
factory235 := thrift.NewTJSONProtocolFactory() factory285 := thrift.NewTJSONProtocolFactory()
jsProt236 := factory235.GetProtocol(mbTrans233) jsProt286 := factory285.GetProtocol(mbTrans283)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err237 := argvalue0.Read(jsProt236) err287 := argvalue0.Read(context.Background(), jsProt286)
if err237 != nil { if err287 != nil {
Usage() Usage()
return return
} }
@ -474,19 +474,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg239 := flag.Arg(1) arg289 := flag.Arg(1)
mbTrans240 := thrift.NewTMemoryBufferLen(len(arg239)) mbTrans290 := thrift.NewTMemoryBufferLen(len(arg289))
defer mbTrans240.Close() defer mbTrans290.Close()
_, err241 := mbTrans240.WriteString(arg239) _, err291 := mbTrans290.WriteString(arg289)
if err241 != nil { if err291 != nil {
Usage() Usage()
return return
} }
factory242 := thrift.NewTJSONProtocolFactory() factory292 := thrift.NewTJSONProtocolFactory()
jsProt243 := factory242.GetProtocol(mbTrans240) jsProt293 := factory292.GetProtocol(mbTrans290)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err244 := argvalue0.Read(jsProt243) err294 := argvalue0.Read(context.Background(), jsProt293)
if err244 != nil { if err294 != nil {
Usage() Usage()
return return
} }
@ -501,19 +501,19 @@ func main() {
fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg246 := flag.Arg(1) arg296 := flag.Arg(1)
mbTrans247 := thrift.NewTMemoryBufferLen(len(arg246)) mbTrans297 := thrift.NewTMemoryBufferLen(len(arg296))
defer mbTrans247.Close() defer mbTrans297.Close()
_, err248 := mbTrans247.WriteString(arg246) _, err298 := mbTrans297.WriteString(arg296)
if err248 != nil { if err298 != nil {
Usage() Usage()
return return
} }
factory249 := thrift.NewTJSONProtocolFactory() factory299 := thrift.NewTJSONProtocolFactory()
jsProt250 := factory249.GetProtocol(mbTrans247) jsProt300 := factory299.GetProtocol(mbTrans297)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err251 := argvalue0.Read(jsProt250) err301 := argvalue0.Read(context.Background(), jsProt300)
if err251 != nil { if err301 != nil {
Usage() Usage()
return return
} }
@ -528,19 +528,19 @@ func main() {
fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg253 := flag.Arg(1) arg303 := flag.Arg(1)
mbTrans254 := thrift.NewTMemoryBufferLen(len(arg253)) mbTrans304 := thrift.NewTMemoryBufferLen(len(arg303))
defer mbTrans254.Close() defer mbTrans304.Close()
_, err255 := mbTrans254.WriteString(arg253) _, err305 := mbTrans304.WriteString(arg303)
if err255 != nil { if err305 != nil {
Usage() Usage()
return return
} }
factory256 := thrift.NewTJSONProtocolFactory() factory306 := thrift.NewTJSONProtocolFactory()
jsProt257 := factory256.GetProtocol(mbTrans254) jsProt307 := factory306.GetProtocol(mbTrans304)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err258 := argvalue0.Read(jsProt257) err308 := argvalue0.Read(context.Background(), jsProt307)
if err258 != nil { if err308 != nil {
Usage() Usage()
return return
} }
@ -555,19 +555,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args") fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args")
flag.Usage() flag.Usage()
} }
arg260 := flag.Arg(1) arg310 := flag.Arg(1)
mbTrans261 := thrift.NewTMemoryBufferLen(len(arg260)) mbTrans311 := thrift.NewTMemoryBufferLen(len(arg310))
defer mbTrans261.Close() defer mbTrans311.Close()
_, err262 := mbTrans261.WriteString(arg260) _, err312 := mbTrans311.WriteString(arg310)
if err262 != nil { if err312 != nil {
Usage() Usage()
return return
} }
factory263 := thrift.NewTJSONProtocolFactory() factory313 := thrift.NewTJSONProtocolFactory()
jsProt264 := factory263.GetProtocol(mbTrans261) jsProt314 := factory313.GetProtocol(mbTrans311)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err265 := argvalue0.Read(jsProt264) err315 := argvalue0.Read(context.Background(), jsProt314)
if err265 != nil { if err315 != nil {
Usage() Usage()
return return
} }
@ -598,19 +598,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage() flag.Usage()
} }
arg267 := flag.Arg(1) arg317 := flag.Arg(1)
mbTrans268 := thrift.NewTMemoryBufferLen(len(arg267)) mbTrans318 := thrift.NewTMemoryBufferLen(len(arg317))
defer mbTrans268.Close() defer mbTrans318.Close()
_, err269 := mbTrans268.WriteString(arg267) _, err319 := mbTrans318.WriteString(arg317)
if err269 != nil { if err319 != nil {
Usage() Usage()
return return
} }
factory270 := thrift.NewTJSONProtocolFactory() factory320 := thrift.NewTJSONProtocolFactory()
jsProt271 := factory270.GetProtocol(mbTrans268) jsProt321 := factory320.GetProtocol(mbTrans318)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err272 := argvalue0.Read(jsProt271) err322 := argvalue0.Read(context.Background(), jsProt321)
if err272 != nil { if err322 != nil {
Usage() Usage()
return return
} }
@ -623,19 +623,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage() flag.Usage()
} }
arg273 := flag.Arg(1) arg323 := flag.Arg(1)
mbTrans274 := thrift.NewTMemoryBufferLen(len(arg273)) mbTrans324 := thrift.NewTMemoryBufferLen(len(arg323))
defer mbTrans274.Close() defer mbTrans324.Close()
_, err275 := mbTrans274.WriteString(arg273) _, err325 := mbTrans324.WriteString(arg323)
if err275 != nil { if err325 != nil {
Usage() Usage()
return return
} }
factory276 := thrift.NewTJSONProtocolFactory() factory326 := thrift.NewTJSONProtocolFactory()
jsProt277 := factory276.GetProtocol(mbTrans274) jsProt327 := factory326.GetProtocol(mbTrans324)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err278 := argvalue0.Read(jsProt277) err328 := argvalue0.Read(context.Background(), jsProt327)
if err278 != nil { if err328 != nil {
Usage() Usage()
return return
} }
@ -648,19 +648,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args") fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage() flag.Usage()
} }
arg279 := flag.Arg(1) arg329 := flag.Arg(1)
mbTrans280 := thrift.NewTMemoryBufferLen(len(arg279)) mbTrans330 := thrift.NewTMemoryBufferLen(len(arg329))
defer mbTrans280.Close() defer mbTrans330.Close()
_, err281 := mbTrans280.WriteString(arg279) _, err331 := mbTrans330.WriteString(arg329)
if err281 != nil { if err331 != nil {
Usage() Usage()
return return
} }
factory282 := thrift.NewTJSONProtocolFactory() factory332 := thrift.NewTJSONProtocolFactory()
jsProt283 := factory282.GetProtocol(mbTrans280) jsProt333 := factory332.GetProtocol(mbTrans330)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err284 := argvalue0.Read(jsProt283) err334 := argvalue0.Read(context.Background(), jsProt333)
if err284 != nil { if err334 != nil {
Usage() Usage()
return return
} }
@ -673,19 +673,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args") fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage() flag.Usage()
} }
arg285 := flag.Arg(1) arg335 := flag.Arg(1)
mbTrans286 := thrift.NewTMemoryBufferLen(len(arg285)) mbTrans336 := thrift.NewTMemoryBufferLen(len(arg335))
defer mbTrans286.Close() defer mbTrans336.Close()
_, err287 := mbTrans286.WriteString(arg285) _, err337 := mbTrans336.WriteString(arg335)
if err287 != nil { if err337 != nil {
Usage() Usage()
return return
} }
factory288 := thrift.NewTJSONProtocolFactory() factory338 := thrift.NewTJSONProtocolFactory()
jsProt289 := factory288.GetProtocol(mbTrans286) jsProt339 := factory338.GetProtocol(mbTrans336)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err290 := argvalue0.Read(jsProt289) err340 := argvalue0.Read(context.Background(), jsProt339)
if err290 != nil { if err340 != nil {
Usage() Usage()
return return
} }
@ -718,19 +718,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args") fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage() flag.Usage()
} }
arg293 := flag.Arg(1) arg343 := flag.Arg(1)
mbTrans294 := thrift.NewTMemoryBufferLen(len(arg293)) mbTrans344 := thrift.NewTMemoryBufferLen(len(arg343))
defer mbTrans294.Close() defer mbTrans344.Close()
_, err295 := mbTrans294.WriteString(arg293) _, err345 := mbTrans344.WriteString(arg343)
if err295 != nil { if err345 != nil {
Usage() Usage()
return return
} }
factory296 := thrift.NewTJSONProtocolFactory() factory346 := thrift.NewTJSONProtocolFactory()
jsProt297 := factory296.GetProtocol(mbTrans294) jsProt347 := factory346.GetProtocol(mbTrans344)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err298 := argvalue0.Read(jsProt297) err348 := argvalue0.Read(context.Background(), jsProt347)
if err298 != nil { if err348 != nil {
Usage() Usage()
return return
} }
@ -743,19 +743,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage() flag.Usage()
} }
arg299 := flag.Arg(1) arg349 := flag.Arg(1)
mbTrans300 := thrift.NewTMemoryBufferLen(len(arg299)) mbTrans350 := thrift.NewTMemoryBufferLen(len(arg349))
defer mbTrans300.Close() defer mbTrans350.Close()
_, err301 := mbTrans300.WriteString(arg299) _, err351 := mbTrans350.WriteString(arg349)
if err301 != nil { if err351 != nil {
Usage() Usage()
return return
} }
factory302 := thrift.NewTJSONProtocolFactory() factory352 := thrift.NewTJSONProtocolFactory()
jsProt303 := factory302.GetProtocol(mbTrans300) jsProt353 := factory352.GetProtocol(mbTrans350)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err304 := argvalue0.Read(jsProt303) err354 := argvalue0.Read(context.Background(), jsProt353)
if err304 != nil { if err354 != nil {
Usage() Usage()
return return
} }
@ -768,19 +768,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage() flag.Usage()
} }
arg305 := flag.Arg(1) arg355 := flag.Arg(1)
mbTrans306 := thrift.NewTMemoryBufferLen(len(arg305)) mbTrans356 := thrift.NewTMemoryBufferLen(len(arg355))
defer mbTrans306.Close() defer mbTrans356.Close()
_, err307 := mbTrans306.WriteString(arg305) _, err357 := mbTrans356.WriteString(arg355)
if err307 != nil { if err357 != nil {
Usage() Usage()
return return
} }
factory308 := thrift.NewTJSONProtocolFactory() factory358 := thrift.NewTJSONProtocolFactory()
jsProt309 := factory308.GetProtocol(mbTrans306) jsProt359 := factory358.GetProtocol(mbTrans356)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err310 := argvalue0.Read(jsProt309) err360 := argvalue0.Read(context.Background(), jsProt359)
if err310 != nil { if err360 != nil {
Usage() Usage()
return return
} }
@ -793,19 +793,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage() flag.Usage()
} }
arg311 := flag.Arg(1) arg361 := flag.Arg(1)
mbTrans312 := thrift.NewTMemoryBufferLen(len(arg311)) mbTrans362 := thrift.NewTMemoryBufferLen(len(arg361))
defer mbTrans312.Close() defer mbTrans362.Close()
_, err313 := mbTrans312.WriteString(arg311) _, err363 := mbTrans362.WriteString(arg361)
if err313 != nil { if err363 != nil {
Usage() Usage()
return return
} }
factory314 := thrift.NewTJSONProtocolFactory() factory364 := thrift.NewTJSONProtocolFactory()
jsProt315 := factory314.GetProtocol(mbTrans312) jsProt365 := factory364.GetProtocol(mbTrans362)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err316 := argvalue0.Read(jsProt315) err366 := argvalue0.Read(context.Background(), jsProt365)
if err316 != nil { if err366 != nil {
Usage() Usage()
return return
} }


@ -1,22 +1,22 @@
// Autogenerated by Thrift Compiler (0.12.0) // Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main package main
import ( import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"math" "math"
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
"apache/aurora" "apache/aurora"
) )
var _ = aurora.GoUnusedProtection__
func Usage() { func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -179,19 +179,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage() flag.Usage()
} }
arg82 := flag.Arg(1) arg132 := flag.Arg(1)
mbTrans83 := thrift.NewTMemoryBufferLen(len(arg82)) mbTrans133 := thrift.NewTMemoryBufferLen(len(arg132))
defer mbTrans83.Close() defer mbTrans133.Close()
_, err84 := mbTrans83.WriteString(arg82) _, err134 := mbTrans133.WriteString(arg132)
if err84 != nil { if err134 != nil {
Usage() Usage()
return return
} }
factory85 := thrift.NewTJSONProtocolFactory() factory135 := thrift.NewTJSONProtocolFactory()
jsProt86 := factory85.GetProtocol(mbTrans83) jsProt136 := factory135.GetProtocol(mbTrans133)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err87 := argvalue0.Read(jsProt86) err137 := argvalue0.Read(context.Background(), jsProt136)
if err87 != nil { if err137 != nil {
Usage() Usage()
return return
} }
@ -204,19 +204,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage() flag.Usage()
} }
arg88 := flag.Arg(1) arg138 := flag.Arg(1)
mbTrans89 := thrift.NewTMemoryBufferLen(len(arg88)) mbTrans139 := thrift.NewTMemoryBufferLen(len(arg138))
defer mbTrans89.Close() defer mbTrans139.Close()
_, err90 := mbTrans89.WriteString(arg88) _, err140 := mbTrans139.WriteString(arg138)
if err90 != nil { if err140 != nil {
Usage() Usage()
return return
} }
factory91 := thrift.NewTJSONProtocolFactory() factory141 := thrift.NewTJSONProtocolFactory()
jsProt92 := factory91.GetProtocol(mbTrans89) jsProt142 := factory141.GetProtocol(mbTrans139)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err93 := argvalue0.Read(jsProt92) err143 := argvalue0.Read(context.Background(), jsProt142)
if err93 != nil { if err143 != nil {
Usage() Usage()
return return
} }
@ -229,19 +229,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args") fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage() flag.Usage()
} }
arg94 := flag.Arg(1) arg144 := flag.Arg(1)
mbTrans95 := thrift.NewTMemoryBufferLen(len(arg94)) mbTrans145 := thrift.NewTMemoryBufferLen(len(arg144))
defer mbTrans95.Close() defer mbTrans145.Close()
_, err96 := mbTrans95.WriteString(arg94) _, err146 := mbTrans145.WriteString(arg144)
if err96 != nil { if err146 != nil {
Usage() Usage()
return return
} }
factory97 := thrift.NewTJSONProtocolFactory() factory147 := thrift.NewTJSONProtocolFactory()
jsProt98 := factory97.GetProtocol(mbTrans95) jsProt148 := factory147.GetProtocol(mbTrans145)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err99 := argvalue0.Read(jsProt98) err149 := argvalue0.Read(context.Background(), jsProt148)
if err99 != nil { if err149 != nil {
Usage() Usage()
return return
} }
@ -254,19 +254,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args") fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage() flag.Usage()
} }
arg100 := flag.Arg(1) arg150 := flag.Arg(1)
mbTrans101 := thrift.NewTMemoryBufferLen(len(arg100)) mbTrans151 := thrift.NewTMemoryBufferLen(len(arg150))
defer mbTrans101.Close() defer mbTrans151.Close()
_, err102 := mbTrans101.WriteString(arg100) _, err152 := mbTrans151.WriteString(arg150)
if err102 != nil { if err152 != nil {
Usage() Usage()
return return
} }
factory103 := thrift.NewTJSONProtocolFactory() factory153 := thrift.NewTJSONProtocolFactory()
jsProt104 := factory103.GetProtocol(mbTrans101) jsProt154 := factory153.GetProtocol(mbTrans151)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err105 := argvalue0.Read(jsProt104) err155 := argvalue0.Read(context.Background(), jsProt154)
if err105 != nil { if err155 != nil {
Usage() Usage()
return return
} }
@ -299,19 +299,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args") fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage() flag.Usage()
} }
arg108 := flag.Arg(1) arg158 := flag.Arg(1)
mbTrans109 := thrift.NewTMemoryBufferLen(len(arg108)) mbTrans159 := thrift.NewTMemoryBufferLen(len(arg158))
defer mbTrans109.Close() defer mbTrans159.Close()
_, err110 := mbTrans109.WriteString(arg108) _, err160 := mbTrans159.WriteString(arg158)
if err110 != nil { if err160 != nil {
Usage() Usage()
return return
} }
factory111 := thrift.NewTJSONProtocolFactory() factory161 := thrift.NewTJSONProtocolFactory()
jsProt112 := factory111.GetProtocol(mbTrans109) jsProt162 := factory161.GetProtocol(mbTrans159)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err113 := argvalue0.Read(jsProt112) err163 := argvalue0.Read(context.Background(), jsProt162)
if err113 != nil { if err163 != nil {
Usage() Usage()
return return
} }
@ -324,19 +324,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage() flag.Usage()
} }
arg114 := flag.Arg(1) arg164 := flag.Arg(1)
mbTrans115 := thrift.NewTMemoryBufferLen(len(arg114)) mbTrans165 := thrift.NewTMemoryBufferLen(len(arg164))
defer mbTrans115.Close() defer mbTrans165.Close()
_, err116 := mbTrans115.WriteString(arg114) _, err166 := mbTrans165.WriteString(arg164)
if err116 != nil { if err166 != nil {
Usage() Usage()
return return
} }
factory117 := thrift.NewTJSONProtocolFactory() factory167 := thrift.NewTJSONProtocolFactory()
jsProt118 := factory117.GetProtocol(mbTrans115) jsProt168 := factory167.GetProtocol(mbTrans165)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err119 := argvalue0.Read(jsProt118) err169 := argvalue0.Read(context.Background(), jsProt168)
if err119 != nil { if err169 != nil {
Usage() Usage()
return return
} }
@ -349,19 +349,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage() flag.Usage()
} }
arg120 := flag.Arg(1) arg170 := flag.Arg(1)
mbTrans121 := thrift.NewTMemoryBufferLen(len(arg120)) mbTrans171 := thrift.NewTMemoryBufferLen(len(arg170))
defer mbTrans121.Close() defer mbTrans171.Close()
_, err122 := mbTrans121.WriteString(arg120) _, err172 := mbTrans171.WriteString(arg170)
if err122 != nil { if err172 != nil {
Usage() Usage()
return return
} }
factory123 := thrift.NewTJSONProtocolFactory() factory173 := thrift.NewTJSONProtocolFactory()
jsProt124 := factory123.GetProtocol(mbTrans121) jsProt174 := factory173.GetProtocol(mbTrans171)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err125 := argvalue0.Read(jsProt124) err175 := argvalue0.Read(context.Background(), jsProt174)
if err125 != nil { if err175 != nil {
Usage() Usage()
return return
} }
@ -374,19 +374,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage() flag.Usage()
} }
arg126 := flag.Arg(1) arg176 := flag.Arg(1)
mbTrans127 := thrift.NewTMemoryBufferLen(len(arg126)) mbTrans177 := thrift.NewTMemoryBufferLen(len(arg176))
defer mbTrans127.Close() defer mbTrans177.Close()
_, err128 := mbTrans127.WriteString(arg126) _, err178 := mbTrans177.WriteString(arg176)
if err128 != nil { if err178 != nil {
Usage() Usage()
return return
} }
factory129 := thrift.NewTJSONProtocolFactory() factory179 := thrift.NewTJSONProtocolFactory()
jsProt130 := factory129.GetProtocol(mbTrans127) jsProt180 := factory179.GetProtocol(mbTrans177)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err131 := argvalue0.Read(jsProt130) err181 := argvalue0.Read(context.Background(), jsProt180)
if err131 != nil { if err181 != nil {
Usage() Usage()
return return
} }


@ -1,6 +1,6 @@
 #! /bin/bash
-THRIFT_VER=0.12.0
+THRIFT_VER=0.14.0
 if [[ $(thrift -version | grep -e $THRIFT_VER -c) -ne 1 ]]; then
 echo "Warning: This wrapper has only been tested with version" $THRIFT_VER;

go.mod (12 changed lines)

@ -1,12 +1,12 @@
 module github.com/paypal/gorealis
-go 1.12
+go 1.13
 require (
-github.com/apache/thrift v0.12.0
+github.com/apache/thrift v0.14.0
-github.com/davecgh/go-spew v1.1.0
+github.com/davecgh/go-spew v1.1.0 // indirect
-github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e
+github.com/pkg/errors v0.9.1
-github.com/pmezard/go-difflib v1.0.0
+github.com/pmezard/go-difflib v1.0.0 // indirect
 github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
-github.com/stretchr/testify v1.2.0
+github.com/stretchr/testify v1.7.0
 )

go.sum (25 changed lines)

@ -1,9 +1,30 @@
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs= github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs=
github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs= github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/ridv/thrift v0.12.1 h1:b80V1Oa2Mbd++jrlJZbJsIybO5/MCfbXKzd1A5v4aSo=
github.com/ridv/thrift v0.12.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.1 h1:/8XnTRUqJJeiuqoL7mfnJQmXQa4GJn9tUCiP7+i6Y9o=
github.com/ridv/thrift v0.13.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.2 h1:Q3Smr8poXd7VkWZPHvdJZzlQCJO+b5W37ECfoUL4qHc=
github.com/ridv/thrift v0.13.2/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

helpers.go (new file, 21 lines)

@ -0,0 +1,21 @@
package realis

import (
	"context"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) {
	resp, err := r.client.GetConfigSummary(context.TODO(), &key)
	if err != nil {
		return false, err
	}

	return resp == nil ||
			resp.GetResult_() == nil ||
			resp.GetResult_().GetConfigSummaryResult_() == nil ||
			resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
			resp.GetResponseCode() != aurora.ResponseCode_OK,
		nil
}

job.go (89 changed lines)

@ -62,6 +62,7 @@ type Job interface {
 	PartitionPolicy(policy *aurora.PartitionPolicy) Job
 	Tier(tier string) Job
 	SlaPolicy(policy *aurora.SlaPolicy) Job
+	Priority(priority int32) Job
 }

 type resourceType int
@ -73,12 +74,15 @@ const (
 	GPU
 )

+const portNamePrefix = "org.apache.aurora.port."
+
 // AuroraJob is a structure to collect all information pertaining to an Aurora job.
 type AuroraJob struct {
 	jobConfig *aurora.JobConfiguration
 	resources map[resourceType]*aurora.Resource
 	metadata  map[string]*aurora.Metadata
-	portCount int
+	constraints map[string]*aurora.Constraint
+	portCount   int
 }

 // NewJob is used to create a Job object with everything initialized.
@ -109,10 +113,11 @@ func NewJob() Job {
 	diskMb.DiskMb = new(int64)

 	return &AuroraJob{
 		jobConfig: jobConfig,
 		resources: resources,
 		metadata:  make(map[string]*aurora.Metadata),
-		portCount: 0,
+		constraints: make(map[string]*aurora.Constraint),
+		portCount:   0,
 	}
 }
@ -258,12 +263,12 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
 // AddLabel adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) Job {
-	if _, ok := j.metadata[key]; ok {
-		j.metadata[key].Value = value
-	} else {
-		j.metadata[key] = &aurora.Metadata{Key: key, Value: value}
+	if _, ok := j.metadata[key]; !ok {
+		j.metadata[key] = &aurora.Metadata{Key: key}
 		j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key])
 	}
+
+	j.metadata[key].Value = value
 	return j
 }
@ -288,7 +293,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	start := j.portCount
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
-		portName := "org.apache.aurora.port." + strconv.Itoa(i)
+		portName := portNamePrefix + strconv.Itoa(i)
 		j.jobConfig.TaskConfig.Resources = append(
 			j.jobConfig.TaskConfig.Resources,
 			&aurora.Resource{NamedPort: &portName})
@ -297,47 +302,56 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	return j
 }

-// AddValueConstraint allows the user to add a value constrain to the job to limiti which agents the job's
-// tasks can be run on.
+// AddValueConstraint allows the user to add a value constrain to the job to limit which agents the job's
+// tasks can be run on. If the name matches a constraint that was previously set, the previous value will be
+// overwritten. In case the previous constraint attached to the name was of type limit, the constraint will be clobbered
+// by this new Value constraint.
 // From Aurora Docs:
 // Add a Value constraint
 // name - Mesos slave attribute that the constraint is matched against.
 // If negated = true , treat this as a 'not' - to avoid specific values.
 // Values - list of values we look for in attribute name
 func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
-	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-		&aurora.Constraint{
-			Name: name,
-			Constraint: &aurora.TaskConstraint{
-				Value: &aurora.ValueConstraint{
-					Negated: negated,
-					Values:  values,
-				},
-				Limit: nil,
-			},
-		})
+	if _, ok := j.constraints[name]; !ok {
+		j.constraints[name] = &aurora.Constraint{Name: name}
+		j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+	}
+
+	j.constraints[name].Constraint = &aurora.TaskConstraint{
+		Value: &aurora.ValueConstraint{
+			Negated: negated,
+			Values:  values,
+		},
+		Limit: nil,
+	}

 	return j
 }

 // AddLimitConstraint allows the user to limit how many tasks form the same Job are run on a single host.
+// If the name matches a constraint that was previously set, the previous value will be
+// overwritten. In case the previous constraint attached to the name was of type Value, the constraint will be clobbered
+// by this new Limit constraint.
 // From Aurora Docs:
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
 func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
-	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-		&aurora.Constraint{
-			Name: name,
-			Constraint: &aurora.TaskConstraint{
-				Value: nil,
-				Limit: &aurora.LimitConstraint{Limit: limit},
-			},
-		})
+	if _, ok := j.constraints[name]; !ok {
+		j.constraints[name] = &aurora.Constraint{Name: name}
+		j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+	}
+
+	j.constraints[name].Constraint = &aurora.TaskConstraint{
+		Value: nil,
+		Limit: &aurora.LimitConstraint{Limit: limit},
+	}

 	return j
 }

-// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
+// AddDedicatedConstraint is a convenience function that allows the user to
+// add a dedicated constraint to a Job configuration.
+// In case a previous dedicated constraint was set, it will be clobbered by this new value.
 func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
 	j.AddValueConstraint("dedicated", false, role+"/"+name)
@ -370,3 +384,8 @@ func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
 	return j
 }
+
+func (j *AuroraJob) Priority(priority int32) Job {
+	j.jobConfig.TaskConfig.Priority = priority
+	return j
+}
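Since constraints are now deduplicated by name and Priority is part of the Job interface, repeated builder calls no longer pile up duplicate constraints; the last call for a given name wins. A minimal sketch, assuming the pre-existing builder methods (Environment, Role, Name, CPU, RAM, Disk) behave as in earlier releases:

package examples

import "github.com/paypal/gorealis"

// buildConstrainedJob sketches the updated builder behavior: the second
// constraint call on "zone" overwrites the first instead of appending a
// duplicate, and Priority sets TaskConfig.Priority.
func buildConstrainedJob() realis.Job {
	return realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("hello_world_from_gorealis").
		CPU(0.25).
		RAM(64).
		Disk(100).
		AddValueConstraint("zone", false, "east", "west").
		AddLimitConstraint("zone", 2). // clobbers the previous "zone" constraint
		Priority(0)
}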

View file

@ -78,8 +78,11 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
 		UpdateStatuses: desiredStatuses,
 	}
 	summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
+	if err != nil {
+		return 0, err
+	}

-	return summary[0].State.Status, err
+	return summary[0].State.Status, nil
 }

 // JobUpdateQuery polls the scheduler every certain amount of time to see if the query call returns any results.
@ -114,7 +117,7 @@ func (m *Monitor) JobUpdateQuery(
 		}
 	}

-// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
+// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
 // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
 // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
 // the update is in using information from the update configuration.
@ -165,7 +168,8 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
 		return -1, err
 	}

-	if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
+	if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED ||
+		summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) {
 		return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
 	}
@ -180,7 +184,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
 	return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
 }

-// Monitor a Job until all instances enter one of the LIVE_STATES
+// Instances will monitor a Job until all instances enter one of the LIVE_STATES
 func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
 	return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
 }
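With JobUpdateStatus now returning early on a query error instead of indexing into a possibly empty slice, callers can rely on the returned error. A short sketch of driving the monitor, assuming Monitor is still constructed with a client as in earlier releases (&realis.Monitor{Client: r}):

package examples

import (
	"log"

	"github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// waitForLiveInstances blocks until count instances of key reach a LIVE state,
// polling every second for up to 50 seconds.
func waitForLiveInstances(r realis.Realis, key *aurora.JobKey, count int32) error {
	monitor := &realis.Monitor{Client: r}

	live, err := monitor.Instances(key, count, 1, 50)
	if err != nil {
		return err
	}
	if !live {
		log.Printf("job %v did not reach a live state in time", key)
	}
	return nil
}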

273
realis.go
View file

@ -18,14 +18,11 @@ package realis
 import (
 	"context"
 	"crypto/tls"
-	"crypto/x509"
 	"encoding/base64"
 	"fmt"
-	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
-	"path/filepath"
 	"sort"
 	"strings"
 	"sync"
@ -38,7 +35,7 @@ import (
 	"github.com/paypal/gorealis/response"
 )

-const version = "1.21.1"
+const version = "1.24.1"
// Realis is an interface that defines the various APIs that may be used to communicate with // Realis is an interface that defines the various APIs that may be used to communicate with
// the Apache Aurora scheduler. // the Apache Aurora scheduler.
@ -68,7 +65,6 @@ type Realis interface {
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)
@ -117,6 +113,7 @@ type config struct {
 	logger                *LevelLogger
 	insecureSkipVerify    bool
 	certspath             string
+	certExtensions        map[string]struct{}
 	clientKey, clientCert string
 	options               []ClientOption
 	debug                 bool
@ -132,6 +129,15 @@ var defaultBackoff = Backoff{
 	Jitter: 0.1,
 }

+var defaultSlaPolicy = aurora.SlaPolicy{
+	PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
+		Percentage:   66,
+		DurationSecs: 300,
+	},
+}
+
+const defaultSlaDrainTimeoutSecs = 900
+
 // ClientOption is an alias for a function that modifies the realis config object
 type ClientOption func(*config)
@ -229,6 +235,18 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
 	}
 }

+// CertExtensions configures gorealis to consider files with the given extensions when
+// loading certificates from the cert path.
+func CertExtensions(extensions ...string) ClientOption {
+	extensionsLookup := make(map[string]struct{})
+	for _, ext := range extensions {
+		extensionsLookup[ext] = struct{}{}
+	}
+	return func(config *config) {
+		config.certExtensions = extensionsLookup
+	}
+}
 // ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
 // See zk.go for what is possible to set as an option.
 func ZookeeperOptions(opts ...ZKOpt) ClientOption {
@ -311,6 +329,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 	config.timeoutms = 10000
 	config.backoff = defaultBackoff
 	config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
+	config.certExtensions = map[string]struct{}{".crt": {}, ".pem": {}, ".key": {}}
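The new CertExtensions option narrows which files under the certificate path are loaded; by default .crt, .pem, and .key are considered. A minimal sketch of opting into a stricter set, assuming the pre-existing Certspath option is still how the CA directory is supplied:

package examples

import "github.com/paypal/gorealis"

// newTLSClient sketches a client that only loads *.pem files from certDir.
// Certspath is assumed here to be the existing option for the CA directory.
func newTLSClient(url, certDir string) (realis.Realis, error) {
	return realis.NewRealisClient(
		realis.SchedulerUrl(url),
		realis.BasicAuth("aurora", "secret"),
		realis.Certspath(certDir),
		realis.CertExtensions(".pem"),
	)
}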
// Save options to recreate client if a connection error happens // Save options to recreate client if a connection error happens
config.options = options config.options = options
@ -375,7 +394,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
} }
config.logger.Println("Addresss obtained: ", url) config.logger.Println("Address obtained: ", url)
url, err = validateAuroraURL(url) url, err = validateAuroraURL(url)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "invalid Aurora url") return nil, errors.Wrap(err, "invalid Aurora url")
@ -417,7 +436,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace}, logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{}, lock: &sync.Mutex{},
transport: config.transport}, nil transport: config.transport,
}, nil
} }
// GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. This is deprecated in favor of using // GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. This is deprecated in favor of using
@ -433,23 +453,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
} }
} }
func createCertPool(certPath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certPath)
if err != nil {
return nil, err
}
for _, cert := range caFiles {
caPathFile := filepath.Join(certPath, cert.Name())
caCert, err := ioutil.ReadFile(caPathFile)
if err != nil {
return nil, err
}
globalRootCAs.AppendCertsFromPEM(caCert)
}
return globalRootCAs, nil
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
 func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
 	var transport http.Transport
@ -457,7 +460,7 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
 		tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}

 		if config.certspath != "" {
-			rootCAs, err := createCertPool(config.certspath)
+			rootCAs, err := createCertPool(config.certspath, config.certExtensions)
 			if err != nil {
 				config.logger.Println("error occurred couldn't fetch certs")
 				return nil, err
@ -491,11 +494,11 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
 	})

 	if err != nil {
-		return nil, errors.Wrap(err, "Error creating transport")
+		return nil, errors.Wrap(err, "error creating transport")
 	}

 	if err := trans.Open(); err != nil {
-		return nil, errors.Wrapf(err, "Error opening connection to %s", url)
+		return nil, errors.Wrapf(err, "error opening connection to %s", url)
 	}

 	return trans, nil
@ -509,6 +512,10 @@ func basicAuth(username, password string) string {
 func (r *realisClient) ReestablishConn() error {
 	// Close existing connection
 	r.logger.Println("Re-establishing Connection to Aurora")
+
+	// This call must happen before we lock as it also uses
+	// the same lock from the client since close can be called
+	// by anyone from anywhere.
 	r.Close()

 	r.lock.Lock()
@ -534,16 +541,17 @@ func (r *realisClient) ReestablishConn() error {
 	return nil
 }

-// Releases resources associated with the realis client.
+// Close releases resources associated with the realis client.
 func (r *realisClient) Close() {
 	r.lock.Lock()
 	defer r.lock.Unlock()

-	r.transport.Close()
+	// The return value of Close here is ignored on purpose because there's nothing that can be done if it fails.
+	_ = r.transport.Close()
 }
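Close now discards the transport's return value deliberately, and ReestablishConn documents why it must run before taking the lock. From the caller's perspective nothing changes: the client should still be closed when it is no longer needed, for example:

package examples

import "github.com/paypal/gorealis"

// withClient sketches the usual lifecycle: create the client, defer Close,
// and hand it to the supplied function.
func withClient(url string, fn func(realis.Realis) error) error {
	r, err := realis.NewRealisClient(realis.SchedulerUrl(url))
	if err != nil {
		return err
	}
	defer r.Close()

	return fn(r)
}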
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. // GetInstanceIds uses a predefined set of states to retrieve a set of active jobs in the Aurora Scheduler.
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
taskQ := &aurora.TaskQuery{ taskQ := &aurora.TaskQuery{
JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}}, JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}},
@ -556,7 +564,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
}) },
nil,
)
// If we encountered an error we couldn't recover from by retrying, return an error to the user // If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil { if retryErr != nil {
@ -581,10 +591,16 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 		false,
 		func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
-		})
+		},
+		nil,
+	)

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
+		return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
+	}
+
+	if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil, errors.New("unexpected response from scheduler")
 	}

 	return resp, nil
@ -598,7 +614,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(context.TODO(), role) return r.readonlyClient.GetJobs(context.TODO(), role)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
@ -611,7 +629,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
return resp, result, nil return resp, result, nil
} }
// Kill specific instances of a job. // KillInstances kills specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
@ -619,7 +637,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.KillTasks(context.TODO(), key, instances, "") return r.client.KillTasks(context.TODO(), key, instances, "")
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -631,7 +651,7 @@ func (r *realisClient) RealisConfig() *config {
return r.config return r.config
} }
// Sends a kill message to the scheduler for all active tasks under a job. // KillJob kills all instances of a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key) r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key)
@ -641,7 +661,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(context.TODO(), key, nil, "") return r.client.KillTasks(context.TODO(), key, nil, "")
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -649,7 +671,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 	return resp, nil
 }

-// Sends a create job message to the scheduler with a specific job configuration.
+// CreateJob sends a create job message to the scheduler with a specific job configuration.
 // Although this API is able to create service jobs, it is better to use CreateService instead
 // as that API uses the update thrift call which has a few extra features available.
 // Use this API to create ad-hoc jobs.
@ -657,19 +679,36 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 	r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())

+	// Response is checked by the thrift retry code
 	resp, retryErr := r.thriftCallWithRetries(
 		false,
 		func() (*aurora.Response, error) {
 			return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
-		})
+		},
+		// On a client timeout, attempt to verify that payload made to the Scheduler by
+		// trying to get the config summary for the job key
+		func() (*aurora.Response, bool) {
+			exists, err := r.jobExists(*auroraJob.JobKey())
+			if err != nil {
+				r.logger.Print("verification failed ", err)
+			}
+
+			if exists {
+				return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
+			}
+
+			return nil, false
+		},
+	)

 	if retryErr != nil {
 		return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
 	}

 	return resp, nil
 }
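Because CreateJob now passes a verification callback, a client-side timeout no longer automatically means the job was lost: if the config summary shows up on the scheduler, the call is reported as successful. A caller can therefore treat only non-timeout errors as definitive. A hedged sketch, reusing a job value built as in the earlier example:

package examples

import (
	"log"

	"github.com/paypal/gorealis"
)

// createWithTimeoutAwareness sketches how a caller might interpret errors
// from the verified CreateJob path.
func createWithTimeoutAwareness(r realis.Realis, job realis.Job) error {
	_, err := r.CreateJob(job)
	if err == nil {
		return nil
	}
	if realis.IsTimeout(err) {
		// The client timed out and the follow-up verification could not
		// confirm the job; the true state on the scheduler is unknown.
		log.Println("create timed out; job state unknown, retry or inspect manually")
	}
	return err
}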
-// This API uses an update thrift call to create the services giving a few more robust features.
+// CreateService uses the scheduler's updating mechanism to create a job.
 func (r *realisClient) CreateService(
 	auroraJob Job,
 	settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
@ -680,17 +719,12 @@ func (r *realisClient) CreateService(
 	resp, err := r.StartJobUpdate(update, "")
 	if err != nil {
 		if IsTimeout(err) {
-			return resp, nil, err
+			return nil, nil, err
 		}
 		return resp, nil, errors.Wrap(err, "unable to create service")
 	}

-	if resp.GetResult_() != nil {
-		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
-	}
-
-	return nil, nil, errors.New("results object is nil")
+	return resp, resp.GetResult_().StartJobUpdateResult_, nil
 }
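CreateService now returns nil responses on a timeout and hands back the StartJobUpdateResult_ directly on success (StartJobUpdate itself now guarantees a non-nil result). A short sketch of consuming the result; the generated defaults for the update settings are used purely for illustration:

package examples

import (
	"fmt"

	"github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// createService sketches reading the update key that CreateService returns.
func createService(r realis.Realis, job realis.Job) (*aurora.JobUpdateKey, error) {
	settings := aurora.NewJobUpdateSettings()

	_, result, err := r.CreateService(job, settings)
	if err != nil {
		return nil, fmt.Errorf("unable to create service: %w", err)
	}
	if result == nil {
		return nil, fmt.Errorf("scheduler returned no StartJobUpdateResult")
	}
	return result.GetKey(), nil
}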
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
@ -700,7 +734,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
@ -716,7 +752,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(context.TODO(), key) return r.client.DescheduleCronJob(context.TODO(), key)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
@ -734,7 +772,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.StartCronJob(context.TODO(), key) return r.client.StartCronJob(context.TODO(), key)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
@ -743,7 +783,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
} }
// Restarts specific instances specified // RestartInstances restarts the specified instances of a Job.
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
@ -751,7 +791,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instances) return r.client.RestartShards(context.TODO(), key, instances)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -759,12 +801,12 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
return resp, nil return resp, nil
} }
// Restarts all active tasks under a job configuration. // RestartJob restarts all active instances of a Job.
func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES) instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err1 != nil { if err1 != nil {
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") return nil, errors.Wrap(err1, "could not retrieve relevant task instance IDs")
} }
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
@ -774,7 +816,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instanceIds) return r.client.RestartShards(context.TODO(), key, instanceIds)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -786,7 +830,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.New("No tasks in the Active state") return nil, errors.New("No tasks in the Active state")
} }
-// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
+// StartJobUpdate updates all instances under a job configuration.
 func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
 	r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
@ -795,20 +839,56 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 		true,
 		func() (*aurora.Response, error) {
 			return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
-		})
+		},
+		func() (*aurora.Response, bool) {
+			summariesResp, err := r.readonlyClient.GetJobUpdateSummaries(
+				context.TODO(),
+				&aurora.JobUpdateQuery{
+					JobKey:         updateJob.JobKey(),
+					UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
+					Limit:          1,
+				})
+			if err != nil {
+				r.logger.Print("verification failed ", err)
+				return nil, false
+			}
+
+			summaries := response.JobUpdateSummaries(summariesResp)
+			if len(summaries) == 0 {
+				return nil, false
+			}
+
+			return &aurora.Response{
+				ResponseCode: aurora.ResponseCode_OK,
+				Result_: &aurora.Result_{
+					StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
+						UpdateSummary: summaries[0],
+						Key:           summaries[0].Key,
+					},
+				},
+			}, true
+		},
+	)

 	if retryErr != nil {
 		// A timeout took place when attempting this call, attempt to recover
 		if IsTimeout(retryErr) {
-			return resp, retryErr
+			return nil, retryErr
 		}
 		return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
 	}

+	if resp.GetResult_() == nil {
+		return resp, errors.New("no result in response")
+	}
+
 	return resp, nil
 }
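StartJobUpdate's verification callback re-queries active updates for the job key and synthesizes an OK response when one is found, so a timeout no longer leaves the caller guessing. Callers that want to double-check can run the same kind of query themselves; a hedged sketch, assuming GetJobUpdateSummaries is exposed on the Realis interface as used elsewhere in this changeset:

package examples

import (
	"github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
	"github.com/paypal/gorealis/response"
)

// activeUpdateExists sketches the same re-check the client performs on a
// timed-out StartJobUpdate: query for active updates, limited to one result.
func activeUpdateExists(r realis.Realis, key *aurora.JobKey) (bool, error) {
	resp, err := r.GetJobUpdateSummaries(&aurora.JobUpdateQuery{
		JobKey:         key,
		UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
		Limit:          1,
	})
	if err != nil {
		return false, err
	}
	return len(response.JobUpdateSummaries(resp)) > 0, nil
}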
-// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
+// AbortJobUpdate terminates a job update in the scheduler.
+// It requires the updateId which can be obtained on the Aurora web UI.
// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. // This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state.
// However, if the job update does not transition to the ABORT state an error will be returned. // However, if the job update does not transition to the ABORT state an error will be returned.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
@ -819,7 +899,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
@ -836,7 +918,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
return resp, err return resp, err
} }
// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. // PauseJobUpdate pauses the progress of an ongoing update.
// The UpdateID value needed for this function is returned from StartJobUpdate or can be obtained from the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -845,7 +928,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(context.TODO(), updateKey, message) return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@ -854,7 +939,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
return resp, nil return resp, nil
} }
// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. // ResumeJobUpdate resumes a previously Paused Job update.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -863,7 +948,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@ -872,7 +959,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
return resp, nil return resp, nil
} }
// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. // PulseJobUpdate sends a pulse to an ongoing Job update.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
@ -881,7 +968,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(context.TODO(), updateKey) return r.client.PulseJobUpdate(context.TODO(), updateKey)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
@ -890,8 +979,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
return resp, nil return resp, nil
} }
// Scale up the number of instances under a job configuration using the configuration for specific // AddInstances scales up the number of instances for a Job.
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
@ -900,7 +988,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.AddInstances(context.TODO(), &instKey, count) return r.client.AddInstances(context.TODO(), &instKey, count)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@ -909,15 +999,15 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
} }
// Scale down the number of instances under a job configuration using the configuration of a specific instance // RemoveInstances scales down the number of instances for a Job.
func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) { func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES) instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs") return nil, errors.Wrap(err, "could not retrieve relevant instance IDs")
} }
if len(instanceIds) < int(count) { if len(instanceIds) < int(count) {
return nil, errors.Errorf("Insufficient active instances available for killing: "+ return nil, errors.Errorf("insufficient active instances available for killing: "+
" Instances to be killed %d Active instances %d", count, len(instanceIds)) " Instances to be killed %d Active instances %d", count, len(instanceIds))
} }
@ -930,7 +1020,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
return r.KillInstances(key, instanceIds[:count]...) return r.KillInstances(key, instanceIds[:count]...)
} }
// Get information about task including a fully hydrated task configuration object // GetTaskStatus gets information about task including a fully hydrated task configuration object.
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
@ -939,7 +1029,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), query) return r.client.GetTasksStatus(context.TODO(), query)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@ -948,7 +1040,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
return response.ScheduleStatusResult(resp).GetTasks(), nil return response.ScheduleStatusResult(resp).GetTasks(), nil
} }
// Get pending reason // GetPendingReason returns the reason why the an instance of a Job has not been scheduled.
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
@ -957,7 +1049,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetPendingReason(context.TODO(), query) return r.client.GetPendingReason(context.TODO(), query)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@ -972,7 +1066,8 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
return pendingReasons, nil return pendingReasons, nil
} }
// Get information about task including without a task configuration object // GetTasksWithoutConfigs gets information about task including without a task configuration object.
// This is a more lightweight version of GetTaskStatus but contains less information as a result.
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
@ -981,7 +1076,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), query) return r.client.GetTasksWithoutConfigs(context.TODO(), query)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@ -991,7 +1088,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
} }
// Get the task configuration from the aurora scheduler for a job // FetchTaskConfig gets the task configuration from the aurora scheduler for a job.
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
taskQ := &aurora.TaskQuery{ taskQ := &aurora.TaskQuery{
Role: &instKey.JobKey.Role, Role: &instKey.JobKey.Role,
@ -1007,7 +1104,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), taskQ) return r.client.GetTasksStatus(context.TODO(), taskQ)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@ -1016,7 +1115,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
tasks := response.ScheduleStatusResult(resp).GetTasks() tasks := response.ScheduleStatusResult(resp).GetTasks()
if len(tasks) == 0 { if len(tasks) == 0 {
return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", return nil, errors.Errorf("instance %d for jobkey %s/%s/%s doesn't exist",
instKey.InstanceId, instKey.InstanceId,
instKey.JobKey.Environment, instKey.JobKey.Environment,
instKey.JobKey.Role, instKey.JobKey.Role,
@ -1035,10 +1134,12 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Unable to get job update details") return nil, errors.Wrap(retryErr, "unable to get job update details")
} }
return resp, nil return resp, nil
@ -1052,7 +1153,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(context.TODO(), &key, message) return r.client.RollbackJobUpdate(context.TODO(), &key, message)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to roll back job update") return nil, errors.Wrap(retryErr, "unable to roll back job update")

View file

@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(context.TODO(), drainList) return r.adminClient.DrainHosts(context.TODO(), drainList)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -56,6 +58,18 @@ func (r *realisClient) SLADrainHosts(
return nil, errors.New("no hosts provided to drain") return nil, errors.New("no hosts provided to drain")
} }
if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 {
policy = &defaultSlaPolicy
r.logger.Printf("Warning: start draining with default sla policy %v", policy)
}
if timeout < 0 {
r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs",
timeout,
defaultSlaDrainTimeoutSecs)
timeout = defaultSlaDrainTimeoutSecs
}
drainList := aurora.NewHosts() drainList := aurora.NewHosts()
drainList.HostNames = hosts drainList.HostNames = hosts
@ -65,7 +79,9 @@ func (r *realisClient) SLADrainHosts(
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return result, errors.Wrap(retryErr, "Unable to recover connection") return result, errors.Wrap(retryErr, "Unable to recover connection")
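SLADrainHosts now falls back to a default percentage SLA policy (66% over 300 seconds) when no policy is set and to a 900-second timeout when the supplied timeout is negative, so the placeholder values in the sketch below are deliberate:

package examples

import "github.com/paypal/gorealis"

// drainWithDefaults sketches leaning on the new fallbacks: a nil policy and a
// negative timeout are replaced by the client's defaults before the
// SlaDrainHosts call is made.
func drainWithDefaults(r realis.Realis, hosts ...string) error {
	if _, err := r.SLADrainHosts(nil, -1, hosts...); err != nil {
		return err
	}
	return nil
}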
@ -95,7 +111,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(context.TODO(), hostList) return r.adminClient.StartMaintenance(context.TODO(), hostList)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -125,7 +143,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(context.TODO(), hostList) return r.adminClient.EndMaintenance(context.TODO(), hostList)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -157,7 +177,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(context.TODO(), hostList) return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -182,7 +204,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.SetQuota(context.TODO(), role, quota) return r.adminClient.SetQuota(context.TODO(), role, quota)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to set role quota") return resp, errors.Wrap(retryErr, "Unable to set role quota")
@ -198,7 +222,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.GetQuota(context.TODO(), role) return r.adminClient.GetQuota(context.TODO(), role)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to get role quota") return resp, errors.Wrap(retryErr, "Unable to get role quota")
@ -213,7 +239,9 @@ func (r *realisClient) Snapshot() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.Snapshot(context.TODO()) return r.adminClient.Snapshot(context.TODO())
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -229,7 +257,9 @@ func (r *realisClient) PerformBackup() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.PerformBackup(context.TODO()) return r.adminClient.PerformBackup(context.TODO())
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -244,7 +274,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -265,7 +297,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
_, retryErr := r.thriftCallWithRetries(false, _, retryErr := r.thriftCallWithRetries(false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
}) },
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")

View file

@ -306,6 +306,40 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
_, err = r.KillJob(job.JobKey()) _, err = r.KillJob(job.JobKey())
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("Duplicate_constraints", func(t *testing.T) {
job.Name("thermos_duplicate_constraints").
AddValueConstraint("zone", false, "east", "west").
AddValueConstraint("zone", false, "east").
AddValueConstraint("zone", true, "west")
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
t.Run("Overwrite_constraints", func(t *testing.T) {
job.Name("thermos_overwrite_constraints").
AddLimitConstraint("zone", 1).
AddValueConstraint("zone", true, "west", "east").
AddLimitConstraint("zone", 2)
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
} }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@ -505,7 +539,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	timeoutClient, err := realis.NewRealisClient(
 		realis.SchedulerUrl(auroraURL),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(10),
+		realis.TimeoutMS(5),
 	)
 	require.NoError(t, err)
 	defer timeoutClient.Close()
@ -716,6 +750,53 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
5, 5,
10) 10)
assert.NoError(t, err) assert.NoError(t, err)
// slaDrainHosts goes with default policy if no policy is specified
_, err = r.SLADrainHosts(nil, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
_, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
} }
// Test multiple go routines using a single connection // Test multiple go routines using a single connection
@ -1025,8 +1106,10 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
 		assert.Equal(t, i, curStep)

-		_, err = r.ResumeJobUpdate(&key, "auto resuming test")
-		require.NoError(t, err)
+		if i != len(updateGroups)-1 {
+			_, err = r.ResumeJobUpdate(&key, "auto resuming test")
+			require.NoError(t, err)
+		}
 	}
_, err = r.KillJob(job.JobKey()) _, err = r.KillJob(job.JobKey())

View file

@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
 }

 func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
+	if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
+		return nil
+	}
+
 	return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
 }
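With the nil guard in place, response.JobUpdateSummaries returns nil instead of panicking when the scheduler sends back an incomplete payload, so callers only need a length check:

package examples

import (
	"github.com/paypal/gorealis/gen-go/apache/aurora"
	"github.com/paypal/gorealis/response"
)

// firstUpdateSummary returns the first summary in resp, or nil if the
// response carried no usable result.
func firstUpdateSummary(resp *aurora.Response) *aurora.JobUpdateSummary {
	summaries := response.JobUpdateSummaries(resp)
	if len(summaries) == 0 {
		return nil
	}
	return summaries[0]
}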

169
retry.go
View file

@@ -114,10 +114,19 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
type auroraThriftCall func() (resp *aurora.Response, err error)

+// verifyOnTimeout defines the type of function that will be used to verify whether a Thrift call to the Scheduler
+// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
+// through the very same Thrift API which previously encountered a timeout from the client.
+// This means that the functions themselves should be kept to a minimum number of Thrift calls.
+// It should also be noted that this is a best-effort mechanism and
+// is likely to fail for the same reasons that the original call failed.
+type verifyOnTimeout func() (*aurora.Response, bool)
+
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
func (r *realisClient) thriftCallWithRetries(
    returnOnTimeout bool,
-    thriftCall auroraThriftCall) (*aurora.Response, error) {
+    thriftCall auroraThriftCall,
+    verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {

    var resp *aurora.Response
    var clientErr error

@@ -137,7 +146,7 @@ func (r *realisClient) thriftCallWithRetries(
            }

            r.logger.Printf(
-                "A retryable error occurred during thrift call, backing off for %v before retry %v\n",
+                "A retryable error occurred during thrift call, backing off for %v before retry %v",
                adjusted,
                curStep)

@@ -154,45 +163,25 @@ func (r *realisClient) thriftCallWithRetries(
            resp, clientErr = thriftCall()

-            r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
+            r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr)
        }()

-        // Check if our thrift call is returning an error. This is a retryable event as we don't know
-        // if it was caused by network issues.
+        // Check if our thrift call is returning an error.
        if clientErr != nil {
            // Print out the error to the user
-            r.logger.Printf("Client Error: %v\n", clientErr)
+            r.logger.Printf("Client Error: %v", clientErr)
-            // Determine if error is a temporary URL error by going up the stack
-            e, ok := clientErr.(thrift.TTransportException)
-            if ok {
-                r.logger.debugPrint("Encountered a transport exception")
-                e, ok := e.Err().(*url.Error)
-                if ok {
-                    // EOF error occurs when the server closes the read buffer of the client. This is common
-                    // when the server is overloaded and should be retried. All other errors that are permanent
-                    // will not be retried.
-                    if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
-                        return nil, errors.Wrap(clientErr, "permanent connection error")
-                    }
-                    // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
-                    // able to reply. In this case we will return whatever response was received and a TimedOut behaving
-                    // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
-                    if e.Timeout() {
-                        timeouts++
-                        r.logger.debugPrintf(
-                            "Client closed connection (timedout) %d times before server responded, "+
-                                "consider increasing connection timeout",
-                            timeouts)
-                        if returnOnTimeout {
-                            return resp, newTimedoutError(errors.New("client connection closed before server answer"))
-                        }
-                    }
-                }
-            }
+            temporary, timedout := isConnectionError(clientErr)
+            if !temporary && r.RealisConfig().failOnPermanentErrors {
+                return nil, errors.Wrap(clientErr, "permanent connection error")
+            }
+            // There exists a corner case where thrift payload was received by Aurora but
+            // connection timed out before Aurora was able to reply.
+            // Users can take special action on a timeout by using IsTimedout and reacting accordingly
+            // if they have configured the client to return on a timeout.
+            if timedout && returnOnTimeout {
+                return resp, newTimedoutError(errors.New("client connection closed before server answer"))
+            }
// In the future, reestablish connection should be able to check if it is actually possible // In the future, reestablish connection should be able to check if it is actually possible
@@ -202,48 +191,71 @@ func (r *realisClient) thriftCallWithRetries(
            if reestablishErr != nil {
                r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
            }
-        } else {
-            // If there was no client error, but the response is nil, something went wrong.
-            // Ideally, we'll never encounter this but we're placing a safeguard here.
-            if resp == nil {
-                return nil, errors.New("response from aurora is nil")
-            }
-            // Check Response Code from thrift and make a decision to continue retrying or not.
-            switch responseCode := resp.GetResponseCode(); responseCode {
-            // If the thrift call succeeded, stop retrying
-            case aurora.ResponseCode_OK:
-                return resp, nil
-            // If the response code is transient, continue retrying
-            case aurora.ResponseCode_ERROR_TRANSIENT:
-                r.logger.Println("Aurora replied with Transient error code, retrying")
-                continue
-            // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
-            case aurora.ResponseCode_INVALID_REQUEST,
-                aurora.ResponseCode_ERROR,
-                aurora.ResponseCode_AUTH_FAILED,
-                aurora.ResponseCode_JOB_UPDATING_ERROR:
-                r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
-                return resp, errors.New(response.CombineMessage(resp))
-            // The only case that should fall down to here is a WARNING response code.
-            // It is currently not used as a response in the scheduler so it is unknown how to handle it.
-            default:
-                r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
-                return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
-            }
-        }
+            // If users did not opt for a return on timeout in order to react to a timedout error,
+            // attempt to verify that the call made it to the scheduler after the connection was re-established.
+            if timedout {
+                timeouts++
+                r.logger.debugPrintf(
+                    "Client closed connection %d times before server responded, "+
+                        "consider increasing connection timeout",
+                    timeouts)
+                // Allow caller to provide a function which checks if the original call was successful before
+                // it timed out.
+                if verifyOnTimeout != nil {
+                    if verifyResp, ok := verifyOnTimeout(); ok {
+                        r.logger.Print("verified that the call went through successfully after a client timeout")
+                        // Response here might be different than the original as it is no longer constructed
+                        // by the scheduler but mimicked.
+                        // This is OK since the scheduler is very unlikely to change responses at this point in its
+                        // development cycle but we must be careful to not return an incorrectly constructed response.
+                        return verifyResp, nil
+                    }
+                }
+            }
+            // Retry the thrift payload
+            continue
+        }
+        // If there was no client error, but the response is nil, something went wrong.
+        // Ideally, we'll never encounter this but we're placing a safeguard here.
+        if resp == nil {
+            return nil, errors.New("response from aurora is nil")
+        }
+        // Check Response Code from thrift and make a decision to continue retrying or not.
+        switch responseCode := resp.GetResponseCode(); responseCode {
+        // If the thrift call succeeded, stop retrying
+        case aurora.ResponseCode_OK:
+            return resp, nil
+        // If the response code is transient, continue retrying
+        case aurora.ResponseCode_ERROR_TRANSIENT:
+            r.logger.Println("Aurora replied with Transient error code, retrying")
+            continue
+        // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
+        case aurora.ResponseCode_INVALID_REQUEST,
+            aurora.ResponseCode_ERROR,
+            aurora.ResponseCode_AUTH_FAILED,
+            aurora.ResponseCode_JOB_UPDATING_ERROR:
+            r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
+            return resp, errors.New(response.CombineMessage(resp))
+        // The only case that should fall down to here is a WARNING response code.
+        // It is currently not used as a response in the scheduler so it is unknown how to handle it.
+        default:
+            r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
+            return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
+        }
    }
-    r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
    if curStep > 1 {
-        r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
+        r.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
    }

    // Provide more information to the user wherever possible.

@@ -253,3 +265,30 @@ func (r *realisClient) thriftCallWithRetries(
    return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
+// isConnectionError processes the error received by the client.
+// The return values indicate whether this was determined to be a temporary error
+// and whether it was determined to be a timeout error.
+func isConnectionError(err error) (bool, bool) {
+    // Determine if error is a temporary URL error by going up the stack
+    transportException, ok := err.(thrift.TTransportException)
+    if !ok {
+        return false, false
+    }
+
+    urlError, ok := transportException.Err().(*url.Error)
+    if !ok {
+        return false, false
+    }
+
+    // EOF error occurs when the server closes the read buffer of the client. This is common
+    // when the server is overloaded and we consider it temporary.
+    // All other errors that are not temporary, as per the member function Temporary(),
+    // are considered permanent.
+    if urlError.Err != io.EOF && !urlError.Temporary() {
+        return false, false
+    }
+
+    return true, urlError.Timeout()
+}
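A verification function for the new verifyOnTimeout hook has to answer one question with as few Thrift calls as possible: did the original request reach the scheduler? The sketch below is not the library's internal implementation; it is a hypothetical checker for a job update, built only from the exported v1 client API and the response helpers, and it treats any error as "not verified".

package sketches

import (
    realis "github.com/paypal/gorealis"
    "github.com/paypal/gorealis/gen-go/apache/aurora"
    "github.com/paypal/gorealis/response"
)

// verifyUpdateWentThrough mimics the shape of the unexported verifyOnTimeout hook:
// func() (*aurora.Response, bool). It checks whether the scheduler now knows about
// an update for jobKey, which is one plausible way to confirm that an update-starting
// call landed despite the client timing out. jobKey is a hypothetical input.
func verifyUpdateWentThrough(r realis.Realis, jobKey *aurora.JobKey) (*aurora.Response, bool) {
    resp, err := r.GetJobUpdateSummaries(&aurora.JobUpdateQuery{JobKey: jobKey})
    if err != nil {
        // The verification call talks to the same scheduler over the same transport,
        // so it may fail for exactly the reason the original call did.
        return nil, false
    }

    if len(response.JobUpdateSummaries(resp)) == 0 {
        return nil, false
    }

    // Note: this response is a stand-in; it was not produced by the original call.
    return resp, true
}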

@@ -1,4 +1,4 @@
#!/bin/bash
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
-docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@
+docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.16-buster go test -v github.com/paypal/gorealis $@

util.go
@@ -1,7 +1,11 @@
package realis

import (
+    "crypto/x509"
+    "io/ioutil"
    "net/url"
+    "os"
+    "path/filepath"
    "strings"

    "github.com/paypal/gorealis/gen-go/apache/aurora"
@@ -25,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)

-// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
+// TerminalUpdateStates returns a slice containing all the terminal states an update may be in.
// This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
    return []aurora.JobUpdateStatus{
@@ -65,6 +69,49 @@
    }
}

+// createCertPool will attempt to load certificates into a certificate pool from a given directory.
+// Only files with an extension contained in the extension map are considered.
+// This function ignores any files that cannot be read successfully or cannot be added to the certPool
+// successfully.
+func createCertPool(path string, extensions map[string]struct{}) (*x509.CertPool, error) {
+    _, err := os.Stat(path)
+    if err != nil {
+        return nil, errors.Wrap(err, "unable to load certificates")
+    }
+
+    caFiles, err := ioutil.ReadDir(path)
+    if err != nil {
+        return nil, err
+    }
+
+    certPool := x509.NewCertPool()
+    loadedCerts := 0
+    for _, cert := range caFiles {
+        // Skip directories
+        if cert.IsDir() {
+            continue
+        }
+
+        // Skip any files that do not contain the right extension
+        if _, ok := extensions[filepath.Ext(cert.Name())]; !ok {
+            continue
+        }
+
+        pem, err := ioutil.ReadFile(filepath.Join(path, cert.Name()))
+        if err != nil {
+            continue
+        }
+
+        if certPool.AppendCertsFromPEM(pem) {
+            loadedCerts++
+        }
+    }
+    if loadedCerts == 0 {
+        return nil, errors.New("no certificates were able to be successfully loaded")
+    }
+    return certPool, nil
+}
func validateAuroraURL(location string) (string, error) {
    // If no protocol defined, assume http

@@ -92,7 +139,7 @@ func validateAuroraURL(location string) (string, error) {
        return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme)
    }

-    // This could theoretically be elsewhwere but we'll be strict for the sake of simplicty
+    // This could theoretically be elsewhere but we'll be strict for the sake of simplicity
    if u.Path != apiPath {
        return "", errors.Errorf("expected /api path %v\n", u.Path)
    }
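Since createCertPool is unexported, it would typically be wired into TLS configuration elsewhere in this package. A package-internal sketch of that wiring (the extension set and helper name are illustrative, not part of the actual configuration code):

package realis

import "crypto/tls"

// tlsConfigFromCertDir is a hypothetical helper showing one way the pool could be consumed.
// Only server verification is configured here; client certificates would be handled separately.
func tlsConfigFromCertDir(path string) (*tls.Config, error) {
    // Illustrative extension set; callers decide which file types count as certificates.
    extensions := map[string]struct{}{".crt": {}, ".pem": {}}

    rootCAs, err := createCertPool(path, extensions)
    if err != nil {
        return nil, err
    }

    return &tls.Config{RootCAs: rootCAs}, nil
}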

@@ -100,3 +100,15 @@ func TestCurrentBatchCalculator(t *testing.T) {
        assert.Equal(t, 0, curBatch)
    })
}

+func TestCertPoolCreator(t *testing.T) {
+    extensions := map[string]struct{}{".crt": {}}
+
+    _, err := createCertPool("examples/certs", extensions)
+    assert.NoError(t, err)
+
+    t.Run("badDir", func(t *testing.T) {
+        _, err := createCertPool("idontexist", extensions)
+        assert.Error(t, err)
+    })
+}