Compare commits

...

6 commits

Author SHA1 Message Date
lawwong1
d2fd7b9ba9
merge retry mechanism change from gorealis v1 to gorealis v2 (#21) 2023-01-26 13:36:40 -08:00
lawwong1
8db625730f
Support Australis API to get aurora master nodes and mesos master nodes (#20) 2022-08-24 08:51:12 -07:00
Tan N. Le
e33a2d99d8
release 2.28.0 (#19) 2022-08-02 09:55:36 -07:00
Tan N. Le
4258634ccf
Capacity report (#18)
- pull capacity report via /offers endpoint.
- calculate how many tasks (with resource and constraints) can be fit in the cluster.
examples of using the above 2 features are in aurora-scheduler/australis#33
2022-07-28 19:27:53 -07:00
Tan N. Le
5d0998647a
default policy for slaDrainHosts (#17) 2021-11-01 18:15:51 -07:00
Renán I. Del Valle
907430768c
Misc. fixes for tests (#16)
* Bumping up CI to go1.17 and enabling CI for PRs.

* Adding go.sum now that issues seem to have gone away.

* Bump up aurora to 0.25.0 and mesos to 1.9.0

* Fixing Mac tests. Adding extra time for killing thermos jobs.

* Reduce the thermos overhead for unit tests

Co-authored-by: lenhattan86 <lenhattan86@users.noreply.github.com>
2021-10-25 12:39:13 -07:00
22 changed files with 1807 additions and 253 deletions

View file

@ -1 +1 @@
0.21.0
0.26.0

View file

@ -1,18 +1,24 @@
name: CI
on: [push]
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- name: Setup Go for use with actions
uses: actions/setup-go@v2
with:
go-version: 1.15
go-version: 1.17
- name: Install goimports
run: go get golang.org/x/tools/cmd/goimports
- name: Set env with list of directories in repo containin go code

3
.gitignore vendored
View file

@ -41,6 +41,3 @@ _testmain.go
# Example client build
examples/client
examples/jsonClient
# Use checksum database
go.sum

View file

@ -28,6 +28,7 @@ type Cluster struct {
ZK string `json:"zk"`
ZKPort int `json:"zk_port"`
SchedZKPath string `json:"scheduler_zk_path"`
MesosZKPath string `json:"mesos_zk_path"`
SchedURI string `json:"scheduler_uri"`
ProxyURL string `json:"proxy_url"`
AuthMechanism string `json:"auth_mechanism"`
@ -61,6 +62,7 @@ func GetDefaultClusterFromZKUrl(zkURL string) *Cluster {
AuthMechanism: "UNAUTHENTICATED",
ZK: zkURL,
SchedZKPath: "/aurora/scheduler",
MesosZKPath: "/mesos",
AgentRunDir: "latest",
AgentRoot: "/var/lib/mesos",
}

View file

@ -32,6 +32,7 @@ func TestLoadClusters(t *testing.T) {
assert.Equal(t, clusters["devcluster"].Name, "devcluster")
assert.Equal(t, clusters["devcluster"].ZK, "192.168.33.7")
assert.Equal(t, clusters["devcluster"].SchedZKPath, "/aurora/scheduler")
assert.Equal(t, clusters["devcluster"].MesosZKPath, "/mesos")
assert.Equal(t, clusters["devcluster"].AuthMechanism, "UNAUTHENTICATED")
assert.Equal(t, clusters["devcluster"].AgentRunDir, "latest")
assert.Equal(t, clusters["devcluster"].AgentRoot, "/var/lib/mesos")

View file

@ -14,7 +14,7 @@ services:
ipv4_address: 192.168.33.2
master:
image: rdelvalle/mesos-master:1.6.2
image: quay.io/aurorascheduler/mesos-master:1.9.0
restart: on-failure
ports:
- "5050:5050"
@ -32,7 +32,7 @@ services:
- zk
agent-one:
image: rdelvalle/mesos-agent:1.6.2
image: quay.io/aurorascheduler/mesos-agent:1.9.0
pid: host
restart: on-failure
ports:
@ -41,10 +41,11 @@ services:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
MESOS_HOSTNAME: localhost
MESOS_HOSTNAME: agent-one
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-one;rack:1;zone:west'
networks:
aurora_cluster:
ipv4_address: 192.168.33.4
@ -55,8 +56,58 @@ services:
depends_on:
- zk
agent-two:
image: quay.io/aurorascheduler/mesos-agent:1.9.0
pid: host
restart: on-failure
ports:
- "5052:5051"
environment:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
MESOS_HOSTNAME: agent-two
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-two;rack:2;zone:west'
networks:
aurora_cluster:
ipv4_address: 192.168.33.5
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zk
agent-three:
image: quay.io/aurorascheduler/mesos-agent:1.9.0
pid: host
restart: on-failure
ports:
- "5053:5051"
environment:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
MESOS_HOSTNAME: agent-three
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-three;rack:2;zone:west;dedicated:vagrant/bar'
networks:
aurora_cluster:
ipv4_address: 192.168.33.6
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zk
aurora-one:
image: rdelvalle/aurora:0.22.0
image: quay.io/aurorascheduler/scheduler:0.25.0
pid: host
ports:
- "8081:8081"
@ -70,6 +121,7 @@ services:
-shiro_realm_modules=INI_AUTHNZ
-shiro_ini_path=/etc/aurora/security.ini
-min_required_instances_for_sla_check=1
-thermos_executor_cpu=0.09
volumes:
- ./.aurora-config:/etc/aurora
networks:

View file

@ -2,6 +2,7 @@
"name": "devcluster",
"zk": "192.168.33.7",
"scheduler_zk_path": "/aurora/scheduler",
"mesos_zk_path": "/mesos",
"auth_mechanism": "UNAUTHENTICATED",
"slave_run_directory": "latest",
"slave_root": "/var/lib/mesos"

2
go.mod
View file

@ -4,7 +4,7 @@ require (
github.com/apache/thrift v0.14.0
github.com/pkg/errors v0.9.1
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a
github.com/stretchr/testify v1.5.0
github.com/stretchr/testify v1.7.0
)
go 1.16

22
go.sum Normal file
View file

@ -0,0 +1,22 @@
github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs=
github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4=
github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw=
github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

23
helpers.go Normal file
View file

@ -0,0 +1,23 @@
package realis
import (
"context"
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)
func (r *Client) JobExists(key aurora.JobKey) (bool, error) {
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
if err != nil {
return false, err
}
return resp != nil &&
resp.GetResult_() != nil &&
resp.GetResult_().GetConfigSummaryResult_() != nil &&
resp.GetResult_().GetConfigSummaryResult_().GetSummary() != nil &&
resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() != nil &&
len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) > 0 &&
resp.GetResponseCode() == aurora.ResponseCode_OK,
nil
}

View file

@ -77,7 +77,7 @@ func (j *JobUpdate) BatchSize(size int32) *JobUpdate {
// Minimum number of seconds a shard must remain in RUNNING state before considered a success.
func (j *JobUpdate) WatchTime(timeout time.Duration) *JobUpdate {
j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Seconds() * 1000)
j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Milliseconds())
return j
}

View file

@ -245,7 +245,7 @@ func (c *Client) MonitorHostMaintenance(hosts []string,
}
}
// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// MonitorAutoPausedUpdate is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration.
@ -294,8 +294,9 @@ func (c *Client) MonitorAutoPausedUpdate(key aurora.JobUpdateKey, interval, time
return -1, err
}
// Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is Summaries
if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
// Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is no summaries
if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED ||
summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}

434
offer.go Normal file
View file

@ -0,0 +1,434 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)
// Offers on [aurora-scheduler]/offers endpoint
type Offer struct {
ID struct {
Value string `json:"value"`
} `json:"id"`
FrameworkID struct {
Value string `json:"value"`
} `json:"framework_id"`
AgentID struct {
Value string `json:"value"`
} `json:"agent_id"`
Hostname string `json:"hostname"`
URL struct {
Scheme string `json:"scheme"`
Address struct {
Hostname string `json:"hostname"`
IP string `json:"ip"`
Port int `json:"port"`
} `json:"address"`
Path string `json:"path"`
Query []interface{} `json:"query"`
} `json:"url"`
Resources []struct {
Name string `json:"name"`
Type string `json:"type"`
Ranges struct {
Range []struct {
Begin int `json:"begin"`
End int `json:"end"`
} `json:"range"`
} `json:"ranges,omitempty"`
Role string `json:"role"`
Reservations []interface{} `json:"reservations"`
Scalar struct {
Value float64 `json:"value"`
} `json:"scalar,omitempty"`
} `json:"resources"`
Attributes []struct {
Name string `json:"name"`
Type string `json:"type"`
Text struct {
Value string `json:"value"`
} `json:"text"`
} `json:"attributes"`
ExecutorIds []struct {
Value string `json:"value"`
} `json:"executor_ids"`
}
// hosts on [aurora-scheduler]/maintenance endpoint
type MaintenanceList struct {
Drained []string `json:"DRAINED"`
Scheduled []string `json:"SCHEDULED"`
Draining map[string][]string `json:"DRAINING"`
}
type OfferCount map[float64]int64
type OfferGroupReport map[string]OfferCount
type OfferReport map[string]OfferGroupReport
// MaintenanceHosts list all the hosts under maintenance
func (c *Client) MaintenanceHosts() ([]string, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
}
request := &http.Client{Transport: tr}
resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL()))
if err != nil {
return nil, err
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return nil, err
}
var list MaintenanceList
if err := json.Unmarshal(buf.Bytes(), &list); err != nil {
return nil, err
}
hosts := append(list.Drained, list.Scheduled...)
for drainingHost := range list.Draining {
hosts = append(hosts, drainingHost)
}
return hosts, nil
}
// Offers pulls data from /offers endpoint
func (c *Client) Offers() ([]Offer, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
}
request := &http.Client{Transport: tr}
resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL()))
if err != nil {
return []Offer{}, err
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return nil, err
}
var offers []Offer
if err := json.Unmarshal(buf.Bytes(), &offers); err != nil {
return []Offer{}, err
}
return offers, nil
}
// AvailOfferReport returns a detailed summary of offers available for use.
// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus.
func (c *Client) AvailOfferReport() (OfferReport, error) {
maintHosts, err := c.MaintenanceHosts()
if err != nil {
return nil, err
}
maintHostSet := map[string]bool{}
for _, h := range maintHosts {
maintHostSet[h] = true
}
// Get a list of offers
offers, err := c.Offers()
if err != nil {
return nil, err
}
report := OfferReport{}
for _, o := range offers {
if maintHostSet[o.Hostname] {
continue
}
group := "non-dedicated"
for _, a := range o.Attributes {
if a.Name == "dedicated" {
group = a.Text.Value
break
}
}
if _, ok := report[group]; !ok {
report[group] = map[string]OfferCount{}
}
for _, r := range o.Resources {
if _, ok := report[group][r.Name]; !ok {
report[group][r.Name] = OfferCount{}
}
val := 0.0
switch r.Type {
case "SCALAR":
val = r.Scalar.Value
case "RANGES":
for _, pr := range r.Ranges.Range {
val += float64(pr.End - pr.Begin + 1)
}
default:
return nil, fmt.Errorf("%s is not supported", r.Type)
}
report[group][r.Name][val]++
}
}
return report, nil
}
// FitTasks computes the number tasks can be fit in a list of offer
func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) {
// count the number of tasks per limit contraint: limit.name -> limit.value -> count
limitCounts := map[string]map[string]int64{}
for _, c := range taskConfig.Constraints {
if c.Constraint.Limit != nil {
limitCounts[c.Name] = map[string]int64{}
}
}
request := ResourcesToMap(taskConfig.Resources)
// validate resource request
if len(request) == 0 {
return -1, fmt.Errorf("Resource request %v must not be empty", request)
}
isValid := false
for _, resVal := range request {
if resVal > 0 {
isValid = true
break
}
}
if !isValid {
return -1, fmt.Errorf("Resource request %v is not valid", request)
}
// pull the list of hosts under maintenance
maintHosts, err := c.MaintenanceHosts()
if err != nil {
return -1, err
}
maintHostSet := map[string]bool{}
for _, h := range maintHosts {
maintHostSet[h] = true
}
numTasks := int64(0)
for _, o := range offers {
// skip the hosts under maintenance
if maintHostSet[o.Hostname] {
continue
}
numTasksPerOffer := int64(-1)
for resName, resVal := range request {
// skip as we can fit a infinite number of tasks with 0 demand.
if resVal == 0 {
continue
}
avail := 0.0
for _, r := range o.Resources {
if r.Name != resName {
continue
}
switch r.Type {
case "SCALAR":
avail = r.Scalar.Value
case "RANGES":
for _, pr := range r.Ranges.Range {
avail += float64(pr.End - pr.Begin + 1)
}
default:
return -1, fmt.Errorf("%s is not supported", r.Type)
}
}
numTasksPerResource := int64(avail / resVal)
if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 {
numTasksPerOffer = numTasksPerResource
}
}
numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer)
}
return numTasks, nil
}
func fitConstraints(taskConfig *aurora.TaskConfig,
offer *Offer,
limitCounts map[string]map[string]int64,
numTasksPerOffer int64) int64 {
// check dedicated attributes vs. constraints
if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) {
return 0
}
limitConstraints := []*aurora.Constraint{}
for _, c := range taskConfig.Constraints {
// look for corresponding attribute
attFound := false
for _, a := range offer.Attributes {
if a.Name == c.Name {
attFound = true
}
}
// constraint not found in offer's attributes
if !attFound {
return 0
}
if c.Constraint.Value != nil && !valueConstraint(offer, c) {
// value constraint is not satisfied
return 0
} else if c.Constraint.Limit != nil {
limitConstraints = append(limitConstraints, c)
limit := limitConstraint(offer, c, limitCounts)
if numTasksPerOffer > limit && limit >= 0 {
numTasksPerOffer = limit
}
}
}
// update limitCounts
for _, c := range limitConstraints {
for _, a := range offer.Attributes {
if a.Name == c.Name {
limitCounts[a.Name][a.Text.Value] += numTasksPerOffer
}
}
}
return numTasksPerOffer
}
func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool {
// get all dedicated attributes of an offer
dedicatedAtts := map[string]bool{}
for _, a := range offer.Attributes {
if a.Name == "dedicated" {
dedicatedAtts[a.Text.Value] = true
}
}
if len(dedicatedAtts) == 0 {
return true
}
// check if constraints are matching dedicated attributes
matched := false
for _, c := range constraints {
if c.Name == "dedicated" && c.Constraint.Value != nil {
found := false
for _, v := range c.Constraint.Value.Values {
if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) {
found = true
break
}
}
if found {
matched = true
} else {
return false
}
}
}
return matched
}
// valueConstraint checks Value Contraints of task if the are matched by the offer.
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool {
matched := false
for _, a := range offer.Attributes {
if a.Name == constraint.Name {
for _, v := range constraint.Constraint.Value.Values {
matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) ||
(a.Text.Value != v && constraint.Constraint.Value.Negated)
if matched {
break
}
}
if matched {
break
}
}
}
return matched
}
// limitConstraint limits the number of pods on a group which has the same attribute.
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 {
limit := int64(-1)
for _, a := range offer.Attributes {
// limit constraint found
if a.Name == constraint.Name {
curr := limitCounts[a.Name][a.Text.Value]
currLimit := int64(constraint.Constraint.Limit.Limit)
if curr >= currLimit {
return 0
}
if currLimit-curr < limit || limit < 0 {
limit = currLimit - curr
}
}
}
return limit
}

