Compare commits

..

4 commits
main ... future

Author SHA1 Message Date
Renan I. Del Valle
efc0fdcd81
Moving future to final 0.22.0 release and Mesos 1.6.2 (#114)
Changes in compose testing setup:
* Upgrading Aurora to 0.22.0
* Upgrading Mesos to 1.6.2
2020-01-14 15:34:59 -08:00
Renan I. Del Valle
b505304b79
Adding autopause APIs to future (#110)
* Updating thrift definitions to add autopause for batch based update strategies.

* Adding batch calculator utility and test cases for it.

* Adding PauseUpdateMonitor which allows users to poll Aurora for information on an active Update being carried out until it enters the ROLL_FORWARD_PAUSED state.

* Tests for PauseUpdateMonitor and VariableBatchStep added to the end to end tests.

* Adding TerminalUpdateStates function which returns a slice containing all terminal states for an update. Changed signature of JobUpdateStatus from using a map for desired states to a slice. A map is no longer necessary with the new version of thrift and only adds complexity.
2020-01-13 16:03:40 -08:00
Renan DelValle
2148351b94
Variable Batch Update Support (#100)
* Changing generateBinding.sh check to check for thrift 0.12.0 and adding support for Variable Batch updates.

* Adding update strategies change to changelog, changed docker-compose to point to aurora 0.22.0 snapshot. Added test coverage for update strategies.
2019-08-28 17:12:59 -07:00
Renan DelValle
f936576c4d
Increasing aurora version for future branch. 2019-08-28 17:11:32 -07:00
31 changed files with 7704 additions and 11184 deletions

View file

@ -1 +1 @@
0.23.0 0.22.0

25
.github/main.yml vendored
View file

@ -1,25 +0,0 @@
name: CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containing go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis

View file

@ -1,57 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '34 4 * * 3'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
- run: go build examples/client.go
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

View file

@ -1,30 +0,0 @@
name: CI
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.16
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containing go code
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
- name: Run goimports check
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
- name: Create aurora/mesos docker cluster
run: docker-compose up -d
- name: Run tests
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis

33
.travis.yml Normal file
View file

@ -0,0 +1,33 @@
sudo: required
dist: xenial
language: go
branches:
only:
- master
- master-v2.0
- future
go:
- "1.10.x"
env:
global:
- GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/")
services:
- docker
before_install:
- go get golang.org/x/tools/cmd/goimports
- test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
install:
- docker-compose up -d
script:
- go test -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis
after_success:
- bash <(curl -s https://codecov.io/bash)

View file

@ -1,41 +1,4 @@
1.25.1 (unreleased) 1.22.0 (unreleased)
1.25.0
* Add priority api
1.24.0
* enable default sla for slaDrain
* Changes Travis CI badge to Github Actions badge
* Bug fix for auto paused update monitor
* Adds support for running CI on github actions
1.23.0
* First release tested against Aurora Scheduler 0.23.0
1.22.5
* Upgrading to thrift 0.14.0
1.22.4
* Updates which result in a no-op now return a response value so that the caller may analyze it to determine what happened
1.22.3
* Contains a monitor timeout fix. Previously an error was being left unchecked which made a specific monitor timing out not be handled properly.
1.22.2
* Bug fix: Change in retry mechanism created a deadlock. This release reverts that particular change.
1.22.1
* Adding safeguards against setting multiple constraints with the same name for a single task.
1.22.0
* CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
by the HTTP client. Instead they now return an error that conforms to the Timedout interface. by the HTTP client. Instead they now return an error that conforms to the Timedout interface.

64
Gopkg.lock generated Normal file
View file

@ -0,0 +1,64 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
digest = "1:89696c38cec777120b8b1bb5e2d363d655cf2e1e7d8c851919aaa0fd576d9b86"
name = "github.com/apache/thrift"
packages = ["lib/go/thrift"]
pruneopts = ""
revision = "384647d290e2e4a55a14b1b7ef1b7e66293a2c33"
version = "v0.12.0"
[[projects]]
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
digest = "1:df48fb76fb2a40edea0c9b3d960bc95e326660d82ff1114e1f88001f7a236b40"
name = "github.com/pkg/errors"
packages = ["."]
pruneopts = ""
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
pruneopts = ""
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
digest = "1:78bea5e26e82826dacc5fd64a1013a6711b7075ec8072819b89e6ad76cb8196d"
name = "github.com/samuel/go-zookeeper"
packages = ["zk"]
pruneopts = ""
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[projects]]
digest = "1:381bcbeb112a51493d9d998bbba207a529c73dbb49b3fd789e48c63fac1f192c"
name = "github.com/stretchr/testify"
packages = [
"assert",
"require",
]
pruneopts = ""
revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053"
version = "v1.3.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/apache/thrift/lib/go/thrift",
"github.com/pkg/errors",
"github.com/samuel/go-zookeeper/zk",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
]
solver-name = "gps-cdcl"
solver-version = 1

16
Gopkg.toml Normal file
View file

@ -0,0 +1,16 @@
[[constraint]]
name = "github.com/apache/thrift"
version = "0.12.0"
[[constraint]]
name = "github.com/pkg/errors"
revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc"
[[constraint]]
name = "github.com/samuel/go-zookeeper"
revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5"
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"

View file

@ -1,8 +1,6 @@
# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) ![CI Build Status](https://github.com/paypal/gorealis/actions/workflows/main.yml/badge.svg) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis) # gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) [![Build Status](https://travis-ci.org/paypal/gorealis.svg?branch=master)](https://travis-ci.org/paypal/gorealis) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/master/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis)
Version 1 of Go library for interacting with [Aurora Scheduler](https://github.com/aurora-scheduler/aurora). Go library for interacting with [Apache Aurora](https://github.com/apache/aurora).
Version 2 of this library can be found [here](https://github.com/aurora-scheduler/gorealis).
### Aurora version compatibility ### Aurora version compatibility
Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this Please see [.auroraversion](./.auroraversion) to see the latest Aurora version against which this
@ -16,7 +14,7 @@ library has been tested.
## Projects using gorealis ## Projects using gorealis
* [australis](https://github.com/aurora-scheduler/australis) * [australis](https://github.com/rdelval/australis)
## Contributions ## Contributions
Contributions are always welcome. Please raise an issue to discuss a contribution before it is made. Contributions are always welcome. Please raise an issue to discuss a contribution before it is made.

View file

@ -14,7 +14,7 @@ services:
ipv4_address: 192.168.33.2 ipv4_address: 192.168.33.2
master: master:
image: aurorascheduler/mesos-master:1.7.2 image: rdelvalle/mesos-master:1.6.2
restart: on-failure restart: on-failure
ports: ports:
- "5050:5050" - "5050:5050"
@ -32,13 +32,12 @@ services:
- zk - zk
agent-one: agent-one:
image: aurorascheduler/mesos-agent:1.7.2 image: rdelvalle/mesos-agent:1.6.2
pid: host pid: host
restart: on-failure restart: on-failure
ports: ports:
- "5051:5051" - "5051:5051"
environment: environment:
MESOS_ATTRIBUTES: 'zone:west'
MESOS_MASTER: zk://192.168.33.2:2181/mesos MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051 MESOS_PORT: 5051
@ -57,13 +56,12 @@ services:
- zk - zk
agent-two: agent-two:
image: aurorascheduler/mesos-agent:1.7.2 image: rdelvalle/mesos-agent:1.6.2
pid: host pid: host
restart: on-failure restart: on-failure
ports: ports:
- "5061:5061" - "5061:5061"
environment: environment:
MESOS_ATTRIBUTES: 'zone:east'
MESOS_MASTER: zk://192.168.33.2:2181/mesos MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos MESOS_CONTAINERIZERS: docker,mesos
MESOS_HOSTNAME: localhost MESOS_HOSTNAME: localhost
@ -82,7 +80,7 @@ services:
- zk - zk
aurora-one: aurora-one:
image: aurorascheduler/scheduler:0.23.0 image: rdelvalle/aurora:0.22.0
pid: host pid: host
ports: ports:
- "8081:8081" - "8081:8081"

View file

@ -111,7 +111,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
monitor = &realis.Monitor{Client: r} monitor = &realis.Monitor{r}
defer r.Close() defer r.Close()
switch executor { switch executor {
@ -451,6 +451,13 @@ func main() {
} }
fmt.Println(resp.String()) fmt.Println(resp.String())
case "variableBatchStep":
step, err := r.VariableBatchStep(aurora.JobUpdateKey{Job: job.JobKey(), ID: updateId})
if err != nil {
log.Fatal(err)
}
fmt.Println(step)
case "taskConfig": case "taskConfig":
fmt.Println("Getting job info") fmt.Println("Getting job info")
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)

View file

@ -1,4 +1,5 @@
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT. // Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package aurora package aurora

View file

@ -1,12 +1,13 @@
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT. // Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package aurora package aurora
import( import (
"bytes" "bytes"
"context" "context"
"reflect"
"fmt" "fmt"
"time"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
) )
@ -14,7 +15,7 @@ import(
var _ = thrift.ZERO var _ = thrift.ZERO
var _ = fmt.Printf var _ = fmt.Printf
var _ = context.Background var _ = context.Background
var _ = time.Now var _ = reflect.DeepEqual
var _ = bytes.Equal var _ = bytes.Equal
const AURORA_EXECUTOR_NAME = "AuroraExecutor" const AURORA_EXECUTOR_NAME = "AuroraExecutor"

File diff suppressed because it is too large Load diff

View file

@ -1,22 +1,22 @@
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT. // Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main package main
import ( import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"math" "math"
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
"apache/aurora" "apache/aurora"
) )
var _ = aurora.GoUnusedProtection__
func Usage() { func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -175,19 +175,19 @@ func main() {
fmt.Fprintln(os.Stderr, "CreateJob requires 1 args") fmt.Fprintln(os.Stderr, "CreateJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg213 := flag.Arg(1) arg163 := flag.Arg(1)
mbTrans214 := thrift.NewTMemoryBufferLen(len(arg213)) mbTrans164 := thrift.NewTMemoryBufferLen(len(arg163))
defer mbTrans214.Close() defer mbTrans164.Close()
_, err215 := mbTrans214.WriteString(arg213) _, err165 := mbTrans164.WriteString(arg163)
if err215 != nil { if err165 != nil {
Usage() Usage()
return return
} }
factory216 := thrift.NewTJSONProtocolFactory() factory166 := thrift.NewTJSONProtocolFactory()
jsProt217 := factory216.GetProtocol(mbTrans214) jsProt167 := factory166.GetProtocol(mbTrans164)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err218 := argvalue0.Read(context.Background(), jsProt217) err168 := argvalue0.Read(jsProt167)
if err218 != nil { if err168 != nil {
Usage() Usage()
return return
} }
@ -200,19 +200,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args") fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg219 := flag.Arg(1) arg169 := flag.Arg(1)
mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219)) mbTrans170 := thrift.NewTMemoryBufferLen(len(arg169))
defer mbTrans220.Close() defer mbTrans170.Close()
_, err221 := mbTrans220.WriteString(arg219) _, err171 := mbTrans170.WriteString(arg169)
if err221 != nil { if err171 != nil {
Usage() Usage()
return return
} }
factory222 := thrift.NewTJSONProtocolFactory() factory172 := thrift.NewTJSONProtocolFactory()
jsProt223 := factory222.GetProtocol(mbTrans220) jsProt173 := factory172.GetProtocol(mbTrans170)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err224 := argvalue0.Read(context.Background(), jsProt223) err174 := argvalue0.Read(jsProt173)
if err224 != nil { if err174 != nil {
Usage() Usage()
return return
} }
@ -225,19 +225,19 @@ func main() {
fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args") fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg225 := flag.Arg(1) arg175 := flag.Arg(1)
mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225)) mbTrans176 := thrift.NewTMemoryBufferLen(len(arg175))
defer mbTrans226.Close() defer mbTrans176.Close()
_, err227 := mbTrans226.WriteString(arg225) _, err177 := mbTrans176.WriteString(arg175)
if err227 != nil { if err177 != nil {
Usage() Usage()
return return
} }
factory228 := thrift.NewTJSONProtocolFactory() factory178 := thrift.NewTJSONProtocolFactory()
jsProt229 := factory228.GetProtocol(mbTrans226) jsProt179 := factory178.GetProtocol(mbTrans176)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err230 := argvalue0.Read(context.Background(), jsProt229) err180 := argvalue0.Read(jsProt179)
if err230 != nil { if err180 != nil {
Usage() Usage()
return return
} }
@ -250,19 +250,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args") fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args")
flag.Usage() flag.Usage()
} }
arg231 := flag.Arg(1) arg181 := flag.Arg(1)
mbTrans232 := thrift.NewTMemoryBufferLen(len(arg231)) mbTrans182 := thrift.NewTMemoryBufferLen(len(arg181))
defer mbTrans232.Close() defer mbTrans182.Close()
_, err233 := mbTrans232.WriteString(arg231) _, err183 := mbTrans182.WriteString(arg181)
if err233 != nil { if err183 != nil {
Usage() Usage()
return return
} }
factory234 := thrift.NewTJSONProtocolFactory() factory184 := thrift.NewTJSONProtocolFactory()
jsProt235 := factory234.GetProtocol(mbTrans232) jsProt185 := factory184.GetProtocol(mbTrans182)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err236 := argvalue0.Read(context.Background(), jsProt235) err186 := argvalue0.Read(jsProt185)
if err236 != nil { if err186 != nil {
Usage() Usage()
return return
} }
@ -275,36 +275,36 @@ func main() {
fmt.Fprintln(os.Stderr, "RestartShards requires 2 args") fmt.Fprintln(os.Stderr, "RestartShards requires 2 args")
flag.Usage() flag.Usage()
} }
arg237 := flag.Arg(1) arg187 := flag.Arg(1)
mbTrans238 := thrift.NewTMemoryBufferLen(len(arg237)) mbTrans188 := thrift.NewTMemoryBufferLen(len(arg187))
defer mbTrans238.Close() defer mbTrans188.Close()
_, err239 := mbTrans238.WriteString(arg237) _, err189 := mbTrans188.WriteString(arg187)
if err239 != nil { if err189 != nil {
Usage() Usage()
return return
} }
factory240 := thrift.NewTJSONProtocolFactory() factory190 := thrift.NewTJSONProtocolFactory()
jsProt241 := factory240.GetProtocol(mbTrans238) jsProt191 := factory190.GetProtocol(mbTrans188)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err242 := argvalue0.Read(context.Background(), jsProt241) err192 := argvalue0.Read(jsProt191)
if err242 != nil { if err192 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
arg243 := flag.Arg(2) arg193 := flag.Arg(2)
mbTrans244 := thrift.NewTMemoryBufferLen(len(arg243)) mbTrans194 := thrift.NewTMemoryBufferLen(len(arg193))
defer mbTrans244.Close() defer mbTrans194.Close()
_, err245 := mbTrans244.WriteString(arg243) _, err195 := mbTrans194.WriteString(arg193)
if err245 != nil { if err195 != nil {
Usage() Usage()
return return
} }
factory246 := thrift.NewTJSONProtocolFactory() factory196 := thrift.NewTJSONProtocolFactory()
jsProt247 := factory246.GetProtocol(mbTrans244) jsProt197 := factory196.GetProtocol(mbTrans194)
containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs() containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs()
err248 := containerStruct1.ReadField2(context.Background(), jsProt247) err198 := containerStruct1.ReadField2(jsProt197)
if err248 != nil { if err198 != nil {
Usage() Usage()
return return
} }
@ -318,36 +318,36 @@ func main() {
fmt.Fprintln(os.Stderr, "KillTasks requires 3 args") fmt.Fprintln(os.Stderr, "KillTasks requires 3 args")
flag.Usage() flag.Usage()
} }
arg249 := flag.Arg(1) arg199 := flag.Arg(1)
mbTrans250 := thrift.NewTMemoryBufferLen(len(arg249)) mbTrans200 := thrift.NewTMemoryBufferLen(len(arg199))
defer mbTrans250.Close() defer mbTrans200.Close()
_, err251 := mbTrans250.WriteString(arg249) _, err201 := mbTrans200.WriteString(arg199)
if err251 != nil { if err201 != nil {
Usage() Usage()
return return
} }
factory252 := thrift.NewTJSONProtocolFactory() factory202 := thrift.NewTJSONProtocolFactory()
jsProt253 := factory252.GetProtocol(mbTrans250) jsProt203 := factory202.GetProtocol(mbTrans200)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err254 := argvalue0.Read(context.Background(), jsProt253) err204 := argvalue0.Read(jsProt203)
if err254 != nil { if err204 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
arg255 := flag.Arg(2) arg205 := flag.Arg(2)
mbTrans256 := thrift.NewTMemoryBufferLen(len(arg255)) mbTrans206 := thrift.NewTMemoryBufferLen(len(arg205))
defer mbTrans256.Close() defer mbTrans206.Close()
_, err257 := mbTrans256.WriteString(arg255) _, err207 := mbTrans206.WriteString(arg205)
if err257 != nil { if err207 != nil {
Usage() Usage()
return return
} }
factory258 := thrift.NewTJSONProtocolFactory() factory208 := thrift.NewTJSONProtocolFactory()
jsProt259 := factory258.GetProtocol(mbTrans256) jsProt209 := factory208.GetProtocol(mbTrans206)
containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs() containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs()
err260 := containerStruct1.ReadField2(context.Background(), jsProt259) err210 := containerStruct1.ReadField2(jsProt209)
if err260 != nil { if err210 != nil {
Usage() Usage()
return return
} }
@ -363,25 +363,25 @@ func main() {
fmt.Fprintln(os.Stderr, "AddInstances requires 2 args") fmt.Fprintln(os.Stderr, "AddInstances requires 2 args")
flag.Usage() flag.Usage()
} }
arg262 := flag.Arg(1) arg212 := flag.Arg(1)
mbTrans263 := thrift.NewTMemoryBufferLen(len(arg262)) mbTrans213 := thrift.NewTMemoryBufferLen(len(arg212))
defer mbTrans263.Close() defer mbTrans213.Close()
_, err264 := mbTrans263.WriteString(arg262) _, err214 := mbTrans213.WriteString(arg212)
if err264 != nil { if err214 != nil {
Usage() Usage()
return return
} }
factory265 := thrift.NewTJSONProtocolFactory() factory215 := thrift.NewTJSONProtocolFactory()
jsProt266 := factory265.GetProtocol(mbTrans263) jsProt216 := factory215.GetProtocol(mbTrans213)
argvalue0 := aurora.NewInstanceKey() argvalue0 := aurora.NewInstanceKey()
err267 := argvalue0.Read(context.Background(), jsProt266) err217 := argvalue0.Read(jsProt216)
if err267 != nil { if err217 != nil {
Usage() Usage()
return return
} }
value0 := argvalue0 value0 := argvalue0
tmp1, err268 := (strconv.Atoi(flag.Arg(2))) tmp1, err218 := (strconv.Atoi(flag.Arg(2)))
if err268 != nil { if err218 != nil {
Usage() Usage()
return return
} }
@ -395,19 +395,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args") fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args")
flag.Usage() flag.Usage()
} }
arg269 := flag.Arg(1) arg219 := flag.Arg(1)
mbTrans270 := thrift.NewTMemoryBufferLen(len(arg269)) mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219))
defer mbTrans270.Close() defer mbTrans220.Close()
_, err271 := mbTrans270.WriteString(arg269) _, err221 := mbTrans220.WriteString(arg219)
if err271 != nil { if err221 != nil {
Usage() Usage()
return return
} }
factory272 := thrift.NewTJSONProtocolFactory() factory222 := thrift.NewTJSONProtocolFactory()
jsProt273 := factory272.GetProtocol(mbTrans270) jsProt223 := factory222.GetProtocol(mbTrans220)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err274 := argvalue0.Read(context.Background(), jsProt273) err224 := argvalue0.Read(jsProt223)
if err274 != nil { if err224 != nil {
Usage() Usage()
return return
} }
@ -420,19 +420,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg275 := flag.Arg(1) arg225 := flag.Arg(1)
mbTrans276 := thrift.NewTMemoryBufferLen(len(arg275)) mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225))
defer mbTrans276.Close() defer mbTrans226.Close()
_, err277 := mbTrans276.WriteString(arg275) _, err227 := mbTrans226.WriteString(arg225)
if err277 != nil { if err227 != nil {
Usage() Usage()
return return
} }
factory278 := thrift.NewTJSONProtocolFactory() factory228 := thrift.NewTJSONProtocolFactory()
jsProt279 := factory278.GetProtocol(mbTrans276) jsProt229 := factory228.GetProtocol(mbTrans226)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err280 := argvalue0.Read(context.Background(), jsProt279) err230 := argvalue0.Read(jsProt229)
if err280 != nil { if err230 != nil {
Usage() Usage()
return return
} }
@ -447,19 +447,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg282 := flag.Arg(1) arg232 := flag.Arg(1)
mbTrans283 := thrift.NewTMemoryBufferLen(len(arg282)) mbTrans233 := thrift.NewTMemoryBufferLen(len(arg232))
defer mbTrans283.Close() defer mbTrans233.Close()
_, err284 := mbTrans283.WriteString(arg282) _, err234 := mbTrans233.WriteString(arg232)
if err284 != nil { if err234 != nil {
Usage() Usage()
return return
} }
factory285 := thrift.NewTJSONProtocolFactory() factory235 := thrift.NewTJSONProtocolFactory()
jsProt286 := factory285.GetProtocol(mbTrans283) jsProt236 := factory235.GetProtocol(mbTrans233)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err287 := argvalue0.Read(context.Background(), jsProt286) err237 := argvalue0.Read(jsProt236)
if err287 != nil { if err237 != nil {
Usage() Usage()
return return
} }
@ -474,19 +474,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg289 := flag.Arg(1) arg239 := flag.Arg(1)
mbTrans290 := thrift.NewTMemoryBufferLen(len(arg289)) mbTrans240 := thrift.NewTMemoryBufferLen(len(arg239))
defer mbTrans290.Close() defer mbTrans240.Close()
_, err291 := mbTrans290.WriteString(arg289) _, err241 := mbTrans240.WriteString(arg239)
if err291 != nil { if err241 != nil {
Usage() Usage()
return return
} }
factory292 := thrift.NewTJSONProtocolFactory() factory242 := thrift.NewTJSONProtocolFactory()
jsProt293 := factory292.GetProtocol(mbTrans290) jsProt243 := factory242.GetProtocol(mbTrans240)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err294 := argvalue0.Read(context.Background(), jsProt293) err244 := argvalue0.Read(jsProt243)
if err294 != nil { if err244 != nil {
Usage() Usage()
return return
} }
@ -501,19 +501,19 @@ func main() {
fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg296 := flag.Arg(1) arg246 := flag.Arg(1)
mbTrans297 := thrift.NewTMemoryBufferLen(len(arg296)) mbTrans247 := thrift.NewTMemoryBufferLen(len(arg246))
defer mbTrans297.Close() defer mbTrans247.Close()
_, err298 := mbTrans297.WriteString(arg296) _, err248 := mbTrans247.WriteString(arg246)
if err298 != nil { if err248 != nil {
Usage() Usage()
return return
} }
factory299 := thrift.NewTJSONProtocolFactory() factory249 := thrift.NewTJSONProtocolFactory()
jsProt300 := factory299.GetProtocol(mbTrans297) jsProt250 := factory249.GetProtocol(mbTrans247)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err301 := argvalue0.Read(context.Background(), jsProt300) err251 := argvalue0.Read(jsProt250)
if err301 != nil { if err251 != nil {
Usage() Usage()
return return
} }
@ -528,19 +528,19 @@ func main() {
fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args") fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args")
flag.Usage() flag.Usage()
} }
arg303 := flag.Arg(1) arg253 := flag.Arg(1)
mbTrans304 := thrift.NewTMemoryBufferLen(len(arg303)) mbTrans254 := thrift.NewTMemoryBufferLen(len(arg253))
defer mbTrans304.Close() defer mbTrans254.Close()
_, err305 := mbTrans304.WriteString(arg303) _, err255 := mbTrans254.WriteString(arg253)
if err305 != nil { if err255 != nil {
Usage() Usage()
return return
} }
factory306 := thrift.NewTJSONProtocolFactory() factory256 := thrift.NewTJSONProtocolFactory()
jsProt307 := factory306.GetProtocol(mbTrans304) jsProt257 := factory256.GetProtocol(mbTrans254)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err308 := argvalue0.Read(context.Background(), jsProt307) err258 := argvalue0.Read(jsProt257)
if err308 != nil { if err258 != nil {
Usage() Usage()
return return
} }
@ -555,19 +555,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args") fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args")
flag.Usage() flag.Usage()
} }
arg310 := flag.Arg(1) arg260 := flag.Arg(1)
mbTrans311 := thrift.NewTMemoryBufferLen(len(arg310)) mbTrans261 := thrift.NewTMemoryBufferLen(len(arg260))
defer mbTrans311.Close() defer mbTrans261.Close()
_, err312 := mbTrans311.WriteString(arg310) _, err262 := mbTrans261.WriteString(arg260)
if err312 != nil { if err262 != nil {
Usage() Usage()
return return
} }
factory313 := thrift.NewTJSONProtocolFactory() factory263 := thrift.NewTJSONProtocolFactory()
jsProt314 := factory313.GetProtocol(mbTrans311) jsProt264 := factory263.GetProtocol(mbTrans261)
argvalue0 := aurora.NewJobUpdateKey() argvalue0 := aurora.NewJobUpdateKey()
err315 := argvalue0.Read(context.Background(), jsProt314) err265 := argvalue0.Read(jsProt264)
if err315 != nil { if err265 != nil {
Usage() Usage()
return return
} }
@ -598,19 +598,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage() flag.Usage()
} }
arg317 := flag.Arg(1) arg267 := flag.Arg(1)
mbTrans318 := thrift.NewTMemoryBufferLen(len(arg317)) mbTrans268 := thrift.NewTMemoryBufferLen(len(arg267))
defer mbTrans318.Close() defer mbTrans268.Close()
_, err319 := mbTrans318.WriteString(arg317) _, err269 := mbTrans268.WriteString(arg267)
if err319 != nil { if err269 != nil {
Usage() Usage()
return return
} }
factory320 := thrift.NewTJSONProtocolFactory() factory270 := thrift.NewTJSONProtocolFactory()
jsProt321 := factory320.GetProtocol(mbTrans318) jsProt271 := factory270.GetProtocol(mbTrans268)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err322 := argvalue0.Read(context.Background(), jsProt321) err272 := argvalue0.Read(jsProt271)
if err322 != nil { if err272 != nil {
Usage() Usage()
return return
} }
@ -623,19 +623,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage() flag.Usage()
} }
arg323 := flag.Arg(1) arg273 := flag.Arg(1)
mbTrans324 := thrift.NewTMemoryBufferLen(len(arg323)) mbTrans274 := thrift.NewTMemoryBufferLen(len(arg273))
defer mbTrans324.Close() defer mbTrans274.Close()
_, err325 := mbTrans324.WriteString(arg323) _, err275 := mbTrans274.WriteString(arg273)
if err325 != nil { if err275 != nil {
Usage() Usage()
return return
} }
factory326 := thrift.NewTJSONProtocolFactory() factory276 := thrift.NewTJSONProtocolFactory()
jsProt327 := factory326.GetProtocol(mbTrans324) jsProt277 := factory276.GetProtocol(mbTrans274)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err328 := argvalue0.Read(context.Background(), jsProt327) err278 := argvalue0.Read(jsProt277)
if err328 != nil { if err278 != nil {
Usage() Usage()
return return
} }
@ -648,19 +648,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args") fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage() flag.Usage()
} }
arg329 := flag.Arg(1) arg279 := flag.Arg(1)
mbTrans330 := thrift.NewTMemoryBufferLen(len(arg329)) mbTrans280 := thrift.NewTMemoryBufferLen(len(arg279))
defer mbTrans330.Close() defer mbTrans280.Close()
_, err331 := mbTrans330.WriteString(arg329) _, err281 := mbTrans280.WriteString(arg279)
if err331 != nil { if err281 != nil {
Usage() Usage()
return return
} }
factory332 := thrift.NewTJSONProtocolFactory() factory282 := thrift.NewTJSONProtocolFactory()
jsProt333 := factory332.GetProtocol(mbTrans330) jsProt283 := factory282.GetProtocol(mbTrans280)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err334 := argvalue0.Read(context.Background(), jsProt333) err284 := argvalue0.Read(jsProt283)
if err334 != nil { if err284 != nil {
Usage() Usage()
return return
} }
@ -673,19 +673,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args") fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage() flag.Usage()
} }
arg335 := flag.Arg(1) arg285 := flag.Arg(1)
mbTrans336 := thrift.NewTMemoryBufferLen(len(arg335)) mbTrans286 := thrift.NewTMemoryBufferLen(len(arg285))
defer mbTrans336.Close() defer mbTrans286.Close()
_, err337 := mbTrans336.WriteString(arg335) _, err287 := mbTrans286.WriteString(arg285)
if err337 != nil { if err287 != nil {
Usage() Usage()
return return
} }
factory338 := thrift.NewTJSONProtocolFactory() factory288 := thrift.NewTJSONProtocolFactory()
jsProt339 := factory338.GetProtocol(mbTrans336) jsProt289 := factory288.GetProtocol(mbTrans286)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err340 := argvalue0.Read(context.Background(), jsProt339) err290 := argvalue0.Read(jsProt289)
if err340 != nil { if err290 != nil {
Usage() Usage()
return return
} }
@ -718,19 +718,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args") fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage() flag.Usage()
} }
arg343 := flag.Arg(1) arg293 := flag.Arg(1)
mbTrans344 := thrift.NewTMemoryBufferLen(len(arg343)) mbTrans294 := thrift.NewTMemoryBufferLen(len(arg293))
defer mbTrans344.Close() defer mbTrans294.Close()
_, err345 := mbTrans344.WriteString(arg343) _, err295 := mbTrans294.WriteString(arg293)
if err345 != nil { if err295 != nil {
Usage() Usage()
return return
} }
factory346 := thrift.NewTJSONProtocolFactory() factory296 := thrift.NewTJSONProtocolFactory()
jsProt347 := factory346.GetProtocol(mbTrans344) jsProt297 := factory296.GetProtocol(mbTrans294)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err348 := argvalue0.Read(context.Background(), jsProt347) err298 := argvalue0.Read(jsProt297)
if err348 != nil { if err298 != nil {
Usage() Usage()
return return
} }
@ -743,19 +743,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage() flag.Usage()
} }
arg349 := flag.Arg(1) arg299 := flag.Arg(1)
mbTrans350 := thrift.NewTMemoryBufferLen(len(arg349)) mbTrans300 := thrift.NewTMemoryBufferLen(len(arg299))
defer mbTrans350.Close() defer mbTrans300.Close()
_, err351 := mbTrans350.WriteString(arg349) _, err301 := mbTrans300.WriteString(arg299)
if err351 != nil { if err301 != nil {
Usage() Usage()
return return
} }
factory352 := thrift.NewTJSONProtocolFactory() factory302 := thrift.NewTJSONProtocolFactory()
jsProt353 := factory352.GetProtocol(mbTrans350) jsProt303 := factory302.GetProtocol(mbTrans300)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err354 := argvalue0.Read(context.Background(), jsProt353) err304 := argvalue0.Read(jsProt303)
if err354 != nil { if err304 != nil {
Usage() Usage()
return return
} }
@ -768,19 +768,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage() flag.Usage()
} }
arg355 := flag.Arg(1) arg305 := flag.Arg(1)
mbTrans356 := thrift.NewTMemoryBufferLen(len(arg355)) mbTrans306 := thrift.NewTMemoryBufferLen(len(arg305))
defer mbTrans356.Close() defer mbTrans306.Close()
_, err357 := mbTrans356.WriteString(arg355) _, err307 := mbTrans306.WriteString(arg305)
if err357 != nil { if err307 != nil {
Usage() Usage()
return return
} }
factory358 := thrift.NewTJSONProtocolFactory() factory308 := thrift.NewTJSONProtocolFactory()
jsProt359 := factory358.GetProtocol(mbTrans356) jsProt309 := factory308.GetProtocol(mbTrans306)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err360 := argvalue0.Read(context.Background(), jsProt359) err310 := argvalue0.Read(jsProt309)
if err360 != nil { if err310 != nil {
Usage() Usage()
return return
} }
@ -793,19 +793,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage() flag.Usage()
} }
arg361 := flag.Arg(1) arg311 := flag.Arg(1)
mbTrans362 := thrift.NewTMemoryBufferLen(len(arg361)) mbTrans312 := thrift.NewTMemoryBufferLen(len(arg311))
defer mbTrans362.Close() defer mbTrans312.Close()
_, err363 := mbTrans362.WriteString(arg361) _, err313 := mbTrans312.WriteString(arg311)
if err363 != nil { if err313 != nil {
Usage() Usage()
return return
} }
factory364 := thrift.NewTJSONProtocolFactory() factory314 := thrift.NewTJSONProtocolFactory()
jsProt365 := factory364.GetProtocol(mbTrans362) jsProt315 := factory314.GetProtocol(mbTrans312)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err366 := argvalue0.Read(context.Background(), jsProt365) err316 := argvalue0.Read(jsProt315)
if err366 != nil { if err316 != nil {
Usage() Usage()
return return
} }

