Compare commits

...

34 commits

Author SHA1 Message Date
Tan N. Le
db10285368
Update CHANGELOG.md 2022-10-12 21:47:07 -07:00
shivrsrivastava
8454a6ebf3
Adding priority to the task (#140) 2022-10-12 21:46:07 -07:00
Tan N. Le
c318042e96
release 1.24.0 (#139) 2021-11-09 09:00:35 -08:00
Tan N. Le
db9bebb802
enable default sla for slaDrain (#138) 2021-11-01 18:17:49 -07:00
Renán I. Del Valle
fff2c16751
Enabling code analysis 2021-09-01 10:09:11 -07:00
Renán I. Del Valle
c59d01ab51
Changes Travis CI badge to Github Actions badge (#137) 2021-08-06 18:16:06 -07:00
Renán I. Del Valle
62df98a3c8
Bug fix for auto paused update monitor (#136)
Returns success if the update has finished updating successfully.
2021-08-06 16:02:52 -07:00
Renán I. Del Valle
5c39a23eb2
Enable Github Actions for PRs
Run CI on pull requests and when the branch is pushed.
2021-08-06 15:52:46 -07:00
Renán I. Del Valle
dbc396b0db
Disables Travis CI (#135)
Travis CI is no longer needed as we have migrated to Github Actions
2021-08-06 10:58:34 -07:00
Renán I. Del Valle
86eb045808
Adds go.sum and removes dep files (#134) 2021-08-06 10:57:45 -07:00
Renán I. Del Valle
c7e309f421
Actions fix (#133)
* Moving main.yml to the right place.
2021-08-06 10:06:12 -07:00
Renán I. Del Valle
49877b7d41
Adds support for running CI on github actions. (#132) 2021-08-06 10:00:57 -07:00
Renán I. Del Valle
82b40a53f0
Add verification to retry mechanism (#131)
CreateJob, CreateService, and StartJobUpdate now include a rudimentary verification function to check if the call made it to the Aurora Scheduler when the client experiences a timeout.
2021-05-11 13:37:23 -07:00
Renán I. Del Valle
a9d99067ee
Documentation fix (#130)
Fixes documentation so that it is more compliant with godoc format.
2021-04-29 10:48:43 -07:00
Renán Del Valle
e7f9c0cba9 Bumping up version to 1.23.1 2021-02-25 17:59:02 -08:00
Renán Del Valle
fbaf218dfb Preparing release notes for 1.23.0 2021-02-25 17:59:02 -08:00
Renán I. Del Valle
6a1cf25c87
Upgrading Mesos to 1.7.2 and Aurora Scheduler to 0.23.0 (#128) 2021-02-25 17:55:42 -08:00
Renán Del Valle
4aaec87d32 Bumping up version to 1.23.0 2021-02-25 17:34:48 -08:00
Renán Del Valle
895d810b6c Releasing Version 1.22.5 2021-02-25 17:34:48 -08:00
Renán I. Del Valle
511e76a0ac
Upgrading to Thrift 0.14.0 (#126)
Upgrading thrift to 0.14.0 in order to pick up bug fixes, including the fix for trying to write to closed connections.
2021-02-25 16:37:46 -08:00
Renan DelValle
8be66da88a
Bumping up go version to 1.15 and removing v2 tests from Travis CI config file. 2020-09-28 11:13:29 -07:00
Renan DelValle
6d20f347f7
Update latest version gorealis has been tested against. 2020-09-28 10:32:36 -07:00
Renan DelValle
74b12c36b1
Adding version to README and changing badge to point to the right place. 2020-09-28 10:32:13 -07:00
Renan DelValle
269e0208c1
Change travis CI configuration to use main branch. 2020-09-28 10:30:48 -07:00
Suchith Arodi
4acb0d54a9
return response object in case of noop update (#125)
Return response object such that end user can look into what went wrong when the response is nil.

Cases like this occur when there is a no-op update.
2020-07-29 13:30:56 -07:00
Renán I. Del Valle
5f667555dc
Bumping up CI build to go 1.14 (#124)
Bumping up Travis CI build to go 1.14 as well as increasing the timeout for go tests.
2020-05-27 16:04:07 -07:00
Renán I. Del Valle
2b6025e67d
Checking previously ignored error which caused issues. (#123)
Error in monitor was going unchecked which caused some issues when the monitor timed out.
2020-05-27 12:45:53 -07:00
Renán I. Del Valle
5ec22fab98
Restoring location of r.Close() in retry mechanism since the move created a deadlock. (#122)
Moving the r.Close() call in the retry mechanism created a deadlock since r.Close() also uses the client lock to avoid multiple routines closing at the same time.

This commit reverts that change.
2020-05-27 12:36:52 -07:00
Renan DelValle
f196aa9ed7 Fixing some cosmetic issues and a potential race condition. 2020-05-26 20:40:09 -07:00
Renan DelValle
bb5408f5e2 Bumping up Thrift Version to v0.13.2 forked as v0.13.1 contains a bug. 2020-05-26 20:40:09 -07:00
Renán I. Del Valle
ea8e48f3b8
Allow users to define what extensions CA certs will have (#120)
* Allow users to define what extensions CA certs will have. Skip any files that don't have the right extension.
2020-02-26 08:24:41 -08:00
Renán I. Del Valle
3dc3b09a8e
Point to temporary Thrift fork while we wait for 0.14.0 to be released (#118)
* Updating readme to reflect changes made to the Aurora Scheduler project.

* Changing dependency of mod to point to forked version of the Thrift library while 0.14.0 is released.
2020-02-18 14:18:13 -08:00
Renan I. Del Valle
3fa2a20fe4
Thrift Upgrade to v0.13.0 (#117)
* Removing go.sum file as it's no longer required as of go1.13.

* Removing uncessary client command.

* Bumping up thrift version to v0.13.0
2020-02-12 12:31:56 -08:00
Renan I. Del Valle
c6a2a23ddb
Changing how constraints are handled internally (#115)
* Updating Changelog to reflect what's changing in 1.22.1

* Bug fix: Setting the same constraint multiple times is no longer allowed.

* Constraints map has been added to handle constraints being added to Aurora Jobs.

* Lowering timeout to avoid flaky test for bad payload timeout.

* Adding attributes to Mesos agents in order to test limits by constraint.



* Make two instances schedulable per zone in order to experience flaky behavior.
2020-01-15 08:21:12 -08:00
31 changed files with 11184 additions and 7704 deletions

View file

@ -1 +1 @@
0.22.0
0.23.0

25
.github/main.yml vendored Normal file
View file

@ -0,0 +1,25 @@
name: CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containin go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis

57
.github/workflows/codeql-analysis.yml vendored Normal file
View file

@ -0,0 +1,57 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '34 4 * * 3'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
- run: go build examples/client.go
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

30
.github/workflows/main.yml vendored Normal file
View file

@ -0,0 +1,30 @@
name: CI
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containin go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis

View file

@ -1,33 +0,0 @@
sudo: required
dist: xenial
language: go
branches:
only:
- master
- master-v2.0
- future
go:
- "1.10.x"
env:
global:
- GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/")
services:
- docker
before_install:
- go get golang.org/x/tools/cmd/goimports
- test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
install:
- docker-compose up -d
script:
- go test -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis
after_success:
- bash <(curl -s https://codecov.io/bash)

View file

@ -1,4 +1,41 @@
1.22.0 (unreleased)
1.25.1 (unreleased)
1.25.0
* Add priority api
1.24.0
* enable default sla for slaDrain
* Changes Travis CI badge to Github Actions badge
* Bug fix for auto paused update monitor
* Adds support for running CI on github actions
1.23.0
* First release tested against Aurora Scheduler 0.23.0
1.22.5
* Upgrading to thrift 0.14.0
1.22.4
* Updates which result in a no-op now return a response value so that the caller may analyze it to determine what happened
1.22.3
* Contains a monitor timeout fix. Previously an error was being left unchecked which made a specific monitor timining out not be handled properly.
1.22.2
* Bug fix: Change in retry mechanism created a deadlock. This release reverts that particular change.
1.22.1
* Adding safeguards against setting multiple constraints with the same name for a single task.
1.22.0
* CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
by the HTTP client. Instead they now return an error that conforms to the Timedout interface.

64
Gopkg.lock generated
View file

@ -1,64 +0,0 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
digest = "1:89696c38cec777120b8b1bb5e2d363d655cf2e1e7d8c851919aaa0fd576d9b86"
name = "github.com/apache/thrift"
packages = ["lib/go/thrift"]
pruneopts = ""
revision = "384647d290e2e4a55a14b1b7ef1b7e66293a2c33"
version = "v0.12.0"
[[projects]]
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
digest = "1:df48fb76fb2a40edea0c9b3d960bc95e326660d82ff1114e1f88001f7a236b40"
name = "github.com/pkg/errors"
packages = ["."]
pruneopts = ""
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
pruneopts = ""
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
digest = "1:78bea5e26e82826dacc5fd64a1013a6711b7075ec8072819b89e6ad76cb8196d"
name = "github.com/samuel/go-zookeeper"
packages = ["zk"]
pruneopts = ""
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[projects]]
digest = "1:381bcbeb112a51493d9d998bbba207a529c73dbb49b3fd789e48c63fac1f192c"
name = "github.com/stretchr/testify"
packages = [
"assert",
"require",
]
pruneopts = ""
revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053"
version = "v1.3.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/apache/thrift/lib/go/thrift",
"github.com/pkg/errors",
"github.com/samuel/go-zookeeper/zk",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
]
solver-name = "gps-cdcl"
solver-version = 1

View file

@ -1,16 +0,0 @@
[[constraint]]
name = "github.com/apache/thrift"
version = "0.12.0"
[[constraint]]
name = "github.com/pkg/errors"
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[constraint]]
name = "github.com/samuel/go-zookeeper"
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"

View file

@ -1,6 +1,8 @@
# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) [![Build Status](https://travis-ci.org/paypal/gorealis.svg?branch=master)](https://travis-ci.org/paypal/gorealis) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/master/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis)
# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) ![CI Build Status](https://github.com/paypal/gorealis/actions/workflows/main.yml/badge.svg) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis)
Go library for interacting with [Apache Aurora](https://github.com/apache/aurora).
Version 1 of Go library for interacting with [Aurora Scheduler](https://github.com/aurora-scheduler/aurora).
Version 2 of this library can be found [here](https://github.com/aurora-scheduler/gorealis).
### Aurora version compatibility
Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this
@ -14,7 +16,7 @@ library has been tested.
## Projects using gorealis
* [australis](https://github.com/rdelval/australis)
* [australis](https://github.com/aurora-scheduler/australis)
## Contributions
Contributions are always welcome. Please raise an issue to discuss a contribution before it is made.

View file

@ -14,7 +14,7 @@ services:
ipv4_address: 192.168.33.2
master:
image: rdelvalle/mesos-master:1.6.2
image: aurorascheduler/mesos-master:1.7.2
restart: on-failure
ports:
- "5050:5050"
@ -32,12 +32,13 @@ services:
- zk
agent-one:
image: rdelvalle/mesos-agent:1.6.2
image: aurorascheduler/mesos-agent:1.7.2
pid: host
restart: on-failure
ports:
- "5051:5051"
environment:
MESOS_ATTRIBUTES: 'zone:west'
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
@ -56,12 +57,13 @@ services:
- zk
agent-two:
image: rdelvalle/mesos-agent:1.6.2
image: aurorascheduler/mesos-agent:1.7.2
pid: host
restart: on-failure
ports:
- "5061:5061"
environment:
MESOS_ATTRIBUTES: 'zone:east'
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_HOSTNAME: localhost
@ -80,7 +82,7 @@ services:
- zk
aurora-one:
image: rdelvalle/aurora:0.22.0
image: aurorascheduler/scheduler:0.23.0
pid: host
ports:
- "8081:8081"

View file

@ -111,7 +111,7 @@ func main() {
if err != nil {
log.Fatalln(err)
}
monitor = &realis.Monitor{r}
monitor = &realis.Monitor{Client: r}
defer r.Close()
switch executor {
@ -451,13 +451,6 @@ func main() {
}
fmt.Println(resp.String())
case "variableBatchStep":
step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
if err != nil {
log.Fatal(err)
}
fmt.Println(step)
case "taskConfig":
fmt.Println("Getting job info")
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)

View file

@ -1,5 +1,4 @@
// Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
package aurora

View file

@ -1,13 +1,12 @@
// Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
package aurora
import (
import(
"bytes"
"context"
"reflect"
"fmt"
"time"
"github.com/apache/thrift/lib/go/thrift"
)
@ -15,7 +14,7 @@ import (
var _ = thrift.ZERO
var _ = fmt.Printf
var _ = context.Background
var _ = reflect.DeepEqual
var _ = time.Now
var _ = bytes.Equal
const AURORA_EXECUTOR_NAME = "AuroraExecutor"

File diff suppressed because it is too large Load diff

View file

@ -1,22 +1,22 @@
// Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
package main
import (
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
)
var _ = aurora.GoUnusedProtection__
func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -175,19 +175,19 @@ func main() {
fmt.Fprintln(os.Stderr, "CreateJob requires 1 args")
flag.Usage()
}
arg163 := flag.Arg(1)
mbTrans164 := thrift.NewTMemoryBufferLen(len(arg163))
defer mbTrans164.Close()
_, err165 := mbTrans164.WriteString(arg163)
if err165 != nil {
arg213 := flag.Arg(1)
mbTrans214 := thrift.NewTMemoryBufferLen(len(arg213))
defer mbTrans214.Close()
_, err215 := mbTrans214.WriteString(arg213)
if err215 != nil {
Usage()
return
}
factory166 := thrift.NewTJSONProtocolFactory()
jsProt167 := factory166.GetProtocol(mbTrans164)
factory216 := thrift.NewTJSONProtocolFactory()
jsProt217 := factory216.GetProtocol(mbTrans214)
argvalue0 := aurora.NewJobConfiguration()
err168 := argvalue0.Read(jsProt167)
if err168 != nil {
err218 := argvalue0.Read(context.Background(), jsProt217)
if err218 != nil {
Usage()
return
}
@ -200,19 +200,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args")
flag.Usage()
}
arg169 := flag.Arg(1)
mbTrans170 := thrift.NewTMemoryBufferLen(len(arg169))
defer mbTrans170.Close()
_, err171 := mbTrans170.WriteString(arg169)
if err171 != nil {
arg219 := flag.Arg(1)
mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219))
defer mbTrans220.Close()
_, err221 := mbTrans220.WriteString(arg219)
if err221 != nil {
Usage()
return
}
factory172 := thrift.NewTJSONProtocolFactory()
jsProt173 := factory172.GetProtocol(mbTrans170)
factory222 := thrift.NewTJSONProtocolFactory()
jsProt223 := factory222.GetProtocol(mbTrans220)
argvalue0 := aurora.NewJobConfiguration()
err174 := argvalue0.Read(jsProt173)
if err174 != nil {
err224 := argvalue0.Read(context.Background(), jsProt223)
if err224 != nil {
Usage()
return
}
@ -225,19 +225,19 @@ func main() {
fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args")
flag.Usage()
}
arg175 := flag.Arg(1)
mbTrans176 := thrift.NewTMemoryBufferLen(len(arg175))
defer mbTrans176.Close()
_, err177 := mbTrans176.WriteString(arg175)
if err177 != nil {
arg225 := flag.Arg(1)
mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225))
defer mbTrans226.Close()
_, err227 := mbTrans226.WriteString(arg225)
if err227 != nil {
Usage()
return
}
factory178 := thrift.NewTJSONProtocolFactory()
jsProt179 := factory178.GetProtocol(mbTrans176)
factory228 := thrift.NewTJSONProtocolFactory()
jsProt229 := factory228.GetProtocol(mbTrans226)
argvalue0 := aurora.NewJobKey()
err180 := argvalue0.Read(jsProt179)
if err180 != nil {
err230 := argvalue0.Read(context.Background(), jsProt229)
if err230 != nil {
Usage()
return
}
@ -250,19 +250,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args")
flag.Usage()
}
arg181 := flag.Arg(1)
mbTrans182 := thrift.NewTMemoryBufferLen(len(arg181))
defer mbTrans182.Close()
_, err183 := mbTrans182.WriteString(arg181)
if err183 != nil {
arg231 := flag.Arg(1)
mbTrans232 := thrift.NewTMemoryBufferLen(len(arg231))
defer mbTrans232.Close()
_, err233 := mbTrans232.WriteString(arg231)
if err233 != nil {
Usage()
return
}
factory184 := thrift.NewTJSONProtocolFactory()
jsProt185 := factory184.GetProtocol(mbTrans182)
factory234 := thrift.NewTJSONProtocolFactory()
jsProt235 := factory234.GetProtocol(mbTrans232)
argvalue0 := aurora.NewJobKey()
err186 := argvalue0.Read(jsProt185)
if err186 != nil {
err236 := argvalue0.Read(context.Background(), jsProt235)
if err236 != nil {
Usage()
return
}
@ -275,36 +275,36 @@ func main() {
fmt.Fprintln(os.Stderr, "RestartShards requires 2 args")
flag.Usage()
}
arg187 := flag.Arg(1)
mbTrans188 := thrift.NewTMemoryBufferLen(len(arg187))
defer mbTrans188.Close()
_, err189 := mbTrans188.WriteString(arg187)
if err189 != nil {
arg237 := flag.Arg(1)
mbTrans238 := thrift.NewTMemoryBufferLen(len(arg237))
defer mbTrans238.Close()
_, err239 := mbTrans238.WriteString(arg237)
if err239 != nil {
Usage()
return
}
factory190 := thrift.NewTJSONProtocolFactory()
jsProt191 := factory190.GetProtocol(mbTrans188)
factory240 := thrift.NewTJSONProtocolFactory()
jsProt241 := factory240.GetProtocol(mbTrans238)
argvalue0 := aurora.NewJobKey()
err192 := argvalue0.Read(jsProt191)
if err192 != nil {
err242 := argvalue0.Read(context.Background(), jsProt241)
if err242 != nil {
Usage()
return
}
value0 := argvalue0
arg193 := flag.Arg(2)
mbTrans194 := thrift.NewTMemoryBufferLen(len(arg193))
defer mbTrans194.Close()
_, err195 := mbTrans194.WriteString(arg193)
if err195 != nil {
arg243 := flag.Arg(2)
mbTrans244 := thrift.NewTMemoryBufferLen(len(arg243))
defer mbTrans244.Close()
_, err245 := mbTrans244.WriteString(arg243)
if err245 != nil {
Usage()
return
}
factory196 := thrift.NewTJSONProtocolFactory()
jsProt197 := factory196.GetProtocol(mbTrans194)
factory246 := thrift.NewTJSONProtocolFactory()
jsProt247 := factory246.GetProtocol(mbTrans244)
containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs()
err198 := containerStruct1.ReadField2(jsProt197)
if err198 != nil {
err248 := containerStruct1.ReadField2(context.Background(), jsProt247)
if err248 != nil {
Usage()
return
}
@ -318,36 +318,36 @@ func main() {
fmt.Fprintln(os.Stderr, "KillTasks requires 3 args")
flag.Usage()
}
arg199 := flag.Arg(1)
mbTrans200 := thrift.NewTMemoryBufferLen(len(arg199))
defer mbTrans200.Close()
_, err201 := mbTrans200.WriteString(arg199)
if err201 != nil {
arg249 := flag.Arg(1)
mbTrans250 := thrift.NewTMemoryBufferLen(len(arg249))
defer mbTrans250.Close()
_, err251 := mbTrans250.WriteString(arg249)
if err251 != nil {
Usage()
return
}
factory202 := thrift.NewTJSONProtocolFactory()
jsProt203 := factory202.GetProtocol(mbTrans200)
factory252 := thrift.NewTJSONProtocolFactory()
jsProt253 := factory252.GetProtocol(mbTrans250)
argvalue0 := aurora.NewJobKey()
err204 := argvalue0.Read(jsProt203)
if err204 != nil {
err254 := argvalue0.Read(context.Background(), jsProt253)
if err254 != nil {
Usage()
return
}
value0 := argvalue0
arg205 := flag.Arg(2)
mbTrans206 := thrift.NewTMemoryBufferLen(len(arg205))
defer mbTrans206.Close()
_, err207 := mbTrans206.WriteString(arg205)
if err207 != nil {
arg255 := flag.Arg(2)
mbTrans256 := thrift.NewTMemoryBufferLen(len(arg255))
defer mbTrans256.Close()
_, err257 := mbTrans256.WriteString(arg255)
if err257 != nil {
Usage()
return
}
factory208 := thrift.NewTJSONProtocolFactory()
jsProt209 := factory208.GetProtocol(mbTrans206)
factory258 := thrift.NewTJSONProtocolFactory()
jsProt259 := factory258.GetProtocol(mbTrans256)
containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs()
err210 := containerStruct1.ReadField2(jsProt209)
if err210 != nil {
err260 := containerStruct1.ReadField2(context.Background(), jsProt259)
if err260 != nil {
Usage()
return
}
@ -363,25 +363,25 @@ func main() {
fmt.Fprintln(os.Stderr, "AddInstances requires 2 args")
flag.Usage()
}
arg212 := flag.Arg(1)
mbTrans213 := thrift.NewTMemoryBufferLen(len(arg212))
defer mbTrans213.Close()
_, err214 := mbTrans213.WriteString(arg212)
if err214 != nil {
arg262 := flag.Arg(1)
mbTrans263 := thrift.NewTMemoryBufferLen(len(arg262))
defer mbTrans263.Close()
_, err264 := mbTrans263.WriteString(arg262)
if err264 != nil {
Usage()
return
}
factory215 := thrift.NewTJSONProtocolFactory()
jsProt216 := factory215.GetProtocol(mbTrans213)
factory265 := thrift.NewTJSONProtocolFactory()
jsProt266 := factory265.GetProtocol(mbTrans263)
argvalue0 := aurora.NewInstanceKey()
err217 := argvalue0.Read(jsProt216)
if err217 != nil {
err267 := argvalue0.Read(context.Background(), jsProt266)
if err267 != nil {
Usage()
return
}
value0 := argvalue0
tmp1, err218 := (strconv.Atoi(flag.Arg(2)))
if err218 != nil {
tmp1, err268 := (strconv.Atoi(flag.Arg(2)))
if err268 != nil {
Usage()
return
}
@ -395,19 +395,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args")
flag.Usage()
}
arg219 := flag.Arg(1)
mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219))
defer mbTrans220.Close()
_, err221 := mbTrans220.WriteString(arg219)
if err221 != nil {
arg269 := flag.Arg(1)
mbTrans270 := thrift.NewTMemoryBufferLen(len(arg269))
defer mbTrans270.Close()
_, err271 := mbTrans270.WriteString(arg269)
if err271 != nil {
Usage()
return
}
factory222 := thrift.NewTJSONProtocolFactory()
jsProt223 := factory222.GetProtocol(mbTrans220)
factory272 := thrift.NewTJSONProtocolFactory()
jsProt273 := factory272.GetProtocol(mbTrans270)
argvalue0 := aurora.NewJobConfiguration()
err224 := argvalue0.Read(jsProt223)
if err224 != nil {
err274 := argvalue0.Read(context.Background(), jsProt273)
if err274 != nil {
Usage()
return
}
@ -420,19 +420,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args")
flag.Usage()
}
arg225 := flag.Arg(1)
mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225))
defer mbTrans226.Close()
_, err227 := mbTrans226.WriteString(arg225)
if err227 != nil {
arg275 := flag.Arg(1)
mbTrans276 := thrift.NewTMemoryBufferLen(len(arg275))
defer mbTrans276.Close()
_, err277 := mbTrans276.WriteString(arg275)
if err277 != nil {
Usage()
return
}
factory228 := thrift.NewTJSONProtocolFactory()
jsProt229 := factory228.GetProtocol(mbTrans226)
factory278 := thrift.NewTJSONProtocolFactory()
jsProt279 := factory278.GetProtocol(mbTrans276)
argvalue0 := aurora.NewJobUpdateRequest()
err230 := argvalue0.Read(jsProt229)
if err230 != nil {
err280 := argvalue0.Read(context.Background(), jsProt279)
if err280 != nil {
Usage()
return
}
@ -447,19 +447,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args")
flag.Usage()
}
arg232 := flag.Arg(1)
mbTrans233 := thrift.NewTMemoryBufferLen(len(arg232))
defer mbTrans233.Close()
_, err234 := mbTrans233.WriteString(arg232)
if err234 != nil {
arg282 := flag.Arg(1)
mbTrans283 := thrift.NewTMemoryBufferLen(len(arg282))
defer mbTrans283.Close()
_, err284 := mbTrans283.WriteString(arg282)
if err284 != nil {
Usage()
return
}
factory235 := thrift.NewTJSONProtocolFactory()
jsProt236 := factory235.GetProtocol(mbTrans233)
factory285 := thrift.NewTJSONProtocolFactory()
jsProt286 := factory285.GetProtocol(mbTrans283)
argvalue0 := aurora.NewJobUpdateKey()
err237 := argvalue0.Read(jsProt236)
if err237 != nil {
err287 := argvalue0.Read(context.Background(), jsProt286)
if err287 != nil {
Usage()
return
}
@ -474,19 +474,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args")
flag.Usage()
}
arg239 := flag.Arg(1)
mbTrans240 := thrift.NewTMemoryBufferLen(len(arg239))
defer mbTrans240.Close()
_, err241 := mbTrans240.WriteString(arg239)
if err241 != nil {
arg289 := flag.Arg(1)
mbTrans290 := thrift.NewTMemoryBufferLen(len(arg289))
defer mbTrans290.Close()
_, err291 := mbTrans290.WriteString(arg289)
if err291 != nil {
Usage()
return
}
factory242 := thrift.NewTJSONProtocolFactory()
jsProt243 := factory242.GetProtocol(mbTrans240)
factory292 := thrift.NewTJSONProtocolFactory()
jsProt293 := factory292.GetProtocol(mbTrans290)
argvalue0 := aurora.NewJobUpdateKey()
err244 := argvalue0.Read(jsProt243)
if err244 != nil {
err294 := argvalue0.Read(context.Background(), jsProt293)
if err294 != nil {
Usage()
return
}
@ -501,19 +501,19 @@ func main() {
fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args")
flag.Usage()
}
arg246 := flag.Arg(1)
mbTrans247 := thrift.NewTMemoryBufferLen(len(arg246))
defer mbTrans247.Close()
_, err248 := mbTrans247.WriteString(arg246)
if err248 != nil {
arg296 := flag.Arg(1)
mbTrans297 := thrift.NewTMemoryBufferLen(len(arg296))
defer mbTrans297.Close()
_, err298 := mbTrans297.WriteString(arg296)
if err298 != nil {
Usage()
return
}
factory249 := thrift.NewTJSONProtocolFactory()
jsProt250 := factory249.GetProtocol(mbTrans247)
factory299 := thrift.NewTJSONProtocolFactory()
jsProt300 := factory299.GetProtocol(mbTrans297)
argvalue0 := aurora.NewJobUpdateKey()
err251 := argvalue0.Read(jsProt250)
if err251 != nil {
err301 := argvalue0.Read(context.Background(), jsProt300)
if err301 != nil {
Usage()
return
}
@ -528,19 +528,19 @@ func main() {
fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args")
flag.Usage()
}
arg253 := flag.Arg(1)
mbTrans254 := thrift.NewTMemoryBufferLen(len(arg253))
defer mbTrans254.Close()
_, err255 := mbTrans254.WriteString(arg253)
if err255 != nil {
arg303 := flag.Arg(1)
mbTrans304 := thrift.NewTMemoryBufferLen(len(arg303))
defer mbTrans304.Close()
_, err305 := mbTrans304.WriteString(arg303)
if err305 != nil {
Usage()
return
}
factory256 := thrift.NewTJSONProtocolFactory()
jsProt257 := factory256.GetProtocol(mbTrans254)
factory306 := thrift.NewTJSONProtocolFactory()
jsProt307 := factory306.GetProtocol(mbTrans304)
argvalue0 := aurora.NewJobUpdateKey()
err258 := argvalue0.Read(jsProt257)
if err258 != nil {
err308 := argvalue0.Read(context.Background(), jsProt307)
if err308 != nil {
Usage()
return
}
@ -555,19 +555,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args")
flag.Usage()
}
arg260 := flag.Arg(1)
mbTrans261 := thrift.NewTMemoryBufferLen(len(arg260))
defer mbTrans261.Close()
_, err262 := mbTrans261.WriteString(arg260)
if err262 != nil {
arg310 := flag.Arg(1)
mbTrans311 := thrift.NewTMemoryBufferLen(len(arg310))
defer mbTrans311.Close()
_, err312 := mbTrans311.WriteString(arg310)
if err312 != nil {
Usage()
return
}
factory263 := thrift.NewTJSONProtocolFactory()
jsProt264 := factory263.GetProtocol(mbTrans261)
factory313 := thrift.NewTJSONProtocolFactory()
jsProt314 := factory313.GetProtocol(mbTrans311)
argvalue0 := aurora.NewJobUpdateKey()
err265 := argvalue0.Read(jsProt264)
if err265 != nil {
err315 := argvalue0.Read(context.Background(), jsProt314)
if err315 != nil {
Usage()
return
}
@ -598,19 +598,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage()
}
arg267 := flag.Arg(1)
mbTrans268 := thrift.NewTMemoryBufferLen(len(arg267))
defer mbTrans268.Close()
_, err269 := mbTrans268.WriteString(arg267)
if err269 != nil {
arg317 := flag.Arg(1)
mbTrans318 := thrift.NewTMemoryBufferLen(len(arg317))
defer mbTrans318.Close()
_, err319 := mbTrans318.WriteString(arg317)
if err319 != nil {
Usage()
return
}
factory270 := thrift.NewTJSONProtocolFactory()
jsProt271 := factory270.GetProtocol(mbTrans268)
factory320 := thrift.NewTJSONProtocolFactory()
jsProt321 := factory320.GetProtocol(mbTrans318)
argvalue0 := aurora.NewTaskQuery()
err272 := argvalue0.Read(jsProt271)
if err272 != nil {
err322 := argvalue0.Read(context.Background(), jsProt321)
if err322 != nil {
Usage()
return
}
@ -623,19 +623,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage()
}
arg273 := flag.Arg(1)
mbTrans274 := thrift.NewTMemoryBufferLen(len(arg273))
defer mbTrans274.Close()
_, err275 := mbTrans274.WriteString(arg273)
if err275 != nil {
arg323 := flag.Arg(1)
mbTrans324 := thrift.NewTMemoryBufferLen(len(arg323))
defer mbTrans324.Close()
_, err325 := mbTrans324.WriteString(arg323)
if err325 != nil {
Usage()
return
}
factory276 := thrift.NewTJSONProtocolFactory()
jsProt277 := factory276.GetProtocol(mbTrans274)
factory326 := thrift.NewTJSONProtocolFactory()
jsProt327 := factory326.GetProtocol(mbTrans324)
argvalue0 := aurora.NewTaskQuery()
err278 := argvalue0.Read(jsProt277)
if err278 != nil {
err328 := argvalue0.Read(context.Background(), jsProt327)
if err328 != nil {
Usage()
return
}
@ -648,19 +648,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage()
}
arg279 := flag.Arg(1)
mbTrans280 := thrift.NewTMemoryBufferLen(len(arg279))
defer mbTrans280.Close()
_, err281 := mbTrans280.WriteString(arg279)
if err281 != nil {
arg329 := flag.Arg(1)
mbTrans330 := thrift.NewTMemoryBufferLen(len(arg329))
defer mbTrans330.Close()
_, err331 := mbTrans330.WriteString(arg329)
if err331 != nil {
Usage()
return
}
factory282 := thrift.NewTJSONProtocolFactory()
jsProt283 := factory282.GetProtocol(mbTrans280)
factory332 := thrift.NewTJSONProtocolFactory()
jsProt333 := factory332.GetProtocol(mbTrans330)
argvalue0 := aurora.NewTaskQuery()
err284 := argvalue0.Read(jsProt283)
if err284 != nil {
err334 := argvalue0.Read(context.Background(), jsProt333)
if err334 != nil {
Usage()
return
}
@ -673,19 +673,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage()
}
arg285 := flag.Arg(1)
mbTrans286 := thrift.NewTMemoryBufferLen(len(arg285))
defer mbTrans286.Close()
_, err287 := mbTrans286.WriteString(arg285)
if err287 != nil {
arg335 := flag.Arg(1)
mbTrans336 := thrift.NewTMemoryBufferLen(len(arg335))
defer mbTrans336.Close()
_, err337 := mbTrans336.WriteString(arg335)
if err337 != nil {
Usage()
return
}
factory288 := thrift.NewTJSONProtocolFactory()
jsProt289 := factory288.GetProtocol(mbTrans286)
factory338 := thrift.NewTJSONProtocolFactory()
jsProt339 := factory338.GetProtocol(mbTrans336)
argvalue0 := aurora.NewJobKey()
err290 := argvalue0.Read(jsProt289)
if err290 != nil {
err340 := argvalue0.Read(context.Background(), jsProt339)
if err340 != nil {
Usage()
return
}
@ -718,19 +718,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage()
}
arg293 := flag.Arg(1)
mbTrans294 := thrift.NewTMemoryBufferLen(len(arg293))
defer mbTrans294.Close()
_, err295 := mbTrans294.WriteString(arg293)
if err295 != nil {
arg343 := flag.Arg(1)
mbTrans344 := thrift.NewTMemoryBufferLen(len(arg343))
defer mbTrans344.Close()
_, err345 := mbTrans344.WriteString(arg343)
if err345 != nil {
Usage()
return
}
factory296 := thrift.NewTJSONProtocolFactory()
jsProt297 := factory296.GetProtocol(mbTrans294)
factory346 := thrift.NewTJSONProtocolFactory()
jsProt347 := factory346.GetProtocol(mbTrans344)
argvalue0 := aurora.NewJobConfiguration()
err298 := argvalue0.Read(jsProt297)
if err298 != nil {
err348 := argvalue0.Read(context.Background(), jsProt347)
if err348 != nil {
Usage()
return
}
@ -743,19 +743,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage()
}
arg299 := flag.Arg(1)
mbTrans300 := thrift.NewTMemoryBufferLen(len(arg299))
defer mbTrans300.Close()
_, err301 := mbTrans300.WriteString(arg299)
if err301 != nil {
arg349 := flag.Arg(1)
mbTrans350 := thrift.NewTMemoryBufferLen(len(arg349))
defer mbTrans350.Close()
_, err351 := mbTrans350.WriteString(arg349)
if err351 != nil {
Usage()
return
}
factory302 := thrift.NewTJSONProtocolFactory()
jsProt303 := factory302.GetProtocol(mbTrans300)
factory352 := thrift.NewTJSONProtocolFactory()
jsProt353 := factory352.GetProtocol(mbTrans350)
argvalue0 := aurora.NewJobUpdateQuery()
err304 := argvalue0.Read(jsProt303)
if err304 != nil {
err354 := argvalue0.Read(context.Background(), jsProt353)
if err354 != nil {
Usage()
return
}
@ -768,19 +768,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage()
}
arg305 := flag.Arg(1)
mbTrans306 := thrift.NewTMemoryBufferLen(len(arg305))
defer mbTrans306.Close()
_, err307 := mbTrans306.WriteString(arg305)
if err307 != nil {
arg355 := flag.Arg(1)
mbTrans356 := thrift.NewTMemoryBufferLen(len(arg355))
defer mbTrans356.Close()
_, err357 := mbTrans356.WriteString(arg355)
if err357 != nil {
Usage()
return
}
factory308 := thrift.NewTJSONProtocolFactory()
jsProt309 := factory308.GetProtocol(mbTrans306)
factory358 := thrift.NewTJSONProtocolFactory()
jsProt359 := factory358.GetProtocol(mbTrans356)
argvalue0 := aurora.NewJobUpdateQuery()
err310 := argvalue0.Read(jsProt309)
if err310 != nil {
err360 := argvalue0.Read(context.Background(), jsProt359)
if err360 != nil {
Usage()
return
}
@ -793,19 +793,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage()
}
arg311 := flag.Arg(1)
mbTrans312 := thrift.NewTMemoryBufferLen(len(arg311))
defer mbTrans312.Close()
_, err313 := mbTrans312.WriteString(arg311)
if err313 != nil {
arg361 := flag.Arg(1)
mbTrans362 := thrift.NewTMemoryBufferLen(len(arg361))
defer mbTrans362.Close()
_, err363 := mbTrans362.WriteString(arg361)
if err363 != nil {
Usage()
return
}
factory314 := thrift.NewTJSONProtocolFactory()
jsProt315 := factory314.GetProtocol(mbTrans312)
factory364 := thrift.NewTJSONProtocolFactory()
jsProt365 := factory364.GetProtocol(mbTrans362)
argvalue0 := aurora.NewJobUpdateRequest()
err316 := argvalue0.Read(jsProt315)
if err316 != nil {
err366 := argvalue0.Read(context.Background(), jsProt365)
if err366 != nil {
Usage()
return
}

View file

@ -1,22 +1,22 @@
// Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT.
package main
import (
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
)
var _ = aurora.GoUnusedProtection__
func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -179,19 +179,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage()
}
arg82 := flag.Arg(1)
mbTrans83 := thrift.NewTMemoryBufferLen(len(arg82))
defer mbTrans83.Close()
_, err84 := mbTrans83.WriteString(arg82)
if err84 != nil {
arg132 := flag.Arg(1)
mbTrans133 := thrift.NewTMemoryBufferLen(len(arg132))
defer mbTrans133.Close()
_, err134 := mbTrans133.WriteString(arg132)
if err134 != nil {
Usage()
return
}
factory85 := thrift.NewTJSONProtocolFactory()
jsProt86 := factory85.GetProtocol(mbTrans83)
factory135 := thrift.NewTJSONProtocolFactory()
jsProt136 := factory135.GetProtocol(mbTrans133)
argvalue0 := aurora.NewTaskQuery()
err87 := argvalue0.Read(jsProt86)
if err87 != nil {
err137 := argvalue0.Read(context.Background(), jsProt136)
if err137 != nil {
Usage()
return
}
@ -204,19 +204,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage()
}
arg88 := flag.Arg(1)
mbTrans89 := thrift.NewTMemoryBufferLen(len(arg88))
defer mbTrans89.Close()
_, err90 := mbTrans89.WriteString(arg88)
if err90 != nil {
arg138 := flag.Arg(1)
mbTrans139 := thrift.NewTMemoryBufferLen(len(arg138))
defer mbTrans139.Close()
_, err140 := mbTrans139.WriteString(arg138)
if err140 != nil {
Usage()
return
}
factory91 := thrift.NewTJSONProtocolFactory()
jsProt92 := factory91.GetProtocol(mbTrans89)
factory141 := thrift.NewTJSONProtocolFactory()
jsProt142 := factory141.GetProtocol(mbTrans139)
argvalue0 := aurora.NewTaskQuery()
err93 := argvalue0.Read(jsProt92)
if err93 != nil {
err143 := argvalue0.Read(context.Background(), jsProt142)
if err143 != nil {
Usage()
return
}
@ -229,19 +229,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage()
}
arg94 := flag.Arg(1)
mbTrans95 := thrift.NewTMemoryBufferLen(len(arg94))
defer mbTrans95.Close()
_, err96 := mbTrans95.WriteString(arg94)
if err96 != nil {
arg144 := flag.Arg(1)
mbTrans145 := thrift.NewTMemoryBufferLen(len(arg144))
defer mbTrans145.Close()
_, err146 := mbTrans145.WriteString(arg144)
if err146 != nil {
Usage()
return
}
factory97 := thrift.NewTJSONProtocolFactory()
jsProt98 := factory97.GetProtocol(mbTrans95)
factory147 := thrift.NewTJSONProtocolFactory()
jsProt148 := factory147.GetProtocol(mbTrans145)
argvalue0 := aurora.NewTaskQuery()
err99 := argvalue0.Read(jsProt98)
if err99 != nil {
err149 := argvalue0.Read(context.Background(), jsProt148)
if err149 != nil {
Usage()
return
}
@ -254,19 +254,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage()
}
arg100 := flag.Arg(1)
mbTrans101 := thrift.NewTMemoryBufferLen(len(arg100))
defer mbTrans101.Close()
_, err102 := mbTrans101.WriteString(arg100)
if err102 != nil {
arg150 := flag.Arg(1)
mbTrans151 := thrift.NewTMemoryBufferLen(len(arg150))
defer mbTrans151.Close()
_, err152 := mbTrans151.WriteString(arg150)
if err152 != nil {
Usage()
return
}
factory103 := thrift.NewTJSONProtocolFactory()
jsProt104 := factory103.GetProtocol(mbTrans101)
factory153 := thrift.NewTJSONProtocolFactory()
jsProt154 := factory153.GetProtocol(mbTrans151)
argvalue0 := aurora.NewJobKey()
err105 := argvalue0.Read(jsProt104)
if err105 != nil {
err155 := argvalue0.Read(context.Background(), jsProt154)
if err155 != nil {
Usage()
return
}
@ -299,19 +299,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage()
}
arg108 := flag.Arg(1)
mbTrans109 := thrift.NewTMemoryBufferLen(len(arg108))
defer mbTrans109.Close()
_, err110 := mbTrans109.WriteString(arg108)
if err110 != nil {
arg158 := flag.Arg(1)
mbTrans159 := thrift.NewTMemoryBufferLen(len(arg158))
defer mbTrans159.Close()
_, err160 := mbTrans159.WriteString(arg158)
if err160 != nil {
Usage()
return
}
factory111 := thrift.NewTJSONProtocolFactory()
jsProt112 := factory111.GetProtocol(mbTrans109)
factory161 := thrift.NewTJSONProtocolFactory()
jsProt162 := factory161.GetProtocol(mbTrans159)
argvalue0 := aurora.NewJobConfiguration()
err113 := argvalue0.Read(jsProt112)
if err113 != nil {
err163 := argvalue0.Read(context.Background(), jsProt162)
if err163 != nil {
Usage()
return
}
@ -324,19 +324,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage()
}
arg114 := flag.Arg(1)
mbTrans115 := thrift.NewTMemoryBufferLen(len(arg114))
defer mbTrans115.Close()
_, err116 := mbTrans115.WriteString(arg114)
if err116 != nil {
arg164 := flag.Arg(1)
mbTrans165 := thrift.NewTMemoryBufferLen(len(arg164))
defer mbTrans165.Close()
_, err166 := mbTrans165.WriteString(arg164)
if err166 != nil {
Usage()
return
}
factory117 := thrift.NewTJSONProtocolFactory()
jsProt118 := factory117.GetProtocol(mbTrans115)
factory167 := thrift.NewTJSONProtocolFactory()
jsProt168 := factory167.GetProtocol(mbTrans165)
argvalue0 := aurora.NewJobUpdateQuery()
err119 := argvalue0.Read(jsProt118)
if err119 != nil {
err169 := argvalue0.Read(context.Background(), jsProt168)
if err169 != nil {
Usage()
return
}
@ -349,19 +349,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage()
}
arg120 := flag.Arg(1)
mbTrans121 := thrift.NewTMemoryBufferLen(len(arg120))
defer mbTrans121.Close()
_, err122 := mbTrans121.WriteString(arg120)
if err122 != nil {
arg170 := flag.Arg(1)
mbTrans171 := thrift.NewTMemoryBufferLen(len(arg170))
defer mbTrans171.Close()
_, err172 := mbTrans171.WriteString(arg170)
if err172 != nil {
Usage()
return
}
factory123 := thrift.NewTJSONProtocolFactory()
jsProt124 := factory123.GetProtocol(mbTrans121)
factory173 := thrift.NewTJSONProtocolFactory()
jsProt174 := factory173.GetProtocol(mbTrans171)
argvalue0 := aurora.NewJobUpdateQuery()
err125 := argvalue0.Read(jsProt124)
if err125 != nil {
err175 := argvalue0.Read(context.Background(), jsProt174)
if err175 != nil {
Usage()
return
}
@ -374,19 +374,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage()
}
arg126 := flag.Arg(1)
mbTrans127 := thrift.NewTMemoryBufferLen(len(arg126))
defer mbTrans127.Close()
_, err128 := mbTrans127.WriteString(arg126)
if err128 != nil {
arg176 := flag.Arg(1)
mbTrans177 := thrift.NewTMemoryBufferLen(len(arg176))
defer mbTrans177.Close()
_, err178 := mbTrans177.WriteString(arg176)
if err178 != nil {
Usage()
return
}
factory129 := thrift.NewTJSONProtocolFactory()
jsProt130 := factory129.GetProtocol(mbTrans127)
factory179 := thrift.NewTJSONProtocolFactory()
jsProt180 := factory179.GetProtocol(mbTrans177)
argvalue0 := aurora.NewJobUpdateRequest()
err131 := argvalue0.Read(jsProt130)
if err131 != nil {
err181 := argvalue0.Read(context.Background(), jsProt180)
if err181 != nil {
Usage()
return
}

View file

@ -1,6 +1,6 @@
#! /bin/bash
THRIFT_VER=0.12.0
THRIFT_VER=0.14.0
if [[ $(thrift -version | grep -e $THRIFT_VER -c) -ne 1 ]]; then
echo "Warning: This wrapper has only been tested with version" $THRIFT_VER;

12
go.mod
View file

@ -1,12 +1,12 @@
module github.com/paypal/gorealis
go 1.12
go 1.13
require (
github.com/apache/thrift v0.12.0
github.com/davecgh/go-spew v1.1.0
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e
github.com/pmezard/go-difflib v1.0.0
github.com/apache/thrift v0.14.0
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
github.com/stretchr/testify v1.2.0
github.com/stretchr/testify v1.7.0
)

25
go.sum
View file

@ -1,9 +1,30 @@
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs=
github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/ridv/thrift v0.12.1 h1:b80V1Oa2Mbd++jrlJZbJsIybO5/MCfbXKzd1A5v4aSo=
github.com/ridv/thrift v0.12.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.1 h1:/8XnTRUqJJeiuqoL7mfnJQmXQa4GJn9tUCiP7+i6Y9o=
github.com/ridv/thrift v0.13.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.2 h1:Q3Smr8poXd7VkWZPHvdJZzlQCJO+b5W37ECfoUL4qHc=
github.com/ridv/thrift v0.13.2/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

21
helpers.go Normal file
View file

@ -0,0 +1,21 @@
package realis
import (
"context"
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) {
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
if err != nil {
return false, err
}
return resp == nil ||
resp.GetResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
resp.GetResponseCode() != aurora.ResponseCode_OK,
nil
}

89
job.go
View file

@ -62,6 +62,7 @@ type Job interface {
PartitionPolicy(policy *aurora.PartitionPolicy) Job
Tier(tier string) Job
SlaPolicy(policy *aurora.SlaPolicy) Job
Priority(priority int32) Job
}
type resourceType int
@ -73,12 +74,15 @@ const (
GPU
)
const portNamePrefix = "org.apache.aurora.port."
// AuroraJob is a structure to collect all information pertaining to an Aurora job.
type AuroraJob struct {
jobConfig *aurora.JobConfiguration
resources map[resourceType]*aurora.Resource
metadata map[string]*aurora.Metadata
portCount int
jobConfig *aurora.JobConfiguration
resources map[resourceType]*aurora.Resource
metadata map[string]*aurora.Metadata
constraints map[string]*aurora.Constraint
portCount int
}
// NewJob is used to create a Job object with everything initialized.
@ -109,10 +113,11 @@ func NewJob() Job {
diskMb.DiskMb = new(int64)
return &AuroraJob{
jobConfig: jobConfig,
resources: resources,
metadata: make(map[string]*aurora.Metadata),
portCount: 0,
jobConfig: jobConfig,
resources: resources,
metadata: make(map[string]*aurora.Metadata),
constraints: make(map[string]*aurora.Constraint),
portCount: 0,
}
}
@ -258,12 +263,12 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
// AddLabel adds a Mesos label to the job. Note that Aurora will add the
// prefix "org.apache.aurora.metadata." to the beginning of each key.
func (j *AuroraJob) AddLabel(key string, value string) Job {
if _, ok := j.metadata[key]; ok {
j.metadata[key].Value = value
} else {
j.metadata[key] = &aurora.Metadata{Key: key, Value: value}
if _, ok := j.metadata[key]; !ok {
j.metadata[key] = &aurora.Metadata{Key: key}
j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key])
}
j.metadata[key].Value = value
return j
}
@ -288,7 +293,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
start := j.portCount
j.portCount += num
for i := start; i < j.portCount; i++ {
portName := "org.apache.aurora.port." + strconv.Itoa(i)
portName := portNamePrefix + strconv.Itoa(i)
j.jobConfig.TaskConfig.Resources = append(
j.jobConfig.TaskConfig.Resources,
&aurora.Resource{NamedPort: &portName})
@ -297,47 +302,56 @@ func (j *AuroraJob) AddPorts(num int) Job {
return j
}
// AddValueConstraint allows the user to add a value constrain to the job to limiti which agents the job's
// tasks can be run on.
// AddValueConstraint allows the user to add a value constrain to the job to limit which agents the job's
// tasks can be run on. If the name matches a constraint that was previously set, the previous value will be
// overwritten. In case the previous constraint attached to the name was of type limit, the constraint will be clobbered
// by this new Value constraint.
// From Aurora Docs:
// Add a Value constraint
// name - Mesos slave attribute that the constraint is matched against.
// If negated = true , treat this as a 'not' - to avoid specific values.
// Values - list of values we look for in attribute name
func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
&aurora.Constraint{
Name: name,
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: negated,
Values: values,
},
Limit: nil,
},
})
if _, ok := j.constraints[name]; !ok {
j.constraints[name] = &aurora.Constraint{Name: name}
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
}
j.constraints[name].Constraint = &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: negated,
Values: values,
},
Limit: nil,
}
return j
}
// AddLimitConstraint allows the user to limit how many tasks form the same Job are run on a single host.
// If the name matches a constraint that was previously set, the previous value will be
// overwritten. In case the previous constraint attached to the name was of type Value, the constraint will be clobbered
// by this new Limit constraint.
// From Aurora Docs:
// A constraint that specifies the maximum number of active tasks on a host with
// a matching attribute that may be scheduled simultaneously.
func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
&aurora.Constraint{
Name: name,
Constraint: &aurora.TaskConstraint{
Value: nil,
Limit: &aurora.LimitConstraint{Limit: limit},
},
})
if _, ok := j.constraints[name]; !ok {
j.constraints[name] = &aurora.Constraint{Name: name}
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
}
j.constraints[name].Constraint = &aurora.TaskConstraint{
Value: nil,
Limit: &aurora.LimitConstraint{Limit: limit},
}
return j
}
// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
// AddDedicatedConstraint is a convenience function that allows the user to
// add a dedicated constraint to a Job configuration.
// In case a previous dedicated constraint was set, it will be clobbered by this new value.
func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
j.AddValueConstraint("dedicated", false, role+"/"+name)
@ -370,3 +384,8 @@ func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
return j
}
func (j *AuroraJob) Priority(priority int32) Job {
j.jobConfig.TaskConfig.Priority = priority
return j
}

View file

@ -78,8 +78,11 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
UpdateStatuses: desiredStatuses,
}
summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
if err != nil {
return 0, err
}
return summary[0].State.Status, err
return summary[0].State.Status, nil
}
// JobUpdateQuery polls the scheduler every certain amount of time to see if the query call returns any results.
@ -114,7 +117,7 @@ func (m *Monitor) JobUpdateQuery(
}
}
// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration.
@ -165,7 +168,8 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
return -1, err
}
if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED ||
summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}
@ -180,7 +184,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
}
// Monitor a Job until all instances enter one of the LIVE_STATES
// Instances will monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
}

273
realis.go
View file

@ -18,14 +18,11 @@ package realis
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
@ -38,7 +35,7 @@ import (
"github.com/paypal/gorealis/response"
)
const version = "1.21.1"
const version = "1.24.1"
// Realis is an interface that defines the various APIs that may be used to communicate with
// the Apache Aurora scheduler.
@ -68,7 +65,6 @@ type Realis interface {
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)
@ -117,6 +113,7 @@ type config struct {
logger *LevelLogger
insecureSkipVerify bool
certspath string
certExtensions map[string]struct{}
clientKey, clientCert string
options []ClientOption
debug bool
@ -132,6 +129,15 @@ var defaultBackoff = Backoff{
Jitter: 0.1,
}
var defaultSlaPolicy = aurora.SlaPolicy{
PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
Percentage: 66,
DurationSecs: 300,
},
}
const defaultSlaDrainTimeoutSecs = 900
// ClientOption is an alias for a function that modifies the realis config object
type ClientOption func(*config)
@ -229,6 +235,18 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
}
}
// CertExtensions configures gorealis to consider files with the given extensions when
// loading certificates from the cert path.
func CertExtensions(extensions ...string) ClientOption {
extensionsLookup := make(map[string]struct{})
for _, ext := range extensions {
extensionsLookup[ext] = struct{}{}
}
return func(config *config) {
config.certExtensions = extensionsLookup
}
}
// ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
// See zk.go for what is possible to set as an option.
func ZookeeperOptions(opts ...ZKOpt) ClientOption {
@ -311,6 +329,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
config.timeoutms = 10000
config.backoff = defaultBackoff
config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
config.certExtensions = map[string]struct{}{".crt": {}, ".pem": {}, ".key": {}}
// Save options to recreate client if a connection error happens
config.options = options
@ -375,7 +394,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
}
config.logger.Println("Addresss obtained: ", url)
config.logger.Println("Address obtained: ", url)
url, err = validateAuroraURL(url)
if err != nil {
return nil, errors.Wrap(err, "invalid Aurora url")
@ -417,7 +436,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{},
transport: config.transport}, nil
transport: config.transport,
}, nil
}
// GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. This is deprecated in favor of using
@ -433,23 +453,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
func createCertPool(certPath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certPath)
if err != nil {
return nil, err
}
for _, cert := range caFiles {
caPathFile := filepath.Join(certPath, cert.Name())
caCert, err := ioutil.ReadFile(caPathFile)
if err != nil {
return nil, err
}
globalRootCAs.AppendCertsFromPEM(caCert)
}
return globalRootCAs, nil
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
var transport http.Transport
@ -457,7 +460,7 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}
if config.certspath != "" {
rootCAs, err := createCertPool(config.certspath)
rootCAs, err := createCertPool(config.certspath, config.certExtensions)
if err != nil {
config.logger.Println("error occurred couldn't fetch certs")
return nil, err
@ -491,11 +494,11 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
})
if err != nil {
return nil, errors.Wrap(err, "Error creating transport")
return nil, errors.Wrap(err, "error creating transport")
}
if err := trans.Open(); err != nil {
return nil, errors.Wrapf(err, "Error opening connection to %s", url)
return nil, errors.Wrapf(err, "error opening connection to %s", url)
}
return trans, nil
@ -509,6 +512,10 @@ func basicAuth(username, password string) string {
func (r *realisClient) ReestablishConn() error {
// Close existing connection
r.logger.Println("Re-establishing Connection to Aurora")
// This call must happen before we lock as it also uses
// the same lock from the client since close can be called
// by anyone from anywhere.
r.Close()
r.lock.Lock()
@ -534,16 +541,17 @@ func (r *realisClient) ReestablishConn() error {
return nil
}
// Releases resources associated with the realis client.
// Close releases resources associated with the realis client.
func (r *realisClient) Close() {
r.lock.Lock()
defer r.lock.Unlock()
r.transport.Close()
// The return value of Close here is ignored on purpose because there's nothing that can be done if it fails.
_ = r.transport.Close()
}
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
// GetInstanceIds uses a predefined set of states to retrieve a set of active jobs in the Aurora Scheduler.
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
taskQ := &aurora.TaskQuery{
JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}},
@ -556,7 +564,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
false,
func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
})
},
nil,
)
// If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil {
@ -581,10 +591,16 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
false,
func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
}
if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp, nil
@ -598,7 +614,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
false,
func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(context.TODO(), role)
})
},
nil,
)
if retryErr != nil {
return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
@ -611,7 +629,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
return resp, result, nil
}
// Kill specific instances of a job.
// KillInstances kills specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
@ -619,7 +637,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
false,
func() (*aurora.Response, error) {
return r.client.KillTasks(context.TODO(), key, instances, "")
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -631,7 +651,7 @@ func (r *realisClient) RealisConfig() *config {
return r.config
}
// Sends a kill message to the scheduler for all active tasks under a job.
// KillJob kills all instances of a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key)
@ -641,7 +661,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(context.TODO(), key, nil, "")
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -649,7 +671,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
return resp, nil
}
// Sends a create job message to the scheduler with a specific job configuration.
// CreateJob sends a create job message to the scheduler with a specific job configuration.
// Although this API is able to create service jobs, it is better to use CreateService instead
// as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs.
@ -657,19 +679,36 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
// Response is checked by the thrift retry code
resp, retryErr := r.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
})
},
// On a client timeout, attempt to verify that payload made to the Scheduler by
// trying to get the config summary for the job key
func() (*aurora.Response, bool) {
exists, err := r.jobExists(*auroraJob.JobKey())
if err != nil {
r.logger.Print("verification failed ", err)
}
if exists {
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
}
return nil, false
},
)
if retryErr != nil {
return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
}
return resp, nil
}
// This API uses an update thrift call to create the services giving a few more robust features.
// CreateService uses the scheduler's updating mechanism to create a job.
func (r *realisClient) CreateService(
auroraJob Job,
settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
@ -680,17 +719,12 @@ func (r *realisClient) CreateService(
resp, err := r.StartJobUpdate(update, "")
if err != nil {
if IsTimeout(err) {
return resp, nil, err
return nil, nil, err
}
return resp, nil, errors.Wrap(err, "unable to create service")
}
if resp.GetResult_() != nil {
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
}
return nil, nil, errors.New("results object is nil")
return resp, resp.GetResult_().StartJobUpdateResult_, nil
}
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
@ -700,7 +734,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
false,
func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
@ -716,7 +752,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
false,
func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(context.TODO(), key)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
@ -734,7 +772,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
false,
func() (*aurora.Response, error) {
return r.client.StartCronJob(context.TODO(), key)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
@ -743,7 +783,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
}
// Restarts specific instances specified
// RestartInstances restarts the specified instances of a Job.
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
@ -751,7 +791,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
false,
func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instances)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -759,12 +801,12 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
return resp, nil
}
// Restarts all active tasks under a job configuration.
// RestartJob restarts all active instances of a Job.
func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err1 != nil {
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
return nil, errors.Wrap(err1, "could not retrieve relevant task instance IDs")
}
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
@ -774,7 +816,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
false,
func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instanceIds)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -786,7 +830,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.New("No tasks in the Active state")
}
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
// StartJobUpdate updates all instances under a job configuration.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
@ -795,20 +839,56 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
true,
func() (*aurora.Response, error) {
return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
})
},
func() (*aurora.Response, bool) {
summariesResp, err := r.readonlyClient.GetJobUpdateSummaries(
context.TODO(),
&aurora.JobUpdateQuery{
JobKey: updateJob.JobKey(),
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
})
if err != nil {
r.logger.Print("verification failed ", err)
return nil, false
}
summaries := response.JobUpdateSummaries(summariesResp)
if len(summaries) == 0 {
return nil, false
}
return &aurora.Response{
ResponseCode: aurora.ResponseCode_OK,
Result_: &aurora.Result_{
StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
UpdateSummary: summaries[0],
Key: summaries[0].Key,
},
},
}, true
},
)
if retryErr != nil {
// A timeout took place when attempting this call, attempt to recover
if IsTimeout(retryErr) {
return resp, retryErr
return nil, retryErr
}
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
if resp.GetResult_() == nil {
return resp, errors.New("no result in response")
}
return resp, nil
}
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
// AbortJobUpdate terminates a job update in the scheduler.
// It requires the updateId which can be obtained on the Aurora web UI.
// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state.
// However, if the job update does not transition to the ABORT state an error will be returned.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
@ -819,7 +899,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
false,
func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
@ -836,7 +918,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
return resp, err
}
// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
// PauseJobUpdate pauses the progress of an ongoing update.
// The UpdateID value needed for this function is returned from StartJobUpdate or can be obtained from the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -845,7 +928,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
false,
func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@ -854,7 +939,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
return resp, nil
}
// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
// ResumeJobUpdate resumes a previously Paused Job update.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -863,7 +948,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
false,
func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@ -872,7 +959,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
return resp, nil
}
// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
// PulseJobUpdate sends a pulse to an ongoing Job update.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
@ -881,7 +968,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
false,
func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(context.TODO(), updateKey)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
@ -890,8 +979,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
return resp, nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
// instance to scale up.
// AddInstances scales up the number of instances for a Job.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
@ -900,7 +988,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
false,
func() (*aurora.Response, error) {
return r.client.AddInstances(context.TODO(), &instKey, count)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@ -909,15 +999,15 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
}
// Scale down the number of instances under a job configuration using the configuration of a specific instance
// RemoveInstances scales down the number of instances for a Job.
func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil {
return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
return nil, errors.Wrap(err, "could not retrieve relevant instance IDs")
}
if len(instanceIds) < int(count) {
return nil, errors.Errorf("Insufficient active instances available for killing: "+
return nil, errors.Errorf("insufficient active instances available for killing: "+
" Instances to be killed %d Active instances %d", count, len(instanceIds))
}
@ -930,7 +1020,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
return r.KillInstances(key, instanceIds[:count]...)
}
// Get information about task including a fully hydrated task configuration object
// GetTaskStatus gets information about task including a fully hydrated task configuration object.
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
@ -939,7 +1029,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
false,
func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@ -948,7 +1040,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
return response.ScheduleStatusResult(resp).GetTasks(), nil
}
// Get pending reason
// GetPendingReason returns the reason why the an instance of a Job has not been scheduled.
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
@ -957,7 +1049,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
false,
func() (*aurora.Response, error) {
return r.client.GetPendingReason(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@ -972,7 +1066,8 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
return pendingReasons, nil
}
// Get information about task including without a task configuration object
// GetTasksWithoutConfigs gets information about task including without a task configuration object.
// This is a more lightweight version of GetTaskStatus but contains less information as a result.
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
@ -981,7 +1076,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
false,
func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@ -991,7 +1088,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
}
// Get the task configuration from the aurora scheduler for a job
// FetchTaskConfig gets the task configuration from the aurora scheduler for a job.
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
taskQ := &aurora.TaskQuery{
Role: &instKey.JobKey.Role,
@ -1007,7 +1104,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
false,
func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), taskQ)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@ -1016,7 +1115,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
tasks := response.ScheduleStatusResult(resp).GetTasks()
if len(tasks) == 0 {
return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist",
return nil, errors.Errorf("instance %d for jobkey %s/%s/%s doesn't exist",
instKey.InstanceId,
instKey.JobKey.Environment,
instKey.JobKey.Role,
@ -1035,10 +1134,12 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
false,
func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Unable to get job update details")
return nil, errors.Wrap(retryErr, "unable to get job update details")
}
return resp, nil
@ -1052,7 +1153,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
false,
func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(context.TODO(), &key, message)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to roll back job update")

View file

@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
false,
func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(context.TODO(), drainList)
})
},
nil,
)
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -56,6 +58,18 @@ func (r *realisClient) SLADrainHosts(
return nil, errors.New("no hosts provided to drain")
}
if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 {
policy = &defaultSlaPolicy
r.logger.Printf("Warning: start draining with default sla policy %v", policy)
}
if timeout < 0 {
r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs",
timeout,
defaultSlaDrainTimeoutSecs)
timeout = defaultSlaDrainTimeoutSecs
}
drainList := aurora.NewHosts()
drainList.HostNames = hosts
@ -65,7 +79,9 @@ func (r *realisClient) SLADrainHosts(
false,
func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
})
},
nil,
)
if retryErr != nil {
return result, errors.Wrap(retryErr, "Unable to recover connection")
@ -95,7 +111,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
false,
func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -125,7 +143,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
false,
func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -157,7 +177,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
false,
func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -182,7 +204,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
false,
func() (*aurora.Response, error) {
return r.adminClient.SetQuota(context.TODO(), role, quota)
})
},
nil,
)
if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to set role quota")
@ -198,7 +222,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
false,
func() (*aurora.Response, error) {
return r.adminClient.GetQuota(context.TODO(), role)
})
},
nil,
)
if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to get role quota")
@ -213,7 +239,9 @@ func (r *realisClient) Snapshot() error {
false,
func() (*aurora.Response, error) {
return r.adminClient.Snapshot(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -229,7 +257,9 @@ func (r *realisClient) PerformBackup() error {
false,
func() (*aurora.Response, error) {
return r.adminClient.PerformBackup(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -244,7 +274,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
false,
func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")
@ -265,7 +297,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
_, retryErr := r.thriftCallWithRetries(false,
func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection")

View file

@ -306,6 +306,40 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
t.Run("Duplicate_constraints", func(t *testing.T) {
job.Name("thermos_duplicate_constraints").
AddValueConstraint("zone", false, "east", "west").
AddValueConstraint("zone", false, "east").
AddValueConstraint("zone", true, "west")
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
t.Run("Overwrite_constraints", func(t *testing.T) {
job.Name("thermos_overwrite_constraints").
AddLimitConstraint("zone", 1).
AddValueConstraint("zone", true, "west", "east").
AddLimitConstraint("zone", 2)
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
}
// Test configuring an executor that doesn't exist for CreateJob API
@ -505,7 +539,7 @@ func TestRealisClient_CreateService(t *testing.T) {
timeoutClient, err := realis.NewRealisClient(
realis.SchedulerUrl(auroraURL),
realis.BasicAuth("aurora", "secret"),
realis.TimeoutMS(10),
realis.TimeoutMS(5),
)
require.NoError(t, err)
defer timeoutClient.Close()
@ -716,6 +750,53 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
5,
10)
assert.NoError(t, err)
// slaDrainHosts goes with default policy if no policy is specified
_, err = r.SLADrainHosts(nil, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
_, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
}
// Test multiple go routines using a single connection
@ -1025,8 +1106,10 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
assert.Equal(t, i, curStep)
_, err = r.ResumeJobUpdate(&key, "auto resuming test")
require.NoError(t, err)
if i != len(updateGroups)-1 {
_, err = r.ResumeJobUpdate(&key, "auto resuming test")
require.NoError(t, err)
}
}
_, err = r.KillJob(job.JobKey())

View file

@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
}
func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil
}
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
}

169
retry.go
View file

@ -114,10 +114,19 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
type auroraThriftCall func() (resp *aurora.Response, err error)
// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler
// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
// through the very same Thrift API which previously encountered a time out from the client.
// This means that the functions themselves should be kept to a minimum number of Thrift calls.
// It should also be noted that this is a best effort mechanism and
// is likely to fail for the same reasons that the original call failed.
type verifyOnTimeout func() (*aurora.Response, bool)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
func (r *realisClient) thriftCallWithRetries(
returnOnTimeout bool,
thriftCall auroraThriftCall) (*aurora.Response, error) {
thriftCall auroraThriftCall,
verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
@ -137,7 +146,7 @@ func (r *realisClient) thriftCallWithRetries(
}
r.logger.Printf(
"A retryable error occurred during thrift call, backing off for %v before retry %v\n",
"A retryable error occurred during thrift call, backing off for %v before retry %v",
adjusted,
curStep)
@ -154,45 +163,25 @@ func (r *realisClient) thriftCallWithRetries(
resp, clientErr = thriftCall()
r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retryable event as we don't know
// if it was caused by network issues.
// Check if our thrift call is returning an error.
if clientErr != nil {
// Print out the error to the user
r.logger.Printf("Client Error: %v\n", clientErr)
r.logger.Printf("Client Error: %v", clientErr)
// Determine if error is a temporary URL error by going up the stack
e, ok := clientErr.(thrift.TTransportException)
if ok {
r.logger.debugPrint("Encountered a transport exception")
temporary, timedout := isConnectionError(clientErr)
if !temporary && r.RealisConfig().failOnPermanentErrors {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
e, ok := e.Err().(*url.Error)
if ok {
// EOF error occurs when the server closes the read buffer of the client. This is common
// when the server is overloaded and should be retried. All other errors that are permanent
// will not be retried.
if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
// Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
// able to reply. In this case we will return whatever response was received and a TimedOut behaving
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
if e.Timeout() {
timeouts++
r.logger.debugPrintf(
"Client closed connection (timedout) %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
if returnOnTimeout {
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
}
}
}
// There exists a corner case where thrift payload was received by Aurora but
// connection timed out before Aurora was able to reply.
// Users can take special action on a timeout by using IsTimedout and reacting accordingly
// if they have configured the client to return on a timeout.
if timedout && returnOnTimeout {
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
}
// In the future, reestablish connection should be able to check if it is actually possible
@ -202,48 +191,71 @@ func (r *realisClient) thriftCallWithRetries(
if reestablishErr != nil {
r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
}
} else {
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("response from aurora is nil")
// If users did not opt for a return on timeout in order to react to a timedout error,
// attempt to verify that the call made it to the scheduler after the connection was re-established.
if timedout {
timeouts++
r.logger.debugPrintf(
"Client closed connection %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
// Allow caller to provide a function which checks if the original call was successful before
// it timed out.
if verifyOnTimeout != nil {
if verifyResp, ok := verifyOnTimeout(); ok {
r.logger.Print("verified that the call went through successfully after a client timeout")
// Response here might be different than the original as it is no longer constructed
// by the scheduler but mimicked.
// This is OK since the scheduler is very unlikely to change responses at this point in its
// development cycle but we must be careful to not return an incorrectly constructed response.
return verifyResp, nil
}
}
}
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// Retry the thrift payload
continue
}
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("response from aurora is nil")
}
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
r.logger.Println("Aurora replied with Transient error code, retrying")
continue
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST,
aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
}
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
r.logger.Println("Aurora replied with Transient error code, retrying")
continue
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST,
aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
}
}
r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 {
r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
r.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
}
// Provide more information to the user wherever possible.
@ -253,3 +265,30 @@ func (r *realisClient) thriftCallWithRetries(
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
// isConnectionError processes the error received by the client.
// The return values indicate weather this was determined to be a temporary error
// and weather it was determined to be a timeout error
func isConnectionError(err error) (bool, bool) {
// Determine if error is a temporary URL error by going up the stack
transportException, ok := err.(thrift.TTransportException)
if !ok {
return false, false
}
urlError, ok := transportException.Err().(*url.Error)
if !ok {
return false, false
}
// EOF error occurs when the server closes the read buffer of the client. This is common
// when the server is overloaded and we consider it temporary.
// All other which are not temporary as per the member function Temporary(),
// are considered not temporary (permanent).
if urlError.Err != io.EOF && !urlError.Temporary() {
return false, false
}
return true, urlError.Timeout()
}

View file

@ -1,4 +1,4 @@
#!/bin/bash
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@
docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.16-buster go test -v github.com/paypal/gorealis $@

51
util.go
View file

@ -1,7 +1,11 @@
package realis
import (
"crypto/x509"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/paypal/gorealis/gen-go/apache/aurora"
@ -25,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
// TerminalUpdateStates returns a slice containing all the terminal states an update may be in.
// This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
return []aurora.JobUpdateStatus{
@ -65,6 +69,49 @@ func init() {
}
}
// createCertPool will attempt to load certificates into a certificate pool from a given directory.
// Only files with an extension contained in the extension map are considered.
// This function ignores any files that cannot be read successfully or cannot be added to the certPool
// successfully.
func createCertPool(path string, extensions map[string]struct{}) (*x509.CertPool, error) {
_, err := os.Stat(path)
if err != nil {
return nil, errors.Wrap(err, "unable to load certificates")
}
caFiles, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
loadedCerts := 0
for _, cert := range caFiles {
// Skip directories
if cert.IsDir() {
continue
}
// Skip any files that do not contain the right extension
if _, ok := extensions[filepath.Ext(cert.Name())]; !ok {
continue
}
pem, err := ioutil.ReadFile(filepath.Join(path, cert.Name()))
if err != nil {
continue
}
if certPool.AppendCertsFromPEM(pem) {
loadedCerts++
}
}
if loadedCerts == 0 {
return nil, errors.New("no certificates were able to be successfully loaded")
}
return certPool, nil
}
func validateAuroraURL(location string) (string, error) {
// If no protocol defined, assume http
@ -92,7 +139,7 @@ func validateAuroraURL(location string) (string, error) {
return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme)
}
// This could theoretically be elsewhwere but we'll be strict for the sake of simplicty
// This could theoretically be elsewhere but we'll be strict for the sake of simplicity
if u.Path != apiPath {
return "", errors.Errorf("expected /api path %v\n", u.Path)
}

View file

@ -100,3 +100,15 @@ func TestCurrentBatchCalculator(t *testing.T) {
assert.Equal(t, 0, curBatch)
})
}
func TestCertPoolCreator(t *testing.T) {
extensions := map[string]struct{}{".crt": {}}
_, err := createCertPool("examples/certs", extensions)
assert.NoError(t, err)
t.Run("badDir", func(t *testing.T) {
_, err := createCertPool("idontexist", extensions)
assert.Error(t, err)
})
}