244
realis.go
View file

@ -36,7 +36,7 @@ import (
"github.com/pkg/errors"
)
const VERSION = "2.22.1"
const VERSION = "2.28.0"
type Client struct {
config *clientConfig
@ -147,6 +147,8 @@ func NewClient(options ...ClientOption) (*Client, error) {
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
}
config.url = url
url, err = validateAuroraAddress(url)
if err != nil {
return nil, errors.Wrap(err, "unable to create realis object, invalid url")
@ -313,11 +315,13 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu
Statuses: states,
}
c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ)
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ)
})
},
nil,
)
// If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil {
@ -339,8 +343,13 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery)
})
},
nil,
)
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
}
@ -352,8 +361,12 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) {
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.readonlyClient.GetJobSummary(context.TODO(), role)
})
},
nil,
)
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler")
}
@ -363,21 +376,20 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) {
func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
var result *aurora.GetJobsResult_
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.readonlyClient.GetJobs(context.TODO(), role)
})
},
nil,
)
if retryErr != nil {
return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
}
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if resp.GetResult_() != nil {
result = resp.GetResult_().GetJobsResult_
}
return result, nil
return resp.GetResult_().GetJobsResult_, nil
}
// Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call.
@ -387,19 +399,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.KillTasks(context.TODO(), &key, instances, "")
})
},
nil,
)
if retryErr != nil {
return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
}
if len(resp.GetDetails()) > 0 {
if resp == nil || len(resp.GetDetails()) > 0 {
c.logger.Println("KillTasks was called but no tasks killed as a result.")
return false, nil
} else {
return true, nil
}
return true, nil
}
func (c *Client) RealisConfig() *clientConfig {
@ -414,7 +426,9 @@ func (c *Client) KillJob(key aurora.JobKey) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return c.client.KillTasks(context.TODO(), &key, nil, "")
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
@ -436,9 +450,27 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
return errors.Wrap(err, "unable to create Thermos payload")
}
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
})
// Response is checked by the thrift retry code
_, retryErr := c.thriftCallWithRetries(
false,
func() (*aurora.Response, error) {
return c.client.CreateJob(context.TODO(), auroraJob.JobConfig())
},
// On a client timeout, attempt to verify that payload made to the Scheduler by
// trying to get the config summary for the job key
func() (*aurora.Response, bool) {
exists, err := c.JobExists(auroraJob.JobKey())
if err != nil {
c.logger.Print("verification failed ", err)
}
if exists {
return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true
}
return nil, false
},
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
@ -469,7 +501,9 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler")
@ -483,7 +517,9 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.DescheduleCronJob(context.TODO(), &key)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler")
@ -499,7 +535,9 @@ func (c *Client) StartCronJob(key aurora.JobKey) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.StartCronJob(context.TODO(), &key)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Start Cron AuroraJob message to Aurora Scheduler")
@ -514,7 +552,9 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.RestartShards(context.TODO(), &key, instances)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
@ -535,16 +575,17 @@ func (c *Client) RestartJob(key aurora.JobKey) error {
if len(instanceIds) > 0 {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.RestartShards(context.TODO(), &key, instanceIds)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
}
return nil
} else {
return errors.New("no tasks in the Active state")
}
return errors.New("no tasks in the Active state")
}
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
@ -556,34 +597,80 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S
c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.StartJobUpdate(nil, updateJob.request, message)
})
resp, retryErr := c.thriftCallWithRetries(false,
func() (*aurora.Response, error) {
return c.client.StartJobUpdate(context.TODO(), updateJob.request, message)
},
func() (*aurora.Response, bool) {
key := updateJob.JobKey()
summariesResp, err := c.readonlyClient.GetJobUpdateSummaries(
context.TODO(),
&aurora.JobUpdateQuery{
JobKey: &key,
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
})
if err != nil {
c.logger.Print("verification failed ", err)
return nil, false
}
summaries := response.JobUpdateSummaries(summariesResp)
if len(summaries) == 0 {
return nil, false
}
return &aurora.Response{
ResponseCode: aurora.ResponseCode_OK,
Result_: &aurora.Result_{
StartJobUpdateResult_: &aurora.StartJobUpdateResult_{
UpdateSummary: summaries[0],
Key: summaries[0].Key,
},
},
}, true
},
)
if retryErr != nil {
// A timeout took place when attempting this call, attempt to recover
if IsTimeout(retryErr) {
return nil, retryErr
}
return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil {
return resp.GetResult_().GetStartJobUpdateResult_(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetStartJobUpdateResult_(), nil
}
// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
// AbortJobUpdate terminates a job update in the scheduler.
// It requires the updateId which can be obtained on the Aurora web UI.
// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state.
// However, if the job update does not transition to the ABORT state an error will be returned.
func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error {
c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.AbortJobUpdate(context.TODO(), &updateKey, message)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
}
return nil
// Make this call synchronous by blocking until it job has successfully transitioned to aborted
_, err := c.MonitorJobUpdateStatus(
updateKey,
[]aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED},
time.Second*5,
time.Minute)
return err
}
// Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
@ -603,7 +690,9 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string)
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.PauseJobUpdate(nil, updateKeyLocal, message)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
@ -630,7 +719,9 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string)
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
@ -651,18 +742,19 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.PulseJobUpdate(context.TODO(), &updateKey)
})
},
nil,
)
if retryErr != nil {
return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
}
if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil {
return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
} else {
return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil {
return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
@ -679,7 +771,9 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.AddInstances(context.TODO(), &instKey, count)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
@ -724,11 +818,16 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetTasksStatus(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
}
if resp == nil {
return nil, errors.New("unexpected response from scheduler")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
}
@ -740,29 +839,32 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetPendingReason(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
}
var result []*aurora.PendingReason
if resp.GetResult_() != nil {
result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return result, nil
return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil
}
// Get information about task including without a task configuration object
// GetTasksWithoutConfigs gets information about task including without a task configuration object.
// This is a more lightweight version of GetTaskStatus but contains less information as a result.
func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetTasksWithoutConfigs(context.TODO(), query)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
@ -789,7 +891,9 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetTasksStatus(context.TODO(), taskQ)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
@ -815,17 +919,19 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to get job update details")
}
if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil {
return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
} else {
return nil, errors.New("unknown Thrift error, field is nil.")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
}
func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
@ -834,10 +940,16 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.client.RollbackJobUpdate(context.TODO(), &key, message)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to roll back job update")
}
return nil
}
func (c *Client) GetSchedulerURL() string {
return c.config.url
}