View file

@ -1,22 +1,22 @@
// Code generated by Thrift Compiler (0.14.0). DO NOT EDIT. // Autogenerated by Thrift Compiler (0.12.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main package main
import ( import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"math" "math"
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
"apache/aurora" "apache/aurora"
) )
var _ = aurora.GoUnusedProtection__
func Usage() { func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -179,19 +179,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage() flag.Usage()
} }
arg132 := flag.Arg(1) arg82 := flag.Arg(1)
mbTrans133 := thrift.NewTMemoryBufferLen(len(arg132)) mbTrans83 := thrift.NewTMemoryBufferLen(len(arg82))
defer mbTrans133.Close() defer mbTrans83.Close()
_, err134 := mbTrans133.WriteString(arg132) _, err84 := mbTrans83.WriteString(arg82)
if err134 != nil { if err84 != nil {
Usage() Usage()
return return
} }
factory135 := thrift.NewTJSONProtocolFactory() factory85 := thrift.NewTJSONProtocolFactory()
jsProt136 := factory135.GetProtocol(mbTrans133) jsProt86 := factory85.GetProtocol(mbTrans83)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err137 := argvalue0.Read(context.Background(), jsProt136) err87 := argvalue0.Read(jsProt86)
if err137 != nil { if err87 != nil {
Usage() Usage()
return return
} }
@ -204,19 +204,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args") fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage() flag.Usage()
} }
arg138 := flag.Arg(1) arg88 := flag.Arg(1)
mbTrans139 := thrift.NewTMemoryBufferLen(len(arg138)) mbTrans89 := thrift.NewTMemoryBufferLen(len(arg88))
defer mbTrans139.Close() defer mbTrans89.Close()
_, err140 := mbTrans139.WriteString(arg138) _, err90 := mbTrans89.WriteString(arg88)
if err140 != nil { if err90 != nil {
Usage() Usage()
return return
} }
factory141 := thrift.NewTJSONProtocolFactory() factory91 := thrift.NewTJSONProtocolFactory()
jsProt142 := factory141.GetProtocol(mbTrans139) jsProt92 := factory91.GetProtocol(mbTrans89)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err143 := argvalue0.Read(context.Background(), jsProt142) err93 := argvalue0.Read(jsProt92)
if err143 != nil { if err93 != nil {
Usage() Usage()
return return
} }
@ -229,19 +229,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args") fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage() flag.Usage()
} }
arg144 := flag.Arg(1) arg94 := flag.Arg(1)
mbTrans145 := thrift.NewTMemoryBufferLen(len(arg144)) mbTrans95 := thrift.NewTMemoryBufferLen(len(arg94))
defer mbTrans145.Close() defer mbTrans95.Close()
_, err146 := mbTrans145.WriteString(arg144) _, err96 := mbTrans95.WriteString(arg94)
if err146 != nil { if err96 != nil {
Usage() Usage()
return return
} }
factory147 := thrift.NewTJSONProtocolFactory() factory97 := thrift.NewTJSONProtocolFactory()
jsProt148 := factory147.GetProtocol(mbTrans145) jsProt98 := factory97.GetProtocol(mbTrans95)
argvalue0 := aurora.NewTaskQuery() argvalue0 := aurora.NewTaskQuery()
err149 := argvalue0.Read(context.Background(), jsProt148) err99 := argvalue0.Read(jsProt98)
if err149 != nil { if err99 != nil {
Usage() Usage()
return return
} }
@ -254,19 +254,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args") fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage() flag.Usage()
} }
arg150 := flag.Arg(1) arg100 := flag.Arg(1)
mbTrans151 := thrift.NewTMemoryBufferLen(len(arg150)) mbTrans101 := thrift.NewTMemoryBufferLen(len(arg100))
defer mbTrans151.Close() defer mbTrans101.Close()
_, err152 := mbTrans151.WriteString(arg150) _, err102 := mbTrans101.WriteString(arg100)
if err152 != nil { if err102 != nil {
Usage() Usage()
return return
} }
factory153 := thrift.NewTJSONProtocolFactory() factory103 := thrift.NewTJSONProtocolFactory()
jsProt154 := factory153.GetProtocol(mbTrans151) jsProt104 := factory103.GetProtocol(mbTrans101)
argvalue0 := aurora.NewJobKey() argvalue0 := aurora.NewJobKey()
err155 := argvalue0.Read(context.Background(), jsProt154) err105 := argvalue0.Read(jsProt104)
if err155 != nil { if err105 != nil {
Usage() Usage()
return return
} }
@ -299,19 +299,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args") fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage() flag.Usage()
} }
arg158 := flag.Arg(1) arg108 := flag.Arg(1)
mbTrans159 := thrift.NewTMemoryBufferLen(len(arg158)) mbTrans109 := thrift.NewTMemoryBufferLen(len(arg108))
defer mbTrans159.Close() defer mbTrans109.Close()
_, err160 := mbTrans159.WriteString(arg158) _, err110 := mbTrans109.WriteString(arg108)
if err160 != nil { if err110 != nil {
Usage() Usage()
return return
} }
factory161 := thrift.NewTJSONProtocolFactory() factory111 := thrift.NewTJSONProtocolFactory()
jsProt162 := factory161.GetProtocol(mbTrans159) jsProt112 := factory111.GetProtocol(mbTrans109)
argvalue0 := aurora.NewJobConfiguration() argvalue0 := aurora.NewJobConfiguration()
err163 := argvalue0.Read(context.Background(), jsProt162) err113 := argvalue0.Read(jsProt112)
if err163 != nil { if err113 != nil {
Usage() Usage()
return return
} }
@ -324,19 +324,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage() flag.Usage()
} }
arg164 := flag.Arg(1) arg114 := flag.Arg(1)
mbTrans165 := thrift.NewTMemoryBufferLen(len(arg164)) mbTrans115 := thrift.NewTMemoryBufferLen(len(arg114))
defer mbTrans165.Close() defer mbTrans115.Close()
_, err166 := mbTrans165.WriteString(arg164) _, err116 := mbTrans115.WriteString(arg114)
if err166 != nil { if err116 != nil {
Usage() Usage()
return return
} }
factory167 := thrift.NewTJSONProtocolFactory() factory117 := thrift.NewTJSONProtocolFactory()
jsProt168 := factory167.GetProtocol(mbTrans165) jsProt118 := factory117.GetProtocol(mbTrans115)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err169 := argvalue0.Read(context.Background(), jsProt168) err119 := argvalue0.Read(jsProt118)
if err169 != nil { if err119 != nil {
Usage() Usage()
return return
} }
@ -349,19 +349,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage() flag.Usage()
} }
arg170 := flag.Arg(1) arg120 := flag.Arg(1)
mbTrans171 := thrift.NewTMemoryBufferLen(len(arg170)) mbTrans121 := thrift.NewTMemoryBufferLen(len(arg120))
defer mbTrans171.Close() defer mbTrans121.Close()
_, err172 := mbTrans171.WriteString(arg170) _, err122 := mbTrans121.WriteString(arg120)
if err172 != nil { if err122 != nil {
Usage() Usage()
return return
} }
factory173 := thrift.NewTJSONProtocolFactory() factory123 := thrift.NewTJSONProtocolFactory()
jsProt174 := factory173.GetProtocol(mbTrans171) jsProt124 := factory123.GetProtocol(mbTrans121)
argvalue0 := aurora.NewJobUpdateQuery() argvalue0 := aurora.NewJobUpdateQuery()
err175 := argvalue0.Read(context.Background(), jsProt174) err125 := argvalue0.Read(jsProt124)
if err175 != nil { if err125 != nil {
Usage() Usage()
return return
} }
@ -374,19 +374,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args") fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage() flag.Usage()
} }
arg176 := flag.Arg(1) arg126 := flag.Arg(1)
mbTrans177 := thrift.NewTMemoryBufferLen(len(arg176)) mbTrans127 := thrift.NewTMemoryBufferLen(len(arg126))
defer mbTrans177.Close() defer mbTrans127.Close()
_, err178 := mbTrans177.WriteString(arg176) _, err128 := mbTrans127.WriteString(arg126)
if err178 != nil { if err128 != nil {
Usage() Usage()
return return
} }
factory179 := thrift.NewTJSONProtocolFactory() factory129 := thrift.NewTJSONProtocolFactory()
jsProt180 := factory179.GetProtocol(mbTrans177) jsProt130 := factory129.GetProtocol(mbTrans127)
argvalue0 := aurora.NewJobUpdateRequest() argvalue0 := aurora.NewJobUpdateRequest()
err181 := argvalue0.Read(context.Background(), jsProt180) err131 := argvalue0.Read(jsProt130)
if err181 != nil { if err131 != nil {
Usage() Usage()
return return
} }

