diff --git a/.auroraversion b/.auroraversion index 8854156..4e8f395 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.21.0 +0.26.0 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 665cbd8..eb459c8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,18 +1,24 @@ name: CI -on: [push] +on: + push: + branches: + - master + pull_request: + branches: + - master jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - uses: actions/checkout@v2 - name: Setup Go for use with actions uses: actions/setup-go@v2 with: - go-version: 1.15 + go-version: 1.17 - name: Install goimports run: go get golang.org/x/tools/cmd/goimports - name: Set env with list of directories in repo containin go code diff --git a/.gitignore b/.gitignore index 6cd33c9..3f124ef 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,3 @@ _testmain.go # Example client build examples/client examples/jsonClient - -# Use checksum database -go.sum diff --git a/clusters.go b/clusters.go index 49e93f6..c0e81dd 100644 --- a/clusters.go +++ b/clusters.go @@ -28,6 +28,7 @@ type Cluster struct { ZK string `json:"zk"` ZKPort int `json:"zk_port"` SchedZKPath string `json:"scheduler_zk_path"` + MesosZKPath string `json:"mesos_zk_path"` SchedURI string `json:"scheduler_uri"` ProxyURL string `json:"proxy_url"` AuthMechanism string `json:"auth_mechanism"` @@ -61,6 +62,7 @@ func GetDefaultClusterFromZKUrl(zkURL string) *Cluster { AuthMechanism: "UNAUTHENTICATED", ZK: zkURL, SchedZKPath: "/aurora/scheduler", + MesosZKPath: "/mesos", AgentRunDir: "latest", AgentRoot: "/var/lib/mesos", } diff --git a/clusters_test.go b/clusters_test.go index 48b2f03..f0bfd54 100644 --- a/clusters_test.go +++ b/clusters_test.go @@ -32,6 +32,7 @@ func TestLoadClusters(t *testing.T) { assert.Equal(t, clusters["devcluster"].Name, "devcluster") assert.Equal(t, clusters["devcluster"].ZK, "192.168.33.7") assert.Equal(t, clusters["devcluster"].SchedZKPath, "/aurora/scheduler") + assert.Equal(t, clusters["devcluster"].MesosZKPath, "/mesos") assert.Equal(t, clusters["devcluster"].AuthMechanism, "UNAUTHENTICATED") assert.Equal(t, clusters["devcluster"].AgentRunDir, "latest") assert.Equal(t, clusters["devcluster"].AgentRoot, "/var/lib/mesos") diff --git a/docker-compose.yml b/docker-compose.yml index f103d35..6fd40ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: ipv4_address: 192.168.33.2 master: - image: rdelvalle/mesos-master:1.6.2 + image: quay.io/aurorascheduler/mesos-master:1.9.0 restart: on-failure ports: - "5050:5050" @@ -32,7 +32,7 @@ services: - zk agent-one: - image: rdelvalle/mesos-agent:1.6.2 + image: quay.io/aurorascheduler/mesos-agent:1.9.0 pid: host restart: on-failure ports: @@ -41,10 +41,11 @@ services: MESOS_MASTER: zk://192.168.33.2:2181/mesos MESOS_CONTAINERIZERS: docker,mesos MESOS_PORT: 5051 - MESOS_HOSTNAME: localhost + MESOS_HOSTNAME: agent-one MESOS_RESOURCES: ports(*):[11000-11999] MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-one;rack:1;zone:west' networks: aurora_cluster: ipv4_address: 192.168.33.4 @@ -55,8 +56,58 @@ services: depends_on: - zk + agent-two: + image: quay.io/aurorascheduler/mesos-agent:1.9.0 + pid: host + restart: on-failure + ports: + - "5052:5051" + environment: + MESOS_MASTER: zk://192.168.33.2:2181/mesos + MESOS_CONTAINERIZERS: docker,mesos + MESOS_PORT: 5051 + MESOS_HOSTNAME: agent-two + MESOS_RESOURCES: ports(*):[11000-11999] + MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' + MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-two;rack:2;zone:west' + networks: + aurora_cluster: + ipv4_address: 192.168.33.5 + + volumes: + - /sys/fs/cgroup:/sys/fs/cgroup + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zk + + agent-three: + image: quay.io/aurorascheduler/mesos-agent:1.9.0 + pid: host + restart: on-failure + ports: + - "5053:5051" + environment: + MESOS_MASTER: zk://192.168.33.2:2181/mesos + MESOS_CONTAINERIZERS: docker,mesos + MESOS_PORT: 5051 + MESOS_HOSTNAME: agent-three + MESOS_RESOURCES: ports(*):[11000-11999] + MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' + MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-three;rack:2;zone:west;dedicated:vagrant/bar' + networks: + aurora_cluster: + ipv4_address: 192.168.33.6 + + volumes: + - /sys/fs/cgroup:/sys/fs/cgroup + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zk + aurora-one: - image: rdelvalle/aurora:0.22.0 + image: quay.io/aurorascheduler/scheduler:0.25.0 pid: host ports: - "8081:8081" @@ -70,6 +121,7 @@ services: -shiro_realm_modules=INI_AUTHNZ -shiro_ini_path=/etc/aurora/security.ini -min_required_instances_for_sla_check=1 + -thermos_executor_cpu=0.09 volumes: - ./.aurora-config:/etc/aurora networks: diff --git a/examples/clusters.json b/examples/clusters.json index c456bd8..33723a5 100644 --- a/examples/clusters.json +++ b/examples/clusters.json @@ -2,6 +2,7 @@ "name": "devcluster", "zk": "192.168.33.7", "scheduler_zk_path": "/aurora/scheduler", + "mesos_zk_path": "/mesos", "auth_mechanism": "UNAUTHENTICATED", "slave_run_directory": "latest", "slave_root": "/var/lib/mesos" diff --git a/go.mod b/go.mod index 3b3095f..c9c3dee 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/apache/thrift v0.14.0 github.com/pkg/errors v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.5.0 + github.com/stretchr/testify v1.7.0 ) go 1.16 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d65a779 --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs= +github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= +github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw= +github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..f1983db --- /dev/null +++ b/helpers.go @@ -0,0 +1,23 @@ +package realis + +import ( + "context" + + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" +) + +func (r *Client) JobExists(key aurora.JobKey) (bool, error) { + resp, err := r.client.GetConfigSummary(context.TODO(), &key) + if err != nil { + return false, err + } + + return resp != nil && + resp.GetResult_() != nil && + resp.GetResult_().GetConfigSummaryResult_() != nil && + resp.GetResult_().GetConfigSummaryResult_().GetSummary() != nil && + resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() != nil && + len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) > 0 && + resp.GetResponseCode() == aurora.ResponseCode_OK, + nil +} diff --git a/jobUpdate.go b/jobUpdate.go index 248a65d..d880c29 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -77,7 +77,7 @@ func (j *JobUpdate) BatchSize(size int32) *JobUpdate { // Minimum number of seconds a shard must remain in RUNNING state before considered a success. func (j *JobUpdate) WatchTime(timeout time.Duration) *JobUpdate { - j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Seconds() * 1000) + j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Milliseconds()) return j } diff --git a/monitors.go b/monitors.go index 963c017..552d12e 100644 --- a/monitors.go +++ b/monitors.go @@ -245,7 +245,7 @@ func (c *Client) MonitorHostMaintenance(hosts []string, } } -// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update +// MonitorAutoPausedUpdate is a special monitor for auto pause enabled batch updates. This monitor ensures that the update // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information, // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch // the update is in using information from the update configuration. @@ -294,8 +294,9 @@ func (c *Client) MonitorAutoPausedUpdate(key aurora.JobUpdateKey, interval, time return -1, err } - // Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is Summaries - if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED { + // Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is no summaries + if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED || + summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) { return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status) } diff --git a/offer.go b/offer.go new file mode 100644 index 0000000..6f5346f --- /dev/null +++ b/offer.go @@ -0,0 +1,434 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package realis + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" +) + +// Offers on [aurora-scheduler]/offers endpoint +type Offer struct { + ID struct { + Value string `json:"value"` + } `json:"id"` + FrameworkID struct { + Value string `json:"value"` + } `json:"framework_id"` + AgentID struct { + Value string `json:"value"` + } `json:"agent_id"` + Hostname string `json:"hostname"` + URL struct { + Scheme string `json:"scheme"` + Address struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + Port int `json:"port"` + } `json:"address"` + Path string `json:"path"` + Query []interface{} `json:"query"` + } `json:"url"` + Resources []struct { + Name string `json:"name"` + Type string `json:"type"` + Ranges struct { + Range []struct { + Begin int `json:"begin"` + End int `json:"end"` + } `json:"range"` + } `json:"ranges,omitempty"` + Role string `json:"role"` + Reservations []interface{} `json:"reservations"` + Scalar struct { + Value float64 `json:"value"` + } `json:"scalar,omitempty"` + } `json:"resources"` + Attributes []struct { + Name string `json:"name"` + Type string `json:"type"` + Text struct { + Value string `json:"value"` + } `json:"text"` + } `json:"attributes"` + ExecutorIds []struct { + Value string `json:"value"` + } `json:"executor_ids"` +} + +// hosts on [aurora-scheduler]/maintenance endpoint +type MaintenanceList struct { + Drained []string `json:"DRAINED"` + Scheduled []string `json:"SCHEDULED"` + Draining map[string][]string `json:"DRAINING"` +} + +type OfferCount map[float64]int64 +type OfferGroupReport map[string]OfferCount +type OfferReport map[string]OfferGroupReport + +// MaintenanceHosts list all the hosts under maintenance +func (c *Client) MaintenanceHosts() ([]string, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, + } + + request := &http.Client{Transport: tr} + + resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL())) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + return nil, err + } + + var list MaintenanceList + + if err := json.Unmarshal(buf.Bytes(), &list); err != nil { + return nil, err + } + + hosts := append(list.Drained, list.Scheduled...) + + for drainingHost := range list.Draining { + hosts = append(hosts, drainingHost) + } + + return hosts, nil +} + +// Offers pulls data from /offers endpoint +func (c *Client) Offers() ([]Offer, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, + } + + request := &http.Client{Transport: tr} + + resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL())) + if err != nil { + return []Offer{}, err + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + return nil, err + } + + var offers []Offer + + if err := json.Unmarshal(buf.Bytes(), &offers); err != nil { + return []Offer{}, err + } + + return offers, nil +} + +// AvailOfferReport returns a detailed summary of offers available for use. +// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus. +func (c *Client) AvailOfferReport() (OfferReport, error) { + maintHosts, err := c.MaintenanceHosts() + if err != nil { + return nil, err + } + + maintHostSet := map[string]bool{} + for _, h := range maintHosts { + maintHostSet[h] = true + } + + // Get a list of offers + offers, err := c.Offers() + if err != nil { + return nil, err + } + + report := OfferReport{} + + for _, o := range offers { + if maintHostSet[o.Hostname] { + continue + } + + group := "non-dedicated" + for _, a := range o.Attributes { + if a.Name == "dedicated" { + group = a.Text.Value + break + } + } + + if _, ok := report[group]; !ok { + report[group] = map[string]OfferCount{} + } + + for _, r := range o.Resources { + + if _, ok := report[group][r.Name]; !ok { + report[group][r.Name] = OfferCount{} + } + + val := 0.0 + switch r.Type { + case "SCALAR": + val = r.Scalar.Value + case "RANGES": + for _, pr := range r.Ranges.Range { + val += float64(pr.End - pr.Begin + 1) + } + default: + return nil, fmt.Errorf("%s is not supported", r.Type) + } + + report[group][r.Name][val]++ + } + } + + return report, nil +} + +// FitTasks computes the number tasks can be fit in a list of offer +func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) { + // count the number of tasks per limit contraint: limit.name -> limit.value -> count + limitCounts := map[string]map[string]int64{} + for _, c := range taskConfig.Constraints { + if c.Constraint.Limit != nil { + limitCounts[c.Name] = map[string]int64{} + } + } + + request := ResourcesToMap(taskConfig.Resources) + + // validate resource request + if len(request) == 0 { + return -1, fmt.Errorf("Resource request %v must not be empty", request) + } + + isValid := false + for _, resVal := range request { + if resVal > 0 { + isValid = true + break + } + } + + if !isValid { + return -1, fmt.Errorf("Resource request %v is not valid", request) + } + + // pull the list of hosts under maintenance + maintHosts, err := c.MaintenanceHosts() + if err != nil { + return -1, err + } + + maintHostSet := map[string]bool{} + for _, h := range maintHosts { + maintHostSet[h] = true + } + + numTasks := int64(0) + + for _, o := range offers { + // skip the hosts under maintenance + if maintHostSet[o.Hostname] { + continue + } + + numTasksPerOffer := int64(-1) + + for resName, resVal := range request { + // skip as we can fit a infinite number of tasks with 0 demand. + if resVal == 0 { + continue + } + + avail := 0.0 + for _, r := range o.Resources { + if r.Name != resName { + continue + } + + switch r.Type { + case "SCALAR": + avail = r.Scalar.Value + case "RANGES": + for _, pr := range r.Ranges.Range { + avail += float64(pr.End - pr.Begin + 1) + } + default: + return -1, fmt.Errorf("%s is not supported", r.Type) + } + } + + numTasksPerResource := int64(avail / resVal) + + if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 { + numTasksPerOffer = numTasksPerResource + } + } + + numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer) + } + + return numTasks, nil +} + +func fitConstraints(taskConfig *aurora.TaskConfig, + offer *Offer, + limitCounts map[string]map[string]int64, + numTasksPerOffer int64) int64 { + + // check dedicated attributes vs. constraints + if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) { + return 0 + } + + limitConstraints := []*aurora.Constraint{} + + for _, c := range taskConfig.Constraints { + // look for corresponding attribute + attFound := false + for _, a := range offer.Attributes { + if a.Name == c.Name { + attFound = true + } + } + + // constraint not found in offer's attributes + if !attFound { + return 0 + } + + if c.Constraint.Value != nil && !valueConstraint(offer, c) { + // value constraint is not satisfied + return 0 + } else if c.Constraint.Limit != nil { + limitConstraints = append(limitConstraints, c) + limit := limitConstraint(offer, c, limitCounts) + + if numTasksPerOffer > limit && limit >= 0 { + numTasksPerOffer = limit + } + } + } + + // update limitCounts + for _, c := range limitConstraints { + for _, a := range offer.Attributes { + if a.Name == c.Name { + limitCounts[a.Name][a.Text.Value] += numTasksPerOffer + } + } + } + + return numTasksPerOffer +} + +func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool { + // get all dedicated attributes of an offer + dedicatedAtts := map[string]bool{} + for _, a := range offer.Attributes { + if a.Name == "dedicated" { + dedicatedAtts[a.Text.Value] = true + } + } + + if len(dedicatedAtts) == 0 { + return true + } + + // check if constraints are matching dedicated attributes + matched := false + for _, c := range constraints { + if c.Name == "dedicated" && c.Constraint.Value != nil { + found := false + + for _, v := range c.Constraint.Value.Values { + if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) { + found = true + break + } + } + + if found { + matched = true + } else { + return false + } + } + } + + return matched +} + +// valueConstraint checks Value Contraints of task if the are matched by the offer. +// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ +func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool { + matched := false + + for _, a := range offer.Attributes { + if a.Name == constraint.Name { + for _, v := range constraint.Constraint.Value.Values { + matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) || + (a.Text.Value != v && constraint.Constraint.Value.Negated) + + if matched { + break + } + } + + if matched { + break + } + } + } + + return matched +} + +// limitConstraint limits the number of pods on a group which has the same attribute. +// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ +func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 { + limit := int64(-1) + for _, a := range offer.Attributes { + // limit constraint found + if a.Name == constraint.Name { + curr := limitCounts[a.Name][a.Text.Value] + currLimit := int64(constraint.Constraint.Limit.Limit) + + if curr >= currLimit { + return 0 + } + + if currLimit-curr < limit || limit < 0 { + limit = currLimit - curr + } + } + } + + return limit +} diff --git a/realis.go b/realis.go index a7c536e..c331918 100644 --- a/realis.go +++ b/realis.go @@ -36,7 +36,7 @@ import ( "github.com/pkg/errors" ) -const VERSION = "2.22.1" +const VERSION = "2.28.0" type Client struct { config *clientConfig @@ -147,6 +147,8 @@ func NewClient(options ...ClientOption) (*Client, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } + config.url = url + url, err = validateAuroraAddress(url) if err != nil { return nil, errors.Wrap(err, "unable to create realis object, invalid url") @@ -313,11 +315,13 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu Statuses: states, } - c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) + c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }) + }, + nil, + ) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -339,8 +343,13 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (* resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }) + }, + nil, + ) + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil, errors.New("unexpected response from scheduler") + } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } @@ -352,8 +361,12 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobSummary(context.TODO(), role) - }) - + }, + nil, + ) + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil { + return nil, errors.New("unexpected response from scheduler") + } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler") } @@ -363,21 +376,20 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { - var result *aurora.GetJobsResult_ - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobs(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { - return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp.GetResult_() != nil { - result = resp.GetResult_().GetJobsResult_ - } - - return result, nil + return resp.GetResult_().GetJobsResult_, nil } // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. @@ -387,19 +399,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.KillTasks(context.TODO(), &key, instances, "") - }) + }, + nil, + ) if retryErr != nil { return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } - if len(resp.GetDetails()) > 0 { + if resp == nil || len(resp.GetDetails()) > 0 { c.logger.Println("KillTasks was called but no tasks killed as a result.") return false, nil - } else { - return true, nil } - + return true, nil } func (c *Client) RealisConfig() *clientConfig { @@ -414,7 +426,9 @@ func (c *Client) KillJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return c.client.KillTasks(context.TODO(), &key, nil, "") - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -436,9 +450,27 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { return errors.Wrap(err, "unable to create Thermos payload") } - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }) + // Response is checked by the thrift retry code + _, retryErr := c.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) + }, + // On a client timeout, attempt to verify that payload made to the Scheduler by + // trying to get the config summary for the job key + func() (*aurora.Response, bool) { + exists, err := c.JobExists(auroraJob.JobKey()) + if err != nil { + c.logger.Print("verification failed ", err) + } + + if exists { + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true + } + + return nil, false + }, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") @@ -469,7 +501,9 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler") @@ -483,7 +517,9 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.DescheduleCronJob(context.TODO(), &key) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler") @@ -499,7 +535,9 @@ func (c *Client) StartCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.StartCronJob(context.TODO(), &key) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Start Cron AuroraJob message to Aurora Scheduler") @@ -514,7 +552,9 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instances) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -535,16 +575,17 @@ func (c *Client) RestartJob(key aurora.JobKey) error { if len(instanceIds) > 0 { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instanceIds) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return nil - } else { - return errors.New("no tasks in the Active state") } + return errors.New("no tasks in the Active state") } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. @@ -556,34 +597,80 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.client.StartJobUpdate(nil, updateJob.request, message) - }) + resp, retryErr := c.thriftCallWithRetries(false, + func() (*aurora.Response, error) { + return c.client.StartJobUpdate(context.TODO(), updateJob.request, message) + }, + func() (*aurora.Response, bool) { + key := updateJob.JobKey() + summariesResp, err := c.readonlyClient.GetJobUpdateSummaries( + context.TODO(), + &aurora.JobUpdateQuery{ + JobKey: &key, + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1, + }) + + if err != nil { + c.logger.Print("verification failed ", err) + return nil, false + } + + summaries := response.JobUpdateSummaries(summariesResp) + if len(summaries) == 0 { + return nil, false + } + + return &aurora.Response{ + ResponseCode: aurora.ResponseCode_OK, + Result_: &aurora.Result_{ + StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ + UpdateSummary: summaries[0], + Key: summaries[0].Key, + }, + }, + }, true + }, + ) if retryErr != nil { + // A timeout took place when attempting this call, attempt to recover + if IsTimeout(retryErr) { + return nil, retryErr + } + return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - - if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { - return resp.GetResult_().GetStartJobUpdateResult_(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + return resp.GetResult_().GetStartJobUpdateResult_(), nil } -// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. +// AbortJobUpdate terminates a job update in the scheduler. +// It requires the updateId which can be obtained on the Aurora web UI. +// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. +// However, if the job update does not transition to the ABORT state an error will be returned. func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error { c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") } - return nil + // Make this call synchronous by blocking until it job has successfully transitioned to aborted + _, err := c.MonitorJobUpdateStatus( + updateKey, + []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED}, + time.Second*5, + time.Minute) + return err } // Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. @@ -603,7 +690,9 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PauseJobUpdate(nil, updateKeyLocal, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -630,7 +719,9 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -651,18 +742,19 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PulseJobUpdate(context.TODO(), &updateKey) - }) + }, + nil, + ) if retryErr != nil { return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } - if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil { - return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil - } else { - return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil { + return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler") } + return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -679,7 +771,9 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AddInstances(context.TODO(), &instKey, count) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -724,11 +818,16 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } + if resp == nil { + return nil, errors.New("unexpected response from scheduler") + } return response.ScheduleStatusResult(resp).GetTasks(), nil } @@ -740,29 +839,32 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetPendingReason(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } - var result []*aurora.PendingReason - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return result, nil + return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil } -// Get information about task including without a task configuration object +// GetTasksWithoutConfigs gets information about task including without a task configuration object. +// This is a more lightweight version of GetTaskStatus but contains less information as a result. func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -789,7 +891,9 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), taskQ) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -815,17 +919,19 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") } - if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil { - return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil - } else { - return nil, errors.New("unknown Thrift error, field is nil.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil } func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error { @@ -834,10 +940,16 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RollbackJobUpdate(context.TODO(), &key, message) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to roll back job update") } return nil } + +func (c *Client) GetSchedulerURL() string { + return c.config.url +} diff --git a/realis_admin.go b/realis_admin.go index 9530031..f100f46 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -37,17 +37,19 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.DrainHosts(context.TODO(), drainList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } // Start SLA Aware Drain. @@ -59,6 +61,18 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. return nil, errors.New("no hosts provided to drain") } + if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 { + policy = &defaultSlaPolicy + c.logger.Printf("Warning: start draining with default sla policy %v", policy) + } + + if timeout < 0 { + c.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs", + timeout, + defaultSlaDrainTimeoutSecs) + timeout = defaultSlaDrainTimeoutSecs + } + drainList := aurora.NewHosts() drainList.HostNames = hosts @@ -66,17 +80,19 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -92,17 +108,19 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.StartMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil } func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -118,24 +136,20 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.EndMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - - if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil } func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { - - var result *aurora.MaintenanceStatusResult_ - if len(hosts) == 0 { return nil, errors.New("no hosts provided to get maintenance status from") } @@ -149,17 +163,18 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe // and continue trying to resend command until we run out of retries. resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.MaintenanceStatus(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { - return result, errors.Wrap(retryErr, "unable to recover connection") + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp.GetResult_() != nil { - result = resp.GetResult_().GetMaintenanceStatusResult_() - } - - return result, nil + return resp.GetResult_().GetMaintenanceStatusResult_(), nil } // SetQuota sets a quota aggregate for the given role @@ -177,7 +192,9 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64 _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SetQuota(context.TODO(), role, quota) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to set role quota") @@ -191,17 +208,18 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.GetQuota(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get role quota") } - if resp.GetResult_() != nil { - return resp.GetResult_().GetGetQuotaResult_(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } + return resp.GetResult_().GetGetQuotaResult_(), nil } // Force Aurora Scheduler to perform a snapshot and write to Mesos log @@ -209,7 +227,9 @@ func (c *Client) Snapshot() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.Snapshot(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -223,7 +243,9 @@ func (c *Client) PerformBackup() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.PerformBackup(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -237,7 +259,9 @@ func (c *Client) ForceImplicitTaskReconciliation() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -258,7 +282,9 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") diff --git a/realis_config.go b/realis_config.go index 7bb09fc..4b2b96f 100644 --- a/realis_config.go +++ b/realis_config.go @@ -19,6 +19,7 @@ import ( "time" "github.com/apache/thrift/lib/go/thrift" + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" ) type clientConfig struct { @@ -48,6 +49,15 @@ var defaultBackoff = Backoff{ Jitter: 0.1, } +var defaultSlaPolicy = aurora.SlaPolicy{ + PercentageSlaPolicy: &aurora.PercentageSlaPolicy{ + Percentage: 66, + DurationSecs: 300, + }, +} + +const defaultSlaDrainTimeoutSecs = 900 + type TransportProtocol int const ( diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 63d9d03..5893491 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -94,7 +94,7 @@ func TestBadCredentials(t *testing.T) { job := realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("create_thermos_job_test"). + Name("create_thermos_job_bad_creds_test"). ThermosExecutor(thermosExec). CPU(.5). RAM(64). @@ -180,6 +180,35 @@ func TestLeaderFromZK(t *testing.T) { assert.Equal(t, "http://192.168.33.7:8081", url) } + +func TestMasterFromZK(t *testing.T) { + cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") + masterNodesMap, err := realis.MasterNodesFromZK(*cluster) + + assert.NoError(t, err) + + for _, hostnames := range masterNodesMap { + for _, hostname := range hostnames { + assert.NoError(t, err) + assert.Equal(t, "192.168.33.7", hostname) + } + } +} + +func TestMesosMasterFromZK(t *testing.T) { + cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") + masterNodesMap, err := realis.MesosMasterNodesFromZK(*cluster) + + assert.NoError(t, err) + + for _, hostnames := range masterNodesMap { + for _, hostname := range hostnames { + assert.NoError(t, err) + assert.Equal(t, "localhost", hostname) + } + } +} + func TestInvalidAuroraURL(t *testing.T) { for _, url := range []string{ "http://doesntexist.com:8081/apitest", @@ -209,7 +238,6 @@ func TestValidAuroraURL(t *testing.T) { } func TestRealisClient_ReestablishConn(t *testing.T) { - // Test that we're able to tear down the old connection and create a new one. err := r.ReestablishConn() @@ -220,11 +248,9 @@ func TestGetCACerts(t *testing.T) { certs, err := realis.GetCerts("./examples/certs") assert.NoError(t, err) assert.Equal(t, len(certs.Subjects()), 2) - } func TestRealisClient_CreateJob_Thermos(t *testing.T) { - role := "vagrant" job := realis.NewJob(). Environment("prod"). @@ -251,7 +277,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { // Fetch all Jobs result, err := r.GetJobs(role) - fmt.Printf("GetJobs length: %+v \n", len(result.Configs)) + fmt.Println("GetJobs length: ", len(result.Configs)) assert.Len(t, result.Configs, 1) assert.NoError(t, err) @@ -272,7 +298,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { err := r.KillJob(job.JobKey()) assert.NoError(t, err) - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 60*time.Second) + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) assert.True(t, success) assert.NoError(t, err) }) @@ -280,7 +306,6 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { // Test configuring an executor that doesn't exist for CreateJob API func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { - // Create a single job job := realis.NewJob(). Environment("prod"). @@ -299,7 +324,6 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { // Test configuring an executor that doesn't exist for CreateJob API func TestRealisClient_GetPendingReason(t *testing.T) { - env := "prod" role := "vagrant" name := "pending_reason_test" @@ -330,10 +354,13 @@ func TestRealisClient_GetPendingReason(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { - fmt.Println("Creating service") role := "vagrant" job := realis.NewJobUpdate(). @@ -416,6 +443,10 @@ pulseLoop: err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } // Test configuring an executor that doesn't exist for CreateJob API @@ -446,13 +477,15 @@ func TestRealisClient_CreateService(t *testing.T) { var ok bool var mErr error - if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { - // Update may already be in a terminal state so don't check for error - err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") + if result != nil { + if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { + // Update may already be in a terminal state so don't check for error + err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") - err = r.KillJob(job.JobKey()) + err = r.KillJob(job.JobKey()) - assert.NoError(t, err) + assert.NoError(t, err) + } } assert.True(t, ok) @@ -460,7 +493,10 @@ func TestRealisClient_CreateService(t *testing.T) { // Kill task test task after confirming it came up fine err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) assert.NoError(t, err) } @@ -519,10 +555,17 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) { err := r.DescheduleCronJob(job.JobKey()) assert.NoError(t, err) + + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }) } func TestRealisClient_StartMaintenance(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} _, err := r.StartMaintenance(hosts...) assert.NoError(t, err) @@ -532,7 +575,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -548,7 +591,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { } func TestRealisClient_DrainHosts(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} _, err := r.DrainHosts(hosts...) assert.NoError(t, err) @@ -558,7 +601,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) { @@ -571,7 +614,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { // Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts assert.Error(t, err) - assert.Equal(t, map[string]bool{"localhost": true, "IMAGINARY_HOST": false}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true, "IMAGINARY_HOST": false}, hostResults) }) t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) { @@ -590,7 +633,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { } func TestRealisClient_SLADrainHosts(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}} _, err := r.SLADrainHosts(&policy, 30, hosts...) @@ -605,7 +648,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -618,6 +661,39 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { 5*time.Second, 10*time.Second) assert.NoError(t, err) + + // slaDrainHosts goes with default policy if no policy is specified + _, err = r.SLADrainHosts(nil, 30, hosts...) + if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + hostResults, err = r.MonitorHostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1*time.Second, + 50*time.Second) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) + assert.NoError(t, err) + + _, err = r.EndMaintenance(hosts...) + assert.NoError(t, err) + + _, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...) + if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + hostResults, err = r.MonitorHostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1*time.Second, + 50*time.Second) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) + assert.NoError(t, err) + + _, err = r.EndMaintenance(hosts...) + assert.NoError(t, err) } // Test multiple go routines using a single connection @@ -654,6 +730,9 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }() } @@ -740,6 +819,12 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { assert.NoError(t, err) } + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_UpdateStrategies(t *testing.T) { @@ -804,6 +889,10 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out.")) } assert.NoError(t, r.KillJob(strategy.jobUpdate.JobKey())) + + success, err := r.MonitorInstances(strategy.jobUpdate.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }) } } @@ -813,18 +902,15 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { job := realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("BatchAwareAutoPauseTest"). + Name("batch_aware_auto_pause_test"). ThermosExecutor(thermosExec). CPU(.01). RAM(4). Disk(10). InstanceCount(6). - IsService(true). - Production(false). - Tier("preemptible"). - Priority(0) + IsService(true) - updateGroups := []int32{1, 2, 3} + updateGroups := []int32{1, 3} strategy := realis.JobUpdateFromAuroraTask(job.AuroraTask()). VariableBatchStrategy(true, updateGroups...). InstanceCount(6). @@ -837,22 +923,32 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { key := *result.GetKey() for i := range updateGroups { - curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Second*240) + curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Minute*5) if mErr != nil { + fmt.Println(mErr) // Update may already be in a terminal state so don't check for error - assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out.")) + _ = r.AbortJobUpdate(key, "Monitor timed out.") } assert.Equal(t, i, curStep) - require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test")) + + if i != len(updateGroups)-1 { + require.NoError(t, err) + require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test")) + } } + assert.NoError(t, r.AbortJobUpdate(key, "")) assert.NoError(t, r.KillJob(strategy.JobKey())) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_GetJobSummary(t *testing.T) { role := "vagrant" env := "prod" - name := "GetJobSummaryJob" + name := "test_get_job_summary" // Create a single job job := realis.NewJob(). Environment(env). @@ -863,14 +959,10 @@ func TestRealisClient_GetJobSummary(t *testing.T) { RAM(4). Disk(10). InstanceCount(3). - WatchTime(20 * time.Second). IsService(true). Production(false). Tier("preemptible"). - Priority(0). - BatchSize(2) - - result, err := r.CreateService(job) + Priority(0) err := r.CreateJob(job) assert.NoError(t, err) @@ -898,4 +990,516 @@ func TestRealisClient_GetJobSummary(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) +} + +func TestRealisClient_Offers(t *testing.T) { + var offers []realis.Offer + + // since offers are being recycled, it take a few tries to get all of them. + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 5) +} + +func TestRealisClient_MaintenanceHosts(t *testing.T) { + offers, err := r.Offers() + assert.NoError(t, err) + + for i := 0; i < len(offers); i++ { + _, err := r.DrainHosts(offers[i].Hostname) + assert.NoError(t, err) + + hosts, err := r.MaintenanceHosts() + assert.Equal(t, i+1, len(hosts)) + } + + // clean up + for i := 0; i < len(offers); i++ { + _, err := r.EndMaintenance(offers[i].Hostname) + assert.NoError(t, err) + + // Monitor change to DRAINING and DRAINED mode + _, err = r.MonitorHostMaintenance( + []string{offers[i].Hostname}, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5*time.Second, + 10*time.Second) + assert.NoError(t, err) + } +} + +func TestRealisClient_AvailOfferReport(t *testing.T) { + var offers []realis.Offer + + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 3) + + capacity, err := r.AvailOfferReport() + assert.NoError(t, err) + + // 2 groups for non-dedicated & dedicated + assert.Equal(t, 2, len(capacity)) + // 4 resources: cpus, disk, mem, ports + assert.Equal(t, 4, len(capacity["non-dedicated"])) +} + +func TestRealisClient_FitTasks(t *testing.T) { + var offers []realis.Offer + + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 5) + + cpuPerOffer := 0.0 + for _, r := range offers[0].Resources { + if r.Name == "cpus" { + cpuPerOffer = r.Scalar.Value + } + } + + // make sure all offers have no running executor + for _, o := range offers { + assert.Equal(t, o.ExecutorIds[:0], o.ExecutorIds) + } + + validCpu := cpuPerOffer / 2 + inValidCpu := cpuPerOffer + 1 + gpu := int64(1) + + tests := []struct { + message string + role string + request aurora.Resource + constraints []*aurora.Constraint + expected int64 + isError bool + }{ + { + message: "task with gpu request", + role: "vagrant", + request: aurora.Resource{ + NumGpus: &gpu, + }, + expected: 0, + isError: false, + }, + { + message: "empty resource request", + role: "vagrant", + request: aurora.Resource{}, + expected: -1, + isError: true, + }, + { + message: "valid resource request", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + expected: 4, + isError: false, + }, + { + message: "invalid cpu request", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &inValidCpu, + }, + expected: 0, + isError: false, + }, + { + message: "dedicated constraint", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "dedicated constraint with unauthorized role", + role: "unauthorized", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "value constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"west"}, + }, + }, + }, + }, + expected: 4, + isError: false, + }, + { + message: "negative value constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: true, + Values: []string{"west"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "negative value constraint on host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: true, + Values: []string{"agent-one"}, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "value constraint on unavailable zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"east"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "value constraint on unavailable attribute", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "os", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"windows"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "1 value constraint with 2 values", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"agent-one", "agent-two"}, + }, + }, + }, + }, + expected: 4, + isError: false, + }, + { + message: "2 value constraints", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"agent-one"}, + }, + }, + }, + { + Name: "rack", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"2"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "limit constraint on host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "limit constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + { + message: "limit constraint on zone & host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + { + message: "limit constraint on unavailable zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "gpu-host", // no host has gpu-host attribute + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "limit & dedicated constraint", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + } + + for _, tc := range tests { + task := aurora.NewTaskConfig() + task.Resources = []*aurora.Resource{&tc.request} + task.Constraints = tc.constraints + task.Job = &aurora.JobKey{ + Role: tc.role, + } + + numTasks, err := r.FitTasks(task, offers) + + if !tc.isError { + assert.NoError(t, err) + assert.Equal(t, tc.expected, numTasks, tc.message) + } else { + assert.Error(t, err) + } + } +} + +func TestRealisClient_JobExists(t *testing.T) { + role := "vagrant" + env := "prod" + name := "test_job_exists" + // Create a good single job + job := realis.NewJob(). + Environment(env). + Role(role). + Name(name). + ThermosExecutor(thermosExec). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(3). + IsService(true). + Production(false). + Tier("preemptible"). + Priority(0) + + // Check if job exists before creating + exists, err := r.JobExists(job.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) + + err = r.CreateJob(job) + assert.NoError(t, err) + + exists, err = r.JobExists(job.JobKey()) + assert.NoError(t, err) + assert.True(t, exists) + + // Create a single bad job + badJob := realis.NewJob(). + Environment("prod"). + Role("vagrant"). + Name("executordoesntexist"). + ExecutorName("idontexist"). + ExecutorData(""). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(1) + + // Check if job exists before creating + exists, err = r.JobExists(badJob.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) + + err = r.CreateJob(badJob) + assert.Error(t, err) + + exists, err = r.JobExists(badJob.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) } diff --git a/response/response.go b/response/response.go index 1663b1b..15081ec 100644 --- a/response/response.go +++ b/response/response.go @@ -35,6 +35,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil + } + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index 9860e42..f5b0918 100644 --- a/retry.go +++ b/retry.go @@ -17,10 +17,7 @@ package realis import ( "io" "math/rand" - "net/http" "net/url" - "strconv" - "strings" "time" "github.com/apache/thrift/lib/go/thrift" @@ -29,9 +26,11 @@ import ( "github.com/pkg/errors" ) +// Backoff determines how the retry mechanism should react after each failure and how many failures it should +// tolerate. type Backoff struct { Duration time.Duration // the base duration - Factor float64 // Duration is multipled by factor each iteration + Factor float64 // Duration is multiplied by a factor each iteration Jitter float64 // The amount of jitter applied each iteration Steps int // Exit with error after this many steps } @@ -53,18 +52,15 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// Modified version of the Kubernetes exponential-backoff code. -// ExponentialBackoff repeats a condition check with exponential backoff. -// -// It checks the condition up to Steps times, increasing the wait by multiplying -// the previous duration by Factor. +// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code. +// It repeats a condition check with exponential backoff and checks the condition up to +// Steps times, increasing the wait by multiplying the previous duration by Factor. // // If Jitter is greater than zero, a random amount of each duration is added // (between duration and duration*(1+jitter)). // // If the condition never returns true, ErrWaitTimeout is returned. Errors // do not cause the function to return. - func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error { var err error var ok bool @@ -98,10 +94,9 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // If the error is temporary, continue retrying. if !IsTemporary(err) { return err - } else { - // Print out the temporary error we experienced. - logger.Println(err) } + // Print out the temporary error we experienced. + logger.Println(err) } } @@ -112,19 +107,28 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // Provide more information to the user wherever possible if err != nil { return newRetryError(errors.Wrap(err, "ran out of retries"), curStep) - } else { - return newRetryError(errors.New("ran out of retries"), curStep) } + + return newRetryError(errors.New("ran out of retries"), curStep) } type auroraThriftCall func() (resp *aurora.Response, err error) +// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler +// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler +// through the very same Thrift API which previously encountered a time-out from the client. +// This means that the functions themselves should be kept to a minimum number of Thrift calls. +// It should also be noted that this is a best effort mechanism and +// is likely to fail for the same reasons that the original call failed. +type verifyOnTimeout func() (*aurora.Response, bool) + // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. -func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { +func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall, + verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int - var timeouts int + timeouts := 0 backoff := c.config.backoff duration := backoff.Duration @@ -138,7 +142,10 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh adjusted = Jitter(duration, backoff.Jitter) } - c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) + c.logger.Printf( + "A retryable error occurred during thrift call, backing off for %v before retry %v", + adjusted, + curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -153,105 +160,132 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh resp, clientErr = thriftCall() - c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) + c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. if clientErr != nil { - // Print out the error to the user - c.logger.Printf("Client Error: %v\n", clientErr) + c.logger.Printf("Client Error: %v", clientErr) - // Determine if error is a temporary URL error by going up the stack - e, ok := clientErr.(thrift.TTransportException) - if ok { - c.logger.DebugPrint("Encountered a transport exception") + temporary, timedout := isConnectionError(clientErr) + if !temporary && c.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } - // TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution - // 401 Unauthorized means the wrong username and password were provided - if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) { - return nil, errors.Wrap(clientErr, "wrong username or password provided") - } - - e, ok := e.Err().(*url.Error) - if ok { - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and should be retried. All other errors that are permanent - // will not be retried. - if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } - // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was - // able to reply. In this case we will return whatever response was received and a TimedOut behaving - // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. - if e.Timeout() { - timeouts++ - c.logger.DebugPrintf( - "Client closed connection (timedout) %d times before server responded,"+ - " consider increasing connection timeout", - timeouts) - if returnOnTimeout { - return resp, - newTimedoutError(errors.New("client connection closed before server answer")) - } - } - } + // There exists a corner case where thrift payload was received by Aurora but + // connection timed out before Aurora was able to reply. + // Users can take special action on a timeout by using IsTimedout and reacting accordingly + // if they have configured the client to return on a timeout. + if timedout && returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) } // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. // Ignoring error due to the fact that an error should be retried regardless - _ = c.ReestablishConn() - - } else { - - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") + reestablishErr := c.ReestablishConn() + if reestablishErr != nil { + c.logger.DebugPrintf("error re-establishing connection ", reestablishErr) } - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { + // If users did not opt for a return on timeout in order to react to a timedout error, + // attempt to verify that the call made it to the scheduler after the connection was re-established. + if timedout { + timeouts++ + c.logger.DebugPrintf( + "Client closed connection %d times before server responded, "+ + "consider increasing connection timeout", + timeouts) - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil - - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - c.logger.Println("Aurora replied with Transient error code, retrying") - continue - - // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) - - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + // Allow caller to provide a function which checks if the original call was successful before + // it timed out. + if verifyOnTimeout != nil { + if verifyResp, ok := verifyOnTimeout(); ok { + c.logger.Print("verified that the call went through successfully after a client timeout") + // Response here might be different than the original as it is no longer constructed + // by the scheduler but mimicked. + // This is OK since the scheduler is very unlikely to change responses at this point in its + // development cycle but we must be careful to not return an incorrectly constructed response. + return verifyResp, nil + } + } } + + // Retry the thrift payload + continue } + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") + } + + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { + + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil + + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + c.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) + } } - c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) - if curStep > 1 { - c.config.logger.Printf("retried this thrift call %d time(s)", curStep) + c.config.logger.Printf("this thrift call was retried %d time(s)", curStep) } // Provide more information to the user wherever possible. if clientErr != nil { return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep) - } else { - return nil, newRetryError(errors.New("ran out of retries"), curStep) } + + return nil, newRetryError(errors.New("ran out of retries"), curStep) +} + +// isConnectionError processes the error received by the client. +// The return values indicate whether this was determined to be a temporary error +// and whether it was determined to be a timeout error +func isConnectionError(err error) (bool, bool) { + + // Determine if error is a temporary URL error by going up the stack + transportException, ok := err.(thrift.TTransportException) + if !ok { + return false, false + } + + urlError, ok := transportException.Err().(*url.Error) + if !ok { + return false, false + } + + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and we consider it temporary. + // All other which are not temporary as per the member function Temporary(), + // are considered not temporary (permanent). + if urlError.Err != io.EOF && !urlError.Temporary() { + return false, false + } + + return true, urlError.Timeout() } diff --git a/runTestsMac.sh b/runTestsMac.sh index c54994a..2791ada 100644 --- a/runTestsMac.sh +++ b/runTestsMac.sh @@ -1,4 +1,4 @@ #!/bin/bash # Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues. -docker run --rm -t -v $(pwd):/go/src/github.com/aurora-scheduler/gorealis --network gorealis_aurora_cluster golang:1.13-stretch go test -v github.com/aurora-scheduler/gorealis $@ +docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.17-buster go test -v github.com/aurora-scheduler/gorealis/v2 $@ diff --git a/util.go b/util.go index 90e3dcf..a822b3f 100644 --- a/util.go +++ b/util.go @@ -40,7 +40,7 @@ func init() { } } -// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. +// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{ @@ -104,3 +104,23 @@ func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int { } return batchCount } + +func ResourcesToMap(resources []*aurora.Resource) map[string]float64 { + result := map[string]float64{} + + for _, resource := range resources { + if resource.NumCpus != nil { + result["cpus"] += *resource.NumCpus + } else if resource.RamMb != nil { + result["mem"] += float64(*resource.RamMb) + } else if resource.DiskMb != nil { + result["disk"] += float64(*resource.DiskMb) + } else if resource.NamedPort != nil { + result["ports"]++ + } else if resource.NumGpus != nil { + result["gpus"] += float64(*resource.NumGpus) + } + } + + return result +} diff --git a/zk.go b/zk.go index 9d5b659..b1c4ad4 100644 --- a/zk.go +++ b/zk.go @@ -286,3 +286,208 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) { return mesosURL, nil } + +// Retrieves current Aurora master nodes from ZK. +func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) { + return MasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) +} + +// Retrieves current Mesos master nodes/leader from ZK with a custom configuration. +func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { + result := make(map[string][]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + serviceInst := new(ServiceInstance) + var hosts []string + for _, child := range children { + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + // Only leader is in json format. Have to parse data differently between member_ and not member_ + if strings.HasPrefix(child, "member_") { + err = json.Unmarshal([]byte(data), &serviceInst) + if err != nil { + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Should only be one endpoint. + // This should never be encountered as it would indicate Aurora + // writing bad info into Zookeeper but is kept here as a safety net. + if len(serviceInst.AdditionalEndpoints) > 1 { + return false, + NewTemporaryError( + errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK")) + } + + for _, v := range serviceInst.AdditionalEndpoints { + result["leader"] = append(result["leader"], v.Host) + } + } else { + // data is not in a json format + hosts = append(hosts, string(data)) + } + } + result["masterNodes"] = hosts + + // Master nodes data might not be available yet, try to fetch again. + if len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +} + +// Retrieves current Mesos Aurora master nodes from ZK. +func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) { + return MesosMasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.MesosZKPath)) +} + +// Retrieves current mesos master nodes/leader from ZK with a custom configuration. +func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { + result := make(map[string][]string) + + // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.] + config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} + for _, opt := range options { + opt(config) + } + + if len(config.endpoints) == 0 { + return nil, errors.New("no Zookeeper endpoints supplied") + } + + if config.path == "" { + return nil, errors.New("no Zookeeper path supplied") + } + + // Create a closure that allows us to use the ExponentialBackoff function. + retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { + + c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + } + + defer c.Close() + + // Open up descriptor for the ZK path given + children, _, _, err := c.ChildrenW(config.path) + if err != nil { + + // Sentinel error check as there is no other way to check. + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) + } + + return false, + NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) + } + + // Get all the master nodes through all the children in the given path + minScore := math.MaxInt64 + var mesosInstance MesosInstance + var hosts []string + for _, child := range children { + // Only the master nodes will start with json.info_ + if strings.HasPrefix(child, "json.info_") { + strs := strings.Split(child, "_") + if len(strs) < 2 { + config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child) + continue + } + score, err := strconv.Atoi(strs[1]) + if err != nil { + return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos.")) + } + + childPath := config.path + "/" + child + data, _, err := c.Get(childPath) + if err != nil { + if err == zk.ErrInvalidPath { + return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) + } + + return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) + } + + err = json.Unmarshal([]byte(data), &mesosInstance) + if err != nil { + config.logger.Printf("%s", err) + return false, + NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) + } + // Combine all master nodes into comma-separated + // Return hostname instead of ip to be consistent with aurora master nodes + hosts = append(hosts, mesosInstance.Address.Hostname) + // get the leader from the child with the smallest score. + if score < minScore { + minScore = score + result["leader"] = append(result["leader"], mesosInstance.Address.Hostname) + } + } + } + result["masterNodes"] = hosts + // Master nodes data might not be available yet, try to fetch again. + if len(result["masterNodes"]) == 0 { + return false, NewTemporaryError(errors.New("no mesos master nodes found")) + } + return true, nil + }) + + if retryErr != nil { + config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps) + return nil, retryErr + } + + return result, nil +}