View file

@ -37,17 +37,19 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.DrainHosts(context.TODO(), drainList)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
} else {
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
}
// Start SLA Aware Drain.
@ -59,6 +61,18 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
return nil, errors.New("no hosts provided to drain")
}
if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 {
policy = &defaultSlaPolicy
c.logger.Printf("Warning: start draining with default sla policy %v", policy)
}
if timeout < 0 {
c.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs",
timeout,
defaultSlaDrainTimeoutSecs)
timeout = defaultSlaDrainTimeoutSecs
}
drainList := aurora.NewHosts()
drainList.HostNames = hosts
@ -66,17 +80,19 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
} else {
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
}
func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@ -92,17 +108,19 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.StartMaintenance(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil {
return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
} else {
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
}
func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@ -118,24 +136,20 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.EndMaintenance(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil {
return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
} else {
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
}
func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {
var result *aurora.MaintenanceStatusResult_
if len(hosts) == 0 {
return nil, errors.New("no hosts provided to get maintenance status from")
}
@ -149,17 +163,18 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe
// and continue trying to resend command until we run out of retries.
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.MaintenanceStatus(context.TODO(), hostList)
})
},
nil,
)
if retryErr != nil {
return result, errors.Wrap(retryErr, "unable to recover connection")
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if resp.GetResult_() != nil {
result = resp.GetResult_().GetMaintenanceStatusResult_()
}
return result, nil
return resp.GetResult_().GetMaintenanceStatusResult_(), nil
}
// SetQuota sets a quota aggregate for the given role
@ -177,7 +192,9 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.SetQuota(context.TODO(), role, quota)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to set role quota")
@ -191,17 +208,18 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.GetQuota(context.TODO(), role)
})
},
nil,
)
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to get role quota")
}
if resp.GetResult_() != nil {
return resp.GetResult_().GetGetQuotaResult_(), nil
} else {
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return resp.GetResult_().GetGetQuotaResult_(), nil
}
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
@ -209,7 +227,9 @@ func (c *Client) Snapshot() error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.Snapshot(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to recover connection")
@ -223,7 +243,9 @@ func (c *Client) PerformBackup() error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.PerformBackup(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to recover connection")
@ -237,7 +259,9 @@ func (c *Client) ForceImplicitTaskReconciliation() error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to recover connection")
@ -258,7 +282,9 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error {
_, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
})
},
nil,
)
if retryErr != nil {
return errors.Wrap(retryErr, "unable to recover connection")

View file

@ -19,6 +19,7 @@ import (
"time"
"github.com/apache/thrift/lib/go/thrift"
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)
type clientConfig struct {
@ -48,6 +49,15 @@ var defaultBackoff = Backoff{
Jitter: 0.1,
}
var defaultSlaPolicy = aurora.SlaPolicy{
PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
Percentage: 66,
DurationSecs: 300,
},
}
const defaultSlaDrainTimeoutSecs = 900
type TransportProtocol int
const (

View file

@ -94,7 +94,7 @@ func TestBadCredentials(t *testing.T) {
job := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("create_thermos_job_test").
Name("create_thermos_job_bad_creds_test").
ThermosExecutor(thermosExec).
CPU(.5).
RAM(64).
@ -180,6 +180,35 @@ func TestLeaderFromZK(t *testing.T) {
assert.Equal(t, "http://192.168.33.7:8081", url)
}
func TestMasterFromZK(t *testing.T) {
cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181")
masterNodesMap, err := realis.MasterNodesFromZK(*cluster)
assert.NoError(t, err)
for _, hostnames := range masterNodesMap {
for _, hostname := range hostnames {
assert.NoError(t, err)
assert.Equal(t, "192.168.33.7", hostname)
}
}
}
func TestMesosMasterFromZK(t *testing.T) {
cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181")
masterNodesMap, err := realis.MesosMasterNodesFromZK(*cluster)
assert.NoError(t, err)
for _, hostnames := range masterNodesMap {
for _, hostname := range hostnames {
assert.NoError(t, err)
assert.Equal(t, "localhost", hostname)
}
}
}
func TestInvalidAuroraURL(t *testing.T) {
for _, url := range []string{
"http://doesntexist.com:8081/apitest",
@ -209,7 +238,6 @@ func TestValidAuroraURL(t *testing.T) {
}
func TestRealisClient_ReestablishConn(t *testing.T) {
// Test that we're able to tear down the old connection and create a new one.
err := r.ReestablishConn()
@ -220,11 +248,9 @@ func TestGetCACerts(t *testing.T) {
certs, err := realis.GetCerts("./examples/certs")
assert.NoError(t, err)
assert.Equal(t, len(certs.Subjects()), 2)
}
func TestRealisClient_CreateJob_Thermos(t *testing.T) {
role := "vagrant"
job := realis.NewJob().
Environment("prod").
@ -251,7 +277,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
// Fetch all Jobs
result, err := r.GetJobs(role)
fmt.Printf("GetJobs length: %+v \n", len(result.Configs))
fmt.Println("GetJobs length: ", len(result.Configs))
assert.Len(t, result.Configs, 1)
assert.NoError(t, err)
@ -272,7 +298,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
err := r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 60*time.Second)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
})
@ -280,7 +306,6 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
// Test configuring an executor that doesn't exist for CreateJob API
func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) {
// Create a single job
job := realis.NewJob().
Environment("prod").
@ -299,7 +324,6 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) {
// Test configuring an executor that doesn't exist for CreateJob API
func TestRealisClient_GetPendingReason(t *testing.T) {
env := "prod"
role := "vagrant"
name := "pending_reason_test"
@ -330,10 +354,13 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
fmt.Println("Creating service")
role := "vagrant"
job := realis.NewJobUpdate().
@ -416,6 +443,10 @@ pulseLoop:
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
// Test configuring an executor that doesn't exist for CreateJob API
@ -446,13 +477,15 @@ func TestRealisClient_CreateService(t *testing.T) {
var ok bool
var mErr error
if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error
err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
if result != nil {
if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error
err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
err = r.KillJob(job.JobKey())
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.NoError(t, err)
}
}
assert.True(t, ok)
@ -460,7 +493,10 @@ func TestRealisClient_CreateService(t *testing.T) {
// Kill task test task after confirming it came up fine
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
@ -519,10 +555,17 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) {
err := r.DescheduleCronJob(job.JobKey())
assert.NoError(t, err)
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
})
}
func TestRealisClient_StartMaintenance(t *testing.T) {
hosts := []string{"localhost"}
hosts := []string{"agent-one"}
_, err := r.StartMaintenance(hosts...)
assert.NoError(t, err)
@ -532,7 +575,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
[]aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED},
1*time.Second,
50*time.Second)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
assert.Equal(t, map[string]bool{"agent-one": true}, hostResults)
assert.NoError(t, err)
_, err = r.EndMaintenance(hosts...)
@ -548,7 +591,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
}
func TestRealisClient_DrainHosts(t *testing.T) {
hosts := []string{"localhost"}
hosts := []string{"agent-one"}
_, err := r.DrainHosts(hosts...)
assert.NoError(t, err)
@ -558,7 +601,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1*time.Second,
50*time.Second)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
assert.Equal(t, map[string]bool{"agent-one": true}, hostResults)
assert.NoError(t, err)
t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
@ -571,7 +614,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
// Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts
assert.Error(t, err)
assert.Equal(t, map[string]bool{"localhost": true, "IMAGINARY_HOST": false}, hostResults)
assert.Equal(t, map[string]bool{"agent-one": true, "IMAGINARY_HOST": false}, hostResults)
})
t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
@ -590,7 +633,7 @@ func TestRealisClient_DrainHosts(t *testing.T) {
}
func TestRealisClient_SLADrainHosts(t *testing.T) {
hosts := []string{"localhost"}
hosts := []string{"agent-one"}
policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}}
_, err := r.SLADrainHosts(&policy, 30, hosts...)
@ -605,7 +648,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1*time.Second,
50*time.Second)
assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
assert.Equal(t, map[string]bool{"agent-one": true}, hostResults)
assert.NoError(t, err)
_, err = r.EndMaintenance(hosts...)
@ -618,6 +661,39 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
5*time.Second,
10*time.Second)
assert.NoError(t, err)
// slaDrainHosts goes with default policy if no policy is specified
_, err = r.SLADrainHosts(nil, 30, hosts...)
if err != nil {
fmt.Printf("error: %+v\n", err.Error())
os.Exit(1)
}
hostResults, err = r.MonitorHostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1*time.Second,
50*time.Second)
assert.Equal(t, map[string]bool{"agent-one": true}, hostResults)
assert.NoError(t, err)
_, err = r.EndMaintenance(hosts...)
assert.NoError(t, err)
_, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...)
if err != nil {
fmt.Printf("error: %+v\n", err.Error())
os.Exit(1)
}
hostResults, err = r.MonitorHostMaintenance(
hosts,
[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
1*time.Second,
50*time.Second)
assert.Equal(t, map[string]bool{"agent-one": true}, hostResults)
assert.NoError(t, err)
_, err = r.EndMaintenance(hosts...)
assert.NoError(t, err)
}
// Test multiple go routines using a single connection
@ -654,6 +730,9 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}()
}
@ -740,6 +819,12 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
assert.NoError(t, err)
}
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
func TestRealisClient_UpdateStrategies(t *testing.T) {
@ -804,6 +889,10 @@ func TestRealisClient_UpdateStrategies(t *testing.T) {
assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out."))
}
assert.NoError(t, r.KillJob(strategy.jobUpdate.JobKey()))
success, err := r.MonitorInstances(strategy.jobUpdate.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
})
}
}
@ -813,18 +902,15 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
job := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("BatchAwareAutoPauseTest").
Name("batch_aware_auto_pause_test").
ThermosExecutor(thermosExec).
CPU(.01).
RAM(4).
Disk(10).
InstanceCount(6).
IsService(true).
Production(false).
Tier("preemptible").
Priority(0)
IsService(true)
updateGroups := []int32{1, 2, 3}
updateGroups := []int32{1, 3}
strategy := realis.JobUpdateFromAuroraTask(job.AuroraTask()).
VariableBatchStrategy(true, updateGroups...).
InstanceCount(6).
@ -837,22 +923,32 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
key := *result.GetKey()
for i := range updateGroups {
curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Second*240)
curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Minute*5)
if mErr != nil {
fmt.Println(mErr)
// Update may already be in a terminal state so don't check for error
assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out."))
_ = r.AbortJobUpdate(key, "Monitor timed out.")
}
assert.Equal(t, i, curStep)
require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test"))
if i != len(updateGroups)-1 {
require.NoError(t, err)
require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test"))
}
}
assert.NoError(t, r.AbortJobUpdate(key, ""))
assert.NoError(t, r.KillJob(strategy.JobKey()))
success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
func TestRealisClient_GetJobSummary(t *testing.T) {
role := "vagrant"
env := "prod"
name := "GetJobSummaryJob"
name := "test_get_job_summary"
// Create a single job
job := realis.NewJob().
Environment(env).
@ -863,14 +959,10 @@ func TestRealisClient_GetJobSummary(t *testing.T) {
RAM(4).
Disk(10).
InstanceCount(3).
WatchTime(20 * time.Second).
IsService(true).
Production(false).
Tier("preemptible").
Priority(0).
BatchSize(2)
result, err := r.CreateService(job)
Priority(0)
err := r.CreateJob(job)
assert.NoError(t, err)
@ -898,4 +990,516 @@ func TestRealisClient_GetJobSummary(t *testing.T) {
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second)
assert.True(t, success)
assert.NoError(t, err)
}
func TestRealisClient_Offers(t *testing.T) {
var offers []realis.Offer
// since offers are being recycled, it take a few tries to get all of them.
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 5)
}
func TestRealisClient_MaintenanceHosts(t *testing.T) {
offers, err := r.Offers()
assert.NoError(t, err)
for i := 0; i < len(offers); i++ {
_, err := r.DrainHosts(offers[i].Hostname)
assert.NoError(t, err)
hosts, err := r.MaintenanceHosts()
assert.Equal(t, i+1, len(hosts))
}
// clean up
for i := 0; i < len(offers); i++ {
_, err := r.EndMaintenance(offers[i].Hostname)
assert.NoError(t, err)
// Monitor change to DRAINING and DRAINED mode
_, err = r.MonitorHostMaintenance(
[]string{offers[i].Hostname},
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5*time.Second,
10*time.Second)
assert.NoError(t, err)
}
}
func TestRealisClient_AvailOfferReport(t *testing.T) {
var offers []realis.Offer
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 3)
capacity, err := r.AvailOfferReport()
assert.NoError(t, err)
// 2 groups for non-dedicated & dedicated
assert.Equal(t, 2, len(capacity))
// 4 resources: cpus, disk, mem, ports
assert.Equal(t, 4, len(capacity["non-dedicated"]))
}
func TestRealisClient_FitTasks(t *testing.T) {
var offers []realis.Offer
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 5)
cpuPerOffer := 0.0
for _, r := range offers[0].Resources {
if r.Name == "cpus" {
cpuPerOffer = r.Scalar.Value
}
}
// make sure all offers have no running executor
for _, o := range offers {
assert.Equal(t, o.ExecutorIds[:0], o.ExecutorIds)
}
validCpu := cpuPerOffer / 2
inValidCpu := cpuPerOffer + 1
gpu := int64(1)
tests := []struct {
message string
role string
request aurora.Resource
constraints []*aurora.Constraint
expected int64
isError bool
}{
{
message: "task with gpu request",
role: "vagrant",
request: aurora.Resource{
NumGpus: &gpu,
},
expected: 0,
isError: false,
},
{
message: "empty resource request",
role: "vagrant",
request: aurora.Resource{},
expected: -1,
isError: true,
},
{
message: "valid resource request",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
expected: 4,
isError: false,
},
{
message: "invalid cpu request",
role: "vagrant",
request: aurora.Resource{
NumCpus: &inValidCpu,
},
expected: 0,
isError: false,
},
{
message: "dedicated constraint",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "dedicated",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"vagrant/bar"},
},
},
},
},
expected: 2,
isError: false,
},
{
message: "dedicated constraint with unauthorized role",
role: "unauthorized",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "dedicated",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"vagrant/bar"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "value constraint on zone",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"west"},
},
},
},
},
expected: 4,
isError: false,
},
{
message: "negative value constraint on zone",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: true,
Values: []string{"west"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "negative value constraint on host",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: true,
Values: []string{"agent-one"},
},
},
},
},
expected: 2,
isError: false,
},
{
message: "value constraint on unavailable zone",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"east"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "value constraint on unavailable attribute",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "os",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"windows"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "1 value constraint with 2 values",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"agent-one", "agent-two"},
},
},
},
},
expected: 4,
isError: false,
},
{
message: "2 value constraints",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"agent-one"},
},
},
},
{
Name: "rack",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"2"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "limit constraint on host",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 2,
isError: false,
},
{
message: "limit constraint on zone",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
{
message: "limit constraint on zone & host",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
{
message: "limit constraint on unavailable zone",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "gpu-host", // no host has gpu-host attribute
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 0,
isError: false,
},
{
message: "limit & dedicated constraint",
role: "vagrant",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "dedicated",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"vagrant/bar"},
},
},
},
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
}
for _, tc := range tests {
task := aurora.NewTaskConfig()
task.Resources = []*aurora.Resource{&tc.request}
task.Constraints = tc.constraints
task.Job = &aurora.JobKey{
Role: tc.role,
}
numTasks, err := r.FitTasks(task, offers)
if !tc.isError {
assert.NoError(t, err)
assert.Equal(t, tc.expected, numTasks, tc.message)
} else {
assert.Error(t, err)
}
}
}
func TestRealisClient_JobExists(t *testing.T) {
role := "vagrant"
env := "prod"
name := "test_job_exists"
// Create a good single job
job := realis.NewJob().
Environment(env).
Role(role).
Name(name).
ThermosExecutor(thermosExec).
CPU(.25).
RAM(4).
Disk(10).
InstanceCount(3).
IsService(true).
Production(false).
Tier("preemptible").
Priority(0)
// Check if job exists before creating
exists, err := r.JobExists(job.JobKey())
assert.NoError(t, err)
assert.False(t, exists)
err = r.CreateJob(job)
assert.NoError(t, err)
exists, err = r.JobExists(job.JobKey())
assert.NoError(t, err)
assert.True(t, exists)
// Create a single bad job
badJob := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("executordoesntexist").
ExecutorName("idontexist").
ExecutorData("").
CPU(.25).
RAM(4).
Disk(10).
InstanceCount(1)
// Check if job exists before creating
exists, err = r.JobExists(badJob.JobKey())
assert.NoError(t, err)
assert.False(t, exists)
err = r.CreateJob(badJob)
assert.Error(t, err)
exists, err = r.JobExists(badJob.JobKey())
assert.NoError(t, err)
assert.False(t, exists)
}