View file

@ -1,6 +1,6 @@
#! /bin/bash #! /bin/bash
THRIFT_VER=0.14.0 THRIFT_VER=0.12.0
if [[ $(thrift -version | grep -e $THRIFT_VER -c) -ne 1 ]]; then if [[ $(thrift -version | grep -e $THRIFT_VER -c) -ne 1 ]]; then
echo "Warning: This wrapper has only been tested with version" $THRIFT_VER; echo "Warning: This wrapper has only been tested with version" $THRIFT_VER;

12
go.mod
View file

@ -1,12 +1,12 @@
module github.com/paypal/gorealis module github.com/paypal/gorealis
go 1.13 go 1.12
require ( require (
github.com/apache/thrift v0.14.0 github.com/apache/thrift v0.12.0
github.com/davecgh/go-spew v1.1.0 // indirect github.com/davecgh/go-spew v1.1.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.2.0
) )

25
go.sum
View file

@ -1,30 +1,9 @@
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs=
github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs= github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs=
github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/ridv/thrift v0.12.1 h1:b80V1Oa2Mbd++jrlJZbJsIybO5/MCfbXKzd1A5v4aSo=
github.com/ridv/thrift v0.12.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.1 h1:/8XnTRUqJJeiuqoL7mfnJQmXQa4GJn9tUCiP7+i6Y9o=
github.com/ridv/thrift v0.13.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/ridv/thrift v0.13.2 h1:Q3Smr8poXd7VkWZPHvdJZzlQCJO+b5W37ECfoUL4qHc=
github.com/ridv/thrift v0.13.2/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -1,21 +0,0 @@
package realis
import (
"context"
"github.com/paypal/gorealis/gen-go/apache/aurora"
)
func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) {
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
if err != nil {
return false, err
}
return resp == nil ||
resp.GetResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
resp.GetResponseCode() != aurora.ResponseCode_OK,
nil
}

89
job.go
View file

@ -62,7 +62,6 @@ type Job interface {
PartitionPolicy(policy *aurora.PartitionPolicy) Job PartitionPolicy(policy *aurora.PartitionPolicy) Job
Tier(tier string) Job Tier(tier string) Job
SlaPolicy(policy *aurora.SlaPolicy) Job SlaPolicy(policy *aurora.SlaPolicy) Job
Priority(priority int32) Job
} }
type resourceType int type resourceType int
@ -74,15 +73,12 @@ const (
GPU GPU
) )
const portNamePrefix = "org.apache.aurora.port."
// AuroraJob is a structure to collect all information pertaining to an Aurora job. // AuroraJob is a structure to collect all information pertaining to an Aurora job.
type AuroraJob struct { type AuroraJob struct {
jobConfig *aurora.JobConfiguration jobConfig *aurora.JobConfiguration
resources map[resourceType]*aurora.Resource resources map[resourceType]*aurora.Resource
metadata map[string]*aurora.Metadata metadata map[string]*aurora.Metadata
constraints map[string]*aurora.Constraint portCount int
portCount int
} }
// NewJob is used to create a Job object with everything initialized. // NewJob is used to create a Job object with everything initialized.
@ -113,11 +109,10 @@ func NewJob() Job {
diskMb.DiskMb = new(int64) diskMb.DiskMb = new(int64)
return &AuroraJob{ return &AuroraJob{
jobConfig: jobConfig, jobConfig: jobConfig,
resources: resources, resources: resources,
metadata: make(map[string]*aurora.Metadata), metadata: make(map[string]*aurora.Metadata),
constraints: make(map[string]*aurora.Constraint), portCount: 0,
portCount: 0,
} }
} }
@ -263,12 +258,12 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
// AddLabel adds a Mesos label to the job. Note that Aurora will add the // AddLabel adds a Mesos label to the job. Note that Aurora will add the
// prefix "org.apache.aurora.metadata." to the beginning of each key. // prefix "org.apache.aurora.metadata." to the beginning of each key.
func (j *AuroraJob) AddLabel(key string, value string) Job { func (j *AuroraJob) AddLabel(key string, value string) Job {
if _, ok := j.metadata[key]; !ok { if _, ok := j.metadata[key]; ok {
j.metadata[key] = &aurora.Metadata{Key: key} j.metadata[key].Value = value
} else {
j.metadata[key] = &aurora.Metadata{Key: key, Value: value}
j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key]) j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key])
} }
j.metadata[key].Value = value
return j return j
} }
@ -293,7 +288,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
start := j.portCount start := j.portCount
j.portCount += num j.portCount += num
for i := start; i < j.portCount; i++ { for i := start; i < j.portCount; i++ {
portName := portNamePrefix + strconv.Itoa(i) portName := "org.apache.aurora.port." + strconv.Itoa(i)
j.jobConfig.TaskConfig.Resources = append( j.jobConfig.TaskConfig.Resources = append(
j.jobConfig.TaskConfig.Resources, j.jobConfig.TaskConfig.Resources,
&aurora.Resource{NamedPort: &portName}) &aurora.Resource{NamedPort: &portName})
@ -302,56 +297,47 @@ func (j *AuroraJob) AddPorts(num int) Job {
return j return j
} }
// AddValueConstraint allows the user to add a value constrain to the job to limit which agents the job's // AddValueConstraint allows the user to add a value constrain to the job to limiti which agents the job's
// tasks can be run on. If the name matches a constraint that was previously set, the previous value will be // tasks can be run on.
// overwritten. In case the previous constraint attached to the name was of type limit, the constraint will be clobbered
// by this new Value constraint.
// From Aurora Docs: // From Aurora Docs:
// Add a Value constraint // Add a Value constraint
// name - Mesos slave attribute that the constraint is matched against. // name - Mesos slave attribute that the constraint is matched against.
// If negated = true , treat this as a 'not' - to avoid specific values. // If negated = true , treat this as a 'not' - to avoid specific values.
// Values - list of values we look for in attribute name // Values - list of values we look for in attribute name
func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job { func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
if _, ok := j.constraints[name]; !ok { j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
j.constraints[name] = &aurora.Constraint{Name: name} &aurora.Constraint{
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name]) Name: name,
} Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
j.constraints[name].Constraint = &aurora.TaskConstraint{ Negated: negated,
Value: &aurora.ValueConstraint{ Values: values,
Negated: negated, },
Values: values, Limit: nil,
}, },
Limit: nil, })
}
return j return j
} }
// AddLimitConstraint allows the user to limit how many tasks form the same Job are run on a single host. // AddLimitConstraint allows the user to limit how many tasks form the same Job are run on a single host.
// If the name matches a constraint that was previously set, the previous value will be
// overwritten. In case the previous constraint attached to the name was of type Value, the constraint will be clobbered
// by this new Limit constraint.
// From Aurora Docs: // From Aurora Docs:
// A constraint that specifies the maximum number of active tasks on a host with // A constraint that specifies the maximum number of active tasks on a host with
// a matching attribute that may be scheduled simultaneously. // a matching attribute that may be scheduled simultaneously.
func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job { func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
if _, ok := j.constraints[name]; !ok { j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
j.constraints[name] = &aurora.Constraint{Name: name} &aurora.Constraint{
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name]) Name: name,
} Constraint: &aurora.TaskConstraint{
Value: nil,
j.constraints[name].Constraint = &aurora.TaskConstraint{ Limit: &aurora.LimitConstraint{Limit: limit},
Value: nil, },
Limit: &aurora.LimitConstraint{Limit: limit}, })
}
return j return j
} }
// AddDedicatedConstraint is a convenience function that allows the user to // AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
// add a dedicated constraint to a Job configuration.
// In case a previous dedicated constraint was set, it will be clobbered by this new value.
func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job { func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
j.AddValueConstraint("dedicated", false, role+"/"+name) j.AddValueConstraint("dedicated", false, role+"/"+name)
@ -384,8 +370,3 @@ func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job {
return j return j
} }
func (j *AuroraJob) Priority(priority int32) Job {
j.jobConfig.TaskConfig.Priority = priority
return j
}