View file

@ -35,6 +35,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
}
func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil {
return nil
}
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
}

216
retry.go
View file

@ -17,10 +17,7 @@ package realis
import (
"io"
"math/rand"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/apache/thrift/lib/go/thrift"
@ -29,9 +26,11 @@ import (
"github.com/pkg/errors"
)
// Backoff determines how the retry mechanism should react after each failure and how many failures it should
// tolerate.
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multipled by factor each iteration
Factor float64 // Duration is multiplied by a factor each iteration
Jitter float64 // The amount of jitter applied each iteration
Steps int // Exit with error after this many steps
}
@ -53,18 +52,15 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// Modified version of the Kubernetes exponential-backoff code.
// ExponentialBackoff repeats a condition check with exponential backoff.
//
// It checks the condition up to Steps times, increasing the wait by multiplying
// the previous duration by Factor.
// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code.
// It repeats a condition check with exponential backoff and checks the condition up to
// Steps times, increasing the wait by multiplying the previous duration by Factor.
//
// If Jitter is greater than zero, a random amount of each duration is added
// (between duration and duration*(1+jitter)).
//
// If the condition never returns true, ErrWaitTimeout is returned. Errors
// do not cause the function to return.
func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error {
var err error
var ok bool
@ -98,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// If the error is temporary, continue retrying.
if !IsTemporary(err) {
return err
} else {
// Print out the temporary error we experienced.
logger.Println(err)
}
// Print out the temporary error we experienced.
logger.Println(err)
}
}
@ -112,19 +107,28 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
// Provide more information to the user wherever possible
if err != nil {
return newRetryError(errors.Wrap(err, "ran out of retries"), curStep)
} else {
return newRetryError(errors.New("ran out of retries"), curStep)
}
return newRetryError(errors.New("ran out of retries"), curStep)
}
type auroraThriftCall func() (resp *aurora.Response, err error)
// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler
// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler
// through the very same Thrift API which previously encountered a time-out from the client.
// This means that the functions themselves should be kept to a minimum number of Thrift calls.
// It should also be noted that this is a best effort mechanism and
// is likely to fail for the same reasons that the original call failed.
type verifyOnTimeout func() (*aurora.Response, bool)
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall,
verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
var curStep int
var timeouts int
timeouts := 0
backoff := c.config.backoff
duration := backoff.Duration
@ -138,7 +142,10 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh
adjusted = Jitter(duration, backoff.Jitter)
}
c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
c.logger.Printf(
"A retryable error occurred during thrift call, backing off for %v before retry %v",
adjusted,
curStep)
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * backoff.Factor)
@ -153,105 +160,132 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh
resp, clientErr = thriftCall()
c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr)
}()
// Check if our thrift call is returning an error. This is a retryable event as we don't know
// if it was caused by network issues.
if clientErr != nil {
// Print out the error to the user
c.logger.Printf("Client Error: %v\n", clientErr)
c.logger.Printf("Client Error: %v", clientErr)
// Determine if error is a temporary URL error by going up the stack
e, ok := clientErr.(thrift.TTransportException)
if ok {
c.logger.DebugPrint("Encountered a transport exception")
temporary, timedout := isConnectionError(clientErr)
if !temporary && c.RealisConfig().failOnPermanentErrors {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
// TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution
// 401 Unauthorized means the wrong username and password were provided
if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) {
return nil, errors.Wrap(clientErr, "wrong username or password provided")
}
e, ok := e.Err().(*url.Error)
if ok {
// EOF error occurs when the server closes the read buffer of the client. This is common
// when the server is overloaded and should be retried. All other errors that are permanent
// will not be retried.
if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors {
return nil, errors.Wrap(clientErr, "permanent connection error")
}
// Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
// able to reply. In this case we will return whatever response was received and a TimedOut behaving
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
if e.Timeout() {
timeouts++
c.logger.DebugPrintf(
"Client closed connection (timedout) %d times before server responded,"+
" consider increasing connection timeout",
timeouts)
if returnOnTimeout {
return resp,
newTimedoutError(errors.New("client connection closed before server answer"))
}
}
}
// There exists a corner case where thrift payload was received by Aurora but
// connection timed out before Aurora was able to reply.
// Users can take special action on a timeout by using IsTimedout and reacting accordingly
// if they have configured the client to return on a timeout.
if timedout && returnOnTimeout {
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
}
// In the future, reestablish connection should be able to check if it is actually possible
// to make a thrift call to Aurora. For now, a reconnect should always lead to a retry.
// Ignoring error due to the fact that an error should be retried regardless
_ = c.ReestablishConn()
} else {
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("response from aurora is nil")
reestablishErr := c.ReestablishConn()
if reestablishErr != nil {
c.logger.DebugPrintf("error re-establishing connection ", reestablishErr)
}
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// If users did not opt for a return on timeout in order to react to a timedout error,
// attempt to verify that the call made it to the scheduler after the connection was re-established.
if timedout {
timeouts++
c.logger.DebugPrintf(
"Client closed connection %d times before server responded, "+
"consider increasing connection timeout",
timeouts)
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
c.logger.Println("Aurora replied with Transient error code, retrying")
continue
// Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST,
aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
// Allow caller to provide a function which checks if the original call was successful before
// it timed out.
if verifyOnTimeout != nil {
if verifyResp, ok := verifyOnTimeout(); ok {
c.logger.Print("verified that the call went through successfully after a client timeout")
// Response here might be different than the original as it is no longer constructed
// by the scheduler but mimicked.
// This is OK since the scheduler is very unlikely to change responses at this point in its
// development cycle but we must be careful to not return an incorrectly constructed response.
return verifyResp, nil
}
}
}
// Retry the thrift payload
continue
}
// If there was no client error, but the response is nil, something went wrong.
// Ideally, we'll never encounter this but we're placing a safeguard here.
if resp == nil {
return nil, errors.New("response from aurora is nil")
}
// Check Response Code from thrift and make a decision to continue retrying or not.
switch responseCode := resp.GetResponseCode(); responseCode {
// If the thrift call succeeded, stop retrying
case aurora.ResponseCode_OK:
return resp, nil
// If the response code is transient, continue retrying
case aurora.ResponseCode_ERROR_TRANSIENT:
c.logger.Println("Aurora replied with Transient error code, retrying")
continue
// Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying.
case aurora.ResponseCode_INVALID_REQUEST,
aurora.ResponseCode_ERROR,
aurora.ResponseCode_AUTH_FAILED,
aurora.ResponseCode_JOB_UPDATING_ERROR:
c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String())
return resp, errors.New(response.CombineMessage(resp))
// The only case that should fall down to here is a WARNING response code.
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
default:
c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
}
}
c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep)
if curStep > 1 {
c.config.logger.Printf("retried this thrift call %d time(s)", curStep)
c.config.logger.Printf("this thrift call was retried %d time(s)", curStep)
}
// Provide more information to the user wherever possible.
if clientErr != nil {
return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep)
} else {
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
return nil, newRetryError(errors.New("ran out of retries"), curStep)
}
// isConnectionError processes the error received by the client.
// The return values indicate whether this was determined to be a temporary error
// and whether it was determined to be a timeout error
func isConnectionError(err error) (bool, bool) {
// Determine if error is a temporary URL error by going up the stack
transportException, ok := err.(thrift.TTransportException)
if !ok {
return false, false
}
urlError, ok := transportException.Err().(*url.Error)
if !ok {
return false, false
}
// EOF error occurs when the server closes the read buffer of the client. This is common
// when the server is overloaded and we consider it temporary.
// All other which are not temporary as per the member function Temporary(),
// are considered not temporary (permanent).
if urlError.Err != io.EOF && !urlError.Temporary() {
return false, false
}
return true, urlError.Timeout()
}