View file

@ -78,11 +78,8 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
UpdateStatuses: desiredStatuses, UpdateStatuses: desiredStatuses,
} }
summary, err := m.JobUpdateQuery(updateQ, interval, timeout) summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
if err != nil {
return 0, err
}
return summary[0].State.Status, nil return summary[0].State.Status, err
} }
// JobUpdateQuery polls the scheduler every certain amount of time to see if the query call returns any results. // JobUpdateQuery polls the scheduler every certain amount of time to see if the query call returns any results.
@ -117,7 +114,7 @@ func (m *Monitor) JobUpdateQuery(
} }
} }
// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update // AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information, // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration. // the update is in using information from the update configuration.
@ -168,8 +165,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
return -1, err return -1, err
} }
if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED || if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status) return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
} }
@ -184,7 +180,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
} }
// Instances will monitor a Job until all instances enter one of the LIVE_STATES // Monitor a Job until all instances enter one of the LIVE_STATES
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout) return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
} }

273
realis.go
View file

@ -18,11 +18,14 @@ package realis
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"crypto/x509"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
"path/filepath"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -35,7 +38,7 @@ import (
"github.com/paypal/gorealis/response" "github.com/paypal/gorealis/response"
) )
const version = "1.24.1" const version = "1.21.1"
// Realis is an interface that defines the various APIs that may be used to communicate with // Realis is an interface that defines the various APIs that may be used to communicate with
// the Apache Aurora scheduler. // the Apache Aurora scheduler.
@ -65,6 +68,7 @@ type Realis interface {
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error)
PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error)
@ -113,7 +117,6 @@ type config struct {
logger *LevelLogger logger *LevelLogger
insecureSkipVerify bool insecureSkipVerify bool
certspath string certspath string
certExtensions map[string]struct{}
clientKey, clientCert string clientKey, clientCert string
options []ClientOption options []ClientOption
debug bool debug bool
@ -129,15 +132,6 @@ var defaultBackoff = Backoff{
Jitter: 0.1, Jitter: 0.1,
} }
var defaultSlaPolicy = aurora.SlaPolicy{
PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
Percentage: 66,
DurationSecs: 300,
},
}
const defaultSlaDrainTimeoutSecs = 900
// ClientOption is an alias for a function that modifies the realis config object // ClientOption is an alias for a function that modifies the realis config object
type ClientOption func(*config) type ClientOption func(*config)
@ -235,18 +229,6 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
} }
} }
// CertExtensions configures gorealis to consider files with the given extensions when
// loading certificates from the cert path.
func CertExtensions(extensions ...string) ClientOption {
extensionsLookup := make(map[string]struct{})
for _, ext := range extensions {
extensionsLookup[ext] = struct{}{}
}
return func(config *config) {
config.certExtensions = extensionsLookup
}
}
// ZookeeperOptions allows users to override default settings for connecting to Zookeeper. // ZookeeperOptions allows users to override default settings for connecting to Zookeeper.
// See zk.go for what is possible to set as an option. // See zk.go for what is possible to set as an option.
func ZookeeperOptions(opts ...ZKOpt) ClientOption { func ZookeeperOptions(opts ...ZKOpt) ClientOption {
@ -329,7 +311,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
config.timeoutms = 10000 config.timeoutms = 10000
config.backoff = defaultBackoff config.backoff = defaultBackoff
config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)} config.logger = &LevelLogger{logger: log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)}
config.certExtensions = map[string]struct{}{".crt": {}, ".pem": {}, ".key": {}}
// Save options to recreate client if a connection error happens // Save options to recreate client if a connection error happens
config.options = options config.options = options
@ -394,7 +375,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
} }
config.logger.Println("Address obtained: ", url) config.logger.Println("Addresss obtained: ", url)
url, err = validateAuroraURL(url) url, err = validateAuroraURL(url)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "invalid Aurora url") return nil, errors.Wrap(err, "invalid Aurora url")
@ -436,8 +417,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace}, logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{}, lock: &sync.Mutex{},
transport: config.transport, transport: config.transport}, nil
}, nil
} }
// GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. This is deprecated in favor of using // GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. This is deprecated in favor of using
@ -453,6 +433,23 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
} }
} }
func createCertPool(certPath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certPath)
if err != nil {
return nil, err
}
for _, cert := range caFiles {
caPathFile := filepath.Join(certPath, cert.Name())
caCert, err := ioutil.ReadFile(caPathFile)
if err != nil {
return nil, err
}
globalRootCAs.AppendCertsFromPEM(caCert)
}
return globalRootCAs, nil
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) { func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTransport, error) {
var transport http.Transport var transport http.Transport
@ -460,7 +457,7 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify} tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}
if config.certspath != "" { if config.certspath != "" {
rootCAs, err := createCertPool(config.certspath, config.certExtensions) rootCAs, err := createCertPool(config.certspath)
if err != nil { if err != nil {
config.logger.Println("error occurred couldn't fetch certs") config.logger.Println("error occurred couldn't fetch certs")
return nil, err return nil, err
@ -494,11 +491,11 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error creating transport") return nil, errors.Wrap(err, "Error creating transport")
} }
if err := trans.Open(); err != nil { if err := trans.Open(); err != nil {
return nil, errors.Wrapf(err, "error opening connection to %s", url) return nil, errors.Wrapf(err, "Error opening connection to %s", url)
} }
return trans, nil return trans, nil
@ -512,10 +509,6 @@ func basicAuth(username, password string) string {
func (r *realisClient) ReestablishConn() error { func (r *realisClient) ReestablishConn() error {
// Close existing connection // Close existing connection
r.logger.Println("Re-establishing Connection to Aurora") r.logger.Println("Re-establishing Connection to Aurora")
// This call must happen before we lock as it also uses
// the same lock from the client since close can be called
// by anyone from anywhere.
r.Close() r.Close()
r.lock.Lock() r.lock.Lock()
@ -541,17 +534,16 @@ func (r *realisClient) ReestablishConn() error {
return nil return nil
} }
// Close releases resources associated with the realis client. // Releases resources associated with the realis client.
func (r *realisClient) Close() { func (r *realisClient) Close() {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
// The return value of Close here is ignored on purpose because there's nothing that can be done if it fails. r.transport.Close()
_ = r.transport.Close()
} }
// GetInstanceIds uses a predefined set of states to retrieve a set of active jobs in the Aurora Scheduler. // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
taskQ := &aurora.TaskQuery{ taskQ := &aurora.TaskQuery{
JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}}, JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}},
@ -564,9 +556,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
}, })
nil,
)
// If we encountered an error we couldn't recover from by retrying, return an error to the user // If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil { if retryErr != nil {
@ -591,16 +581,10 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
}
if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
} }
return resp, nil return resp, nil
@ -614,9 +598,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(context.TODO(), role) return r.readonlyClient.GetJobs(context.TODO(), role)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
@ -629,7 +611,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
return resp, result, nil return resp, result, nil
} }
// KillInstances kills specific instances of a job. // Kill specific instances of a job.
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
@ -637,9 +619,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.KillTasks(context.TODO(), key, instances, "") return r.client.KillTasks(context.TODO(), key, instances, "")
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -651,7 +631,7 @@ func (r *realisClient) RealisConfig() *config {
return r.config return r.config
} }
// KillJob kills all instances of a job. // Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key) r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key)
@ -661,9 +641,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(context.TODO(), key, nil, "") return r.client.KillTasks(context.TODO(), key, nil, "")
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -671,7 +649,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
return resp, nil return resp, nil
} }
// CreateJob sends a create job message to the scheduler with a specific job configuration. // Sends a create job message to the scheduler with a specific job configuration.
// Although this API is able to create service jobs, it is better to use CreateService instead // Although this API is able to create service jobs, it is better to use CreateService instead
// as that API uses the update thrift call which has a few extra features available. // as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs. // Use this API to create ad-hoc jobs.
@ -679,36 +657,19 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
// Response is checked by the thrift retry code
resp, retryErr := r.thriftCallWithRetries( resp, retryErr := r.thriftCallWithRetries(
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) return r.client.CreateJob(context.TODO(), auroraJob.JobConfig())
}, })
// On a client timeout, attempt to verify that payload made to the Scheduler by
// trying to get the config summary for the job key
func() (*aurora.Response, bool) {
exists, err := r.jobExists(*auroraJob.JobKey())
if err != nil {
r.logger.Print("verification failed ", err)
}
if exists {
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
}
return nil, false
},
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
// CreateService uses the scheduler's updating mechanism to create a job. // This API uses an update thrift call to create the services giving a few more robust features.
func (r *realisClient) CreateService( func (r *realisClient) CreateService(
auroraJob Job, auroraJob Job,
settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
@ -719,12 +680,17 @@ func (r *realisClient) CreateService(
resp, err := r.StartJobUpdate(update, "") resp, err := r.StartJobUpdate(update, "")
if err != nil { if err != nil {
if IsTimeout(err) { if IsTimeout(err) {
return nil, nil, err return resp, nil, err
} }
return resp, nil, errors.Wrap(err, "unable to create service") return resp, nil, errors.Wrap(err, "unable to create service")
} }
return resp, resp.GetResult_().StartJobUpdateResult_, nil if resp.GetResult_() != nil {
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
}
return nil, nil, errors.New("results object is nil")
} }
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
@ -734,9 +700,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
@ -752,9 +716,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(context.TODO(), key) return r.client.DescheduleCronJob(context.TODO(), key)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
@ -772,9 +734,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.StartCronJob(context.TODO(), key) return r.client.StartCronJob(context.TODO(), key)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
@ -783,7 +743,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
} }
// RestartInstances restarts the specified instances of a Job. // Restarts specific instances specified
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
@ -791,9 +751,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instances) return r.client.RestartShards(context.TODO(), key, instances)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -801,12 +759,12 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
return resp, nil return resp, nil
} }
// RestartJob restarts all active instances of a Job. // Restarts all active tasks under a job configuration.
func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES) instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err1 != nil { if err1 != nil {
return nil, errors.Wrap(err1, "could not retrieve relevant task instance IDs") return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
} }
r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
@ -816,9 +774,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RestartShards(context.TODO(), key, instanceIds) return r.client.RestartShards(context.TODO(), key, instanceIds)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -830,7 +786,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
return nil, errors.New("No tasks in the Active state") return nil, errors.New("No tasks in the Active state")
} }
// StartJobUpdate updates all instances under a job configuration. // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
@ -839,56 +795,20 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
true, true,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) return r.client.StartJobUpdate(context.TODO(), updateJob.req, message)
}, })
func() (*aurora.Response, bool) {
summariesResp, err := r.readonlyClient.GetJobUpdateSummaries(
context.TODO(),
&aurora.JobUpdateQuery{
JobKey: updateJob.JobKey(),
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
})
if err != nil {
r.logger.Print("verification failed ", err)
return nil, false
}
summaries := response.JobUpdateSummaries(summariesResp)
if len(summaries) == 0 {
return nil, false
}
return &aurora.Response{
ResponseCode: aurora.ResponseCode_OK,
Result_: &aurora.Result_{
StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
UpdateSummary: summaries[0],
Key: summaries[0].Key,
},
},
}, true
},
)
if retryErr != nil { if retryErr != nil {
// A timeout took place when attempting this call, attempt to recover // A timeout took place when attempting this call, attempt to recover
if IsTimeout(retryErr) { if IsTimeout(retryErr) {
return nil, retryErr return resp, retryErr
} }
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
} }
if resp.GetResult_() == nil {
return resp, errors.New("no result in response")
}
return resp, nil return resp, nil
} }
// AbortJobUpdate terminates a job update in the scheduler. // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
// It requires the updateId which can be obtained on the Aurora web UI.
// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. // This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state.
// However, if the job update does not transition to the ABORT state an error will be returned. // However, if the job update does not transition to the ABORT state an error will be returned.
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
@ -899,9 +819,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) return r.client.AbortJobUpdate(context.TODO(), &updateKey, message)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
@ -918,8 +836,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
return resp, err return resp, err
} }
// PauseJobUpdate pauses the progress of an ongoing update. // Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
// The UpdateID value needed for this function is returned from StartJobUpdate or can be obtained from the Aurora web UI.
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -928,9 +845,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(context.TODO(), updateKey, message) return r.client.PauseJobUpdate(context.TODO(), updateKey, message)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@ -939,7 +854,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
return resp, nil return resp, nil
} }
// ResumeJobUpdate resumes a previously Paused Job update. // Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
@ -948,9 +863,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) return r.client.ResumeJobUpdate(context.TODO(), updateKey, message)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@ -959,7 +872,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
return resp, nil return resp, nil
} }
// PulseJobUpdate sends a pulse to an ongoing Job update. // Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
@ -968,9 +881,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(context.TODO(), updateKey) return r.client.PulseJobUpdate(context.TODO(), updateKey)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
@ -979,7 +890,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
return resp, nil return resp, nil
} }
// AddInstances scales up the number of instances for a Job. // Scale up the number of instances under a job configuration using the configuration for specific
// instance to scale up.
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
@ -988,9 +900,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.AddInstances(context.TODO(), &instKey, count) return r.client.AddInstances(context.TODO(), &instKey, count)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@ -999,15 +909,15 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
} }
// RemoveInstances scales down the number of instances for a Job. // Scale down the number of instances under a job configuration using the configuration of a specific instance
func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) { func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES) instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not retrieve relevant instance IDs") return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
} }
if len(instanceIds) < int(count) { if len(instanceIds) < int(count) {
return nil, errors.Errorf("insufficient active instances available for killing: "+ return nil, errors.Errorf("Insufficient active instances available for killing: "+
" Instances to be killed %d Active instances %d", count, len(instanceIds)) " Instances to be killed %d Active instances %d", count, len(instanceIds))
} }
@ -1020,7 +930,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
return r.KillInstances(key, instanceIds[:count]...) return r.KillInstances(key, instanceIds[:count]...)
} }
// GetTaskStatus gets information about task including a fully hydrated task configuration object. // Get information about task including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
@ -1029,9 +939,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), query) return r.client.GetTasksStatus(context.TODO(), query)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
@ -1040,7 +948,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
return response.ScheduleStatusResult(resp).GetTasks(), nil return response.ScheduleStatusResult(resp).GetTasks(), nil
} }
// GetPendingReason returns the reason why the an instance of a Job has not been scheduled. // Get pending reason
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
@ -1049,9 +957,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetPendingReason(context.TODO(), query) return r.client.GetPendingReason(context.TODO(), query)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
@ -1066,8 +972,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
return pendingReasons, nil return pendingReasons, nil
} }
// GetTasksWithoutConfigs gets information about task including without a task configuration object. // Get information about task including without a task configuration object
// This is a more lightweight version of GetTaskStatus but contains less information as a result.
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
@ -1076,9 +981,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(context.TODO(), query) return r.client.GetTasksWithoutConfigs(context.TODO(), query)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@ -1088,7 +991,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
} }
// FetchTaskConfig gets the task configuration from the aurora scheduler for a job. // Get the task configuration from the aurora scheduler for a job
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
taskQ := &aurora.TaskQuery{ taskQ := &aurora.TaskQuery{
Role: &instKey.JobKey.Role, Role: &instKey.JobKey.Role,
@ -1104,9 +1007,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetTasksStatus(context.TODO(), taskQ) return r.client.GetTasksStatus(context.TODO(), taskQ)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@ -1115,7 +1016,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
tasks := response.ScheduleStatusResult(resp).GetTasks() tasks := response.ScheduleStatusResult(resp).GetTasks()
if len(tasks) == 0 { if len(tasks) == 0 {
return nil, errors.Errorf("instance %d for jobkey %s/%s/%s doesn't exist", return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist",
instKey.InstanceId, instKey.InstanceId,
instKey.JobKey.Environment, instKey.JobKey.Environment,
instKey.JobKey.Role, instKey.JobKey.Role,
@ -1134,12 +1035,10 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to get job update details") return nil, errors.Wrap(retryErr, "Unable to get job update details")
} }
return resp, nil return resp, nil
@ -1153,9 +1052,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(context.TODO(), &key, message) return r.client.RollbackJobUpdate(context.TODO(), &key, message)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to roll back job update") return nil, errors.Wrap(retryErr, "unable to roll back job update")

View file

@ -30,9 +30,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(context.TODO(), drainList) return r.adminClient.DrainHosts(context.TODO(), drainList)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -58,18 +56,6 @@ func (r *realisClient) SLADrainHosts(
return nil, errors.New("no hosts provided to drain") return nil, errors.New("no hosts provided to drain")
} }
if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 {
policy = &defaultSlaPolicy
r.logger.Printf("Warning: start draining with default sla policy %v", policy)
}
if timeout < 0 {
r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs",
timeout,
defaultSlaDrainTimeoutSecs)
timeout = defaultSlaDrainTimeoutSecs
}
drainList := aurora.NewHosts() drainList := aurora.NewHosts()
drainList.HostNames = hosts drainList.HostNames = hosts
@ -79,9 +65,7 @@ func (r *realisClient) SLADrainHosts(
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return result, errors.Wrap(retryErr, "Unable to recover connection") return result, errors.Wrap(retryErr, "Unable to recover connection")
@ -111,9 +95,7 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(context.TODO(), hostList) return r.adminClient.StartMaintenance(context.TODO(), hostList)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -143,9 +125,7 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(context.TODO(), hostList) return r.adminClient.EndMaintenance(context.TODO(), hostList)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -177,9 +157,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(context.TODO(), hostList) return r.adminClient.MaintenanceStatus(context.TODO(), hostList)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, result, errors.Wrap(retryErr, "Unable to recover connection") return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
@ -204,9 +182,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.SetQuota(context.TODO(), role, quota) return r.adminClient.SetQuota(context.TODO(), role, quota)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to set role quota") return resp, errors.Wrap(retryErr, "Unable to set role quota")
@ -222,9 +198,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.GetQuota(context.TODO(), role) return r.adminClient.GetQuota(context.TODO(), role)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Unable to get role quota") return resp, errors.Wrap(retryErr, "Unable to get role quota")
@ -239,9 +213,7 @@ func (r *realisClient) Snapshot() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.Snapshot(context.TODO()) return r.adminClient.Snapshot(context.TODO())
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -257,9 +229,7 @@ func (r *realisClient) PerformBackup() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.PerformBackup(context.TODO()) return r.adminClient.PerformBackup(context.TODO())
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -274,9 +244,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
false, false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")
@ -297,9 +265,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
_, retryErr := r.thriftCallWithRetries(false, _, retryErr := r.thriftCallWithRetries(false,
func() (*aurora.Response, error) { func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
}, })
nil,
)
if retryErr != nil { if retryErr != nil {
return errors.Wrap(retryErr, "Unable to recover connection") return errors.Wrap(retryErr, "Unable to recover connection")

View file

@ -306,40 +306,6 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
_, err = r.KillJob(job.JobKey()) _, err = r.KillJob(job.JobKey())
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("Duplicate_constraints", func(t *testing.T) {
job.Name("thermos_duplicate_constraints").
AddValueConstraint("zone", false, "east", "west").
AddValueConstraint("zone", false, "east").
AddValueConstraint("zone", true, "west")
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
t.Run("Overwrite_constraints", func(t *testing.T) {
job.Name("thermos_overwrite_constraints").
AddLimitConstraint("zone", 1).
AddValueConstraint("zone", true, "west", "east").
AddLimitConstraint("zone", 2)
_, err := r.CreateJob(job)
require.NoError(t, err)
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
assert.True(t, success)
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
} }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@ -539,7 +505,7 @@ func TestRealisClient_CreateService(t *testing.T) {
timeoutClient, err := realis.NewRealisClient( timeoutClient, err := realis.NewRealisClient(
realis.SchedulerUrl(auroraURL), realis.SchedulerUrl(auroraURL),
realis.BasicAuth("aurora", "secret"), realis.BasicAuth("aurora", "secret"),
realis.TimeoutMS(5), realis.TimeoutMS(10),
) )
require.NoError(t, err) require.NoError(t, err)
defer timeoutClient.Close() defer timeoutClient.Close()
@ -750,53 +716,6 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
5, 5,
10) 10)
assert.NoError(t, err) assert.NoError(t, err)
// slaDrainHosts goes with default policy if no policy is specified
_, err = r.SLADrainHosts(nil, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
_, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...)
require.NoError(t, err, "unable to drain host with SLA policy")
// Monitor change to DRAINING and DRAINED mode
hostResults, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1,
50)
assert.NoError(t, err)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
_, _, err = r.EndMaintenance(hosts...)
require.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = monitor.HostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5,
10)
assert.NoError(t, err)
} }
// Test multiple go routines using a single connection // Test multiple go routines using a single connection
@ -1106,10 +1025,8 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
assert.Equal(t, i, curStep) assert.Equal(t, i, curStep)
if i != len(updateGroups)-1 { _, err = r.ResumeJobUpdate(&key, "auto resuming test")
_, err = r.ResumeJobUpdate(&key, "auto resuming test") require.NoError(t, err)
require.NoError(t, err)
}
} }
_, err = r.KillJob(job.JobKey()) _, err = r.KillJob(job.JobKey())