View file

@ -1,4 +1,4 @@
#!/bin/bash
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
docker run --rm -t -v $(pwd):/go/src/github.com/aurora-scheduler/gorealis --network gorealis_aurora_cluster golang:1.13-stretch go test -v github.com/aurora-scheduler/gorealis $@
docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.17-buster go test -v github.com/aurora-scheduler/gorealis/v2 $@

22
util.go
View file

@ -40,7 +40,7 @@ func init() {
}
}
// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
// TerminalUpdateStates returns a slice containing all the terminal states an update may be in.
// This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
return []aurora.JobUpdateStatus{
@ -104,3 +104,23 @@ func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
}
return batchCount
}
func ResourcesToMap(resources []*aurora.Resource) map[string]float64 {
result := map[string]float64{}
for _, resource := range resources {
if resource.NumCpus != nil {
result["cpus"] += *resource.NumCpus
} else if resource.RamMb != nil {
result["mem"] += float64(*resource.RamMb)
} else if resource.DiskMb != nil {
result["disk"] += float64(*resource.DiskMb)
} else if resource.NamedPort != nil {
result["ports"]++
} else if resource.NumGpus != nil {
result["gpus"] += float64(*resource.NumGpus)
}
}
return result
}

205
zk.go
View file

@ -286,3 +286,208 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) {
return mesosURL, nil
}
// Retrieves current Aurora master nodes from ZK.
func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
return MasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath))
}
// Retrieves current Mesos master nodes/leader from ZK with a custom configuration.
func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
result := make(map[string][]string)
// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
for _, opt := range options {
opt(config)
}
if len(config.endpoints) == 0 {
return nil, errors.New("no Zookeeper endpoints supplied")
}
if config.path == "" {
return nil, errors.New("no Zookeeper path supplied")
}
// Create a closure that allows us to use the ExponentialBackoff function.
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
}
defer c.Close()
// Open up descriptor for the ZK path given
children, _, _, err := c.ChildrenW(config.path)
if err != nil {
// Sentinel error check as there is no other way to check.
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
}
return false,
NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
}
// Get all the master nodes through all the children in the given path
serviceInst := new(ServiceInstance)
var hosts []string
for _, child := range children {
childPath := config.path + "/" + child
data, _, err := c.Get(childPath)
if err != nil {
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
}
return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
}
// Only leader is in json format. Have to parse data differently between member_ and not member_
if strings.HasPrefix(child, "member_") {
err = json.Unmarshal([]byte(data), &serviceInst)
if err != nil {
return false,
NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
}
// Should only be one endpoint.
// This should never be encountered as it would indicate Aurora
// writing bad info into Zookeeper but is kept here as a safety net.
if len(serviceInst.AdditionalEndpoints) > 1 {
return false,
NewTemporaryError(
errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
}
for _, v := range serviceInst.AdditionalEndpoints {
result["leader"] = append(result["leader"], v.Host)
}
} else {
// data is not in a json format
hosts = append(hosts, string(data))
}
}
result["masterNodes"] = hosts
// Master nodes data might not be available yet, try to fetch again.
if len(result["masterNodes"]) == 0 {
return false, NewTemporaryError(errors.New("no master nodes found"))
}
return true, nil
})
if retryErr != nil {
config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps)
return nil, retryErr
}
return result, nil
}
// Retrieves current Mesos Aurora master nodes from ZK.
func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
return MesosMasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.MesosZKPath))
}
// Retrieves current mesos master nodes/leader from ZK with a custom configuration.
func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
result := make(map[string][]string)
// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.]
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
for _, opt := range options {
opt(config)
}
if len(config.endpoints) == 0 {
return nil, errors.New("no Zookeeper endpoints supplied")
}
if config.path == "" {
return nil, errors.New("no Zookeeper path supplied")
}
// Create a closure that allows us to use the ExponentialBackoff function.
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
}
defer c.Close()
// Open up descriptor for the ZK path given
children, _, _, err := c.ChildrenW(config.path)
if err != nil {
// Sentinel error check as there is no other way to check.
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
}
return false,
NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
}
// Get all the master nodes through all the children in the given path
minScore := math.MaxInt64
var mesosInstance MesosInstance
var hosts []string
for _, child := range children {
// Only the master nodes will start with json.info_
if strings.HasPrefix(child, "json.info_") {
strs := strings.Split(child, "_")
if len(strs) < 2 {
config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child)
continue
}
score, err := strconv.Atoi(strs[1])
if err != nil {
return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos."))
}
childPath := config.path + "/" + child
data, _, err := c.Get(childPath)
if err != nil {
if err == zk.ErrInvalidPath {
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
}
return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
}
err = json.Unmarshal([]byte(data), &mesosInstance)
if err != nil {
config.logger.Printf("%s", err)
return false,
NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
}
// Combine all master nodes into comma-separated
// Return hostname instead of ip to be consistent with aurora master nodes
hosts = append(hosts, mesosInstance.Address.Hostname)
// get the leader from the child with the smallest score.
if score < minScore {
minScore = score
result["leader"] = append(result["leader"], mesosInstance.Address.Hostname)
}
}
}
result["masterNodes"] = hosts
// Master nodes data might not be available yet, try to fetch again.
if len(result["masterNodes"]) == 0 {
return false, NewTemporaryError(errors.New("no mesos master nodes found"))
}
return true, nil
})
if retryErr != nil {
config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps)
return nil, retryErr
}
return result, nil
}