View file

@ -36,10 +36,6 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
} }
func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil
}
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
} }

169
retry.go
View file

@ -114,19 +114,10 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc)
type auroraThriftCall func() (resp *aurora.Response, err error) type auroraThriftCall func() (resp *aurora.Response, err error)
// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler
// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
// through the very same Thrift API which previously encountered a time out from the client.
// This means that the functions themselves should be kept to a minimum number of Thrift calls.
// It should also be noted that this is a best effort mechanism and
// is likely to fail for the same reasons that the original call failed.
type verifyOnTimeout func() (*aurora.Response, bool)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
func (r *realisClient) thriftCallWithRetries( func (r *realisClient) thriftCallWithRetries(
returnOnTimeout bool, returnOnTimeout bool,
thriftCall auroraThriftCall, thriftCall auroraThriftCall) (*aurora.Response, error) {
verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {
var resp *aurora.Response var resp *aurora.Response
var clientErr error var clientErr error
@ -146,7 +137,7 @@ func (r *realisClient) thriftCallWithRetries(
} }
r.logger.Printf( r.logger.Printf(
"A retryable error occurred during thrift call, backing off for %v before retry %v", "A retryable error occurred during thrift call, backing off for %v before retry %v\n",
adjusted, adjusted,
curStep) curStep)
@ -163,25 +154,45 @@ func (r *realisClient) thriftCallWithRetries(
resp, clientErr = thriftCall() resp, clientErr = thriftCall()
r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
}() }()
// Check if our thrift call is returning an error. // Check if our thrift call is returning an error. This is a retryable event as we don't know
// if it was caused by network issues.
if clientErr != nil { if clientErr != nil {
// Print out the error to the user // Print out the error to the user
r.logger.Printf("Client Error: %v", clientErr) r.logger.Printf("Client Error: %v\n", clientErr)
temporary, timedout := isConnectionError(clientErr) // Determine if error is a temporary URL error by going up the stack
if !temporary && r.RealisConfig().failOnPermanentErrors { e, ok := clientErr.(thrift.TTransportException)
return nil, errors.Wrap(clientErr, "permanent connection error") if ok {
} r.logger.debugPrint("Encountered a transport exception")
// There exists a corner case where thrift payload was received by Aurora but e, ok := e.Err().(*url.Error)
// connection timed out before Aurora was able to reply. if ok {
// Users can take special action on a timeout by using IsTimedout and reacting accordingly
// if they have configured the client to return on a timeout. // EOF error occurs when the server closes the read buffer of the client. This is common
if timedout && returnOnTimeout { // when the server is overloaded and should be retried. All other errors that are permanent
return resp, newTimedoutError(errors.New("client connection closed before server answer")) // will not be retried.
if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
// Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
// able to reply. In this case we will return whatever response was received and a TimedOut behaving
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
if e.Timeout() {
timeouts++
r.logger.debugPrintf(
"Client closed connection (timedout) %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
if returnOnTimeout {
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
}
}
}
} }
// In the future, reestablish connection should be able to check if it is actually possible // In the future, reestablish connection should be able to check if it is actually possible
@ -191,71 +202,48 @@ func (r *realisClient) thriftCallWithRetries(
if reestablishErr != nil { if reestablishErr != nil {
r.logger.debugPrintf("error re-establishing connection ", reestablishErr) r.logger.debugPrintf("error re-establishing connection ", reestablishErr)
} }
} else {
// If users did not opt for a return on timeout in order to react to a timedout error, // If there was no client error, but the response is nil, something went wrong.
// attempt to verify that the call made it to the scheduler after the connection was re-established. // Ideally, we'll never encounter this but we're placing a safeguard here.
if timedout { if resp == nil {
timeouts++ return nil, errors.New("response from aurora is nil")
r.logger.debugPrintf(
"Client closed connection %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
// Allow caller to provide a function which checks if the original call was successful before
// it timed out.
if verifyOnTimeout != nil {
if verifyResp, ok := verifyOnTimeout(); ok {
r.logger.Print("verified that the call went through successfully after a client timeout")
// Response here might be different than the original as it is no longer constructed
// by the scheduler but mimicked.
// This is OK since the scheduler is very unlikely to change responses at this point in its
// development cycle but we must be careful to not return an incorrectly constructed response.
return verifyResp, nil
}
}
} }
// Retry the thrift payload // Check Response Code from thrift and make a decision to continue retrying or not.
continue switch responseCode := resp.GetResponseCode(); responseCode {
}
// If there was no client error, but the response is nil, something went wrong. // If the thrift call succeeded, stop retrying
// Ideally, we'll never encounter this but we're placing a safeguard here. case aurora.ResponseCode_OK:
if resp == nil { return resp, nil
return nil, errors.New("response from aurora is nil")
}
// Check Response Code from thrift and make a decision to continue retrying or not. // If the response code is transient, continue retrying
switch responseCode := resp.GetResponseCode(); responseCode { case aurora.ResponseCode_ERROR_TRANSIENT:
r.logger.Println("Aurora replied with Transient error code, retrying")
continue
// If the thrift call succeeded, stop retrying // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying.
case aurora.ResponseCode_OK: case aurora.ResponseCode_INVALID_REQUEST,
return resp, nil aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// If the response code is transient, continue retrying // The only case that should fall down to here is a WARNING response code.
case aurora.ResponseCode_ERROR_TRANSIENT: // It is currently not used as a response in the scheduler so it is unknown how to handle it.
r.logger.Println("Aurora replied with Transient error code, retrying") default:
continue r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
// Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. }
case aurora.ResponseCode_INVALID_REQUEST,
aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
} }
} }
r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 { if curStep > 1 {
r.config.logger.Printf("this thrift call was retried %d time(s)", curStep) r.config.logger.Printf("retried this thrift call %d time(s)", curStep)
} }
// Provide more information to the user wherever possible. // Provide more information to the user wherever possible.
@ -265,30 +253,3 @@ func (r *realisClient) thriftCallWithRetries(
return nil, newRetryError(errors.New("ran out of retries"), curStep) return nil, newRetryError(errors.New("ran out of retries"), curStep)
} }
// isConnectionError processes the error received by the client.
// The return values indicate weather this was determined to be a temporary error
// and weather it was determined to be a timeout error
func isConnectionError(err error) (bool, bool) {
// Determine if error is a temporary URL error by going up the stack
transportException, ok := err.(thrift.TTransportException)
if !ok {
return false, false
}
urlError, ok := transportException.Err().(*url.Error)
if !ok {
return false, false
}
// EOF error occurs when the server closes the read buffer of the client. This is common
// when the server is overloaded and we consider it temporary.
// All other which are not temporary as per the member function Temporary(),
// are considered not temporary (permanent).
if urlError.Err != io.EOF && !urlError.Temporary() {
return false, false
}
return true, urlError.Timeout()
}

View file

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues. # Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.16-buster go test -v github.com/paypal/gorealis $@ docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@

51
util.go
View file

@ -1,11 +1,7 @@
package realis package realis
import ( import (
"crypto/x509"
"io/ioutil"
"net/url" "net/url"
"os"
"path/filepath"
"strings" "strings"
"github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/gen-go/apache/aurora"
@ -29,7 +25,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool)
// ActiveJobUpdateStates - States a Job Update may be in where it is considered active. // ActiveJobUpdateStates - States a Job Update may be in where it is considered active.
var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
// This is a function in order to avoid having a slice that can be accidentally mutated. // This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus { func TerminalUpdateStates() []aurora.JobUpdateStatus {
return []aurora.JobUpdateStatus{ return []aurora.JobUpdateStatus{
@ -69,49 +65,6 @@ func init() {
} }
} }
// createCertPool will attempt to load certificates into a certificate pool from a given directory.
// Only files with an extension contained in the extension map are considered.
// This function ignores any files that cannot be read successfully or cannot be added to the certPool
// successfully.
func createCertPool(path string, extensions map[string]struct{}) (*x509.CertPool, error) {
_, err := os.Stat(path)
if err != nil {
return nil, errors.Wrap(err, "unable to load certificates")
}
caFiles, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
loadedCerts := 0
for _, cert := range caFiles {
// Skip directories
if cert.IsDir() {
continue
}
// Skip any files that do not contain the right extension
if _, ok := extensions[filepath.Ext(cert.Name())]; !ok {
continue
}
pem, err := ioutil.ReadFile(filepath.Join(path, cert.Name()))
if err != nil {
continue
}
if certPool.AppendCertsFromPEM(pem) {
loadedCerts++
}
}
if loadedCerts == 0 {
return nil, errors.New("no certificates were able to be successfully loaded")
}
return certPool, nil
}
func validateAuroraURL(location string) (string, error) { func validateAuroraURL(location string) (string, error) {
// If no protocol defined, assume http // If no protocol defined, assume http
@ -139,7 +92,7 @@ func validateAuroraURL(location string) (string, error) {
return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme) return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme)
} }
// This could theoretically be elsewhere but we'll be strict for the sake of simplicity // This could theoretically be elsewhwere but we'll be strict for the sake of simplicty
if u.Path != apiPath { if u.Path != apiPath {
return "", errors.Errorf("expected /api path %v\n", u.Path) return "", errors.Errorf("expected /api path %v\n", u.Path)
} }

View file

@ -100,15 +100,3 @@ func TestCurrentBatchCalculator(t *testing.T) {
assert.Equal(t, 0, curBatch) assert.Equal(t, 0, curBatch)
}) })
} }
func TestCertPoolCreator(t *testing.T) {
extensions := map[string]struct{}{".crt": {}}
_, err := createCertPool("examples/certs", extensions)
assert.NoError(t, err)
t.Run("badDir", func(t *testing.T) {
_, err := createCertPool("idontexist", extensions)
assert.Error(t, err)
})
}