diff --git a/.auroraversion b/.auroraversion index 4e8f395..8854156 100644 --- a/.auroraversion +++ b/.auroraversion @@ -1 +1 @@ -0.26.0 +0.21.0 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index eb459c8..665cbd8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,24 +1,18 @@ name: CI -on: - push: - branches: - - master - pull_request: - branches: - - master +on: [push] jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Setup Go for use with actions uses: actions/setup-go@v2 with: - go-version: 1.17 + go-version: 1.15 - name: Install goimports run: go get golang.org/x/tools/cmd/goimports - name: Set env with list of directories in repo containin go code diff --git a/.gitignore b/.gitignore index 3f124ef..6cd33c9 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,6 @@ _testmain.go # Example client build examples/client examples/jsonClient + +# Use checksum database +go.sum diff --git a/clusters.go b/clusters.go index c0e81dd..49e93f6 100644 --- a/clusters.go +++ b/clusters.go @@ -28,7 +28,6 @@ type Cluster struct { ZK string `json:"zk"` ZKPort int `json:"zk_port"` SchedZKPath string `json:"scheduler_zk_path"` - MesosZKPath string `json:"mesos_zk_path"` SchedURI string `json:"scheduler_uri"` ProxyURL string `json:"proxy_url"` AuthMechanism string `json:"auth_mechanism"` @@ -62,7 +61,6 @@ func GetDefaultClusterFromZKUrl(zkURL string) *Cluster { AuthMechanism: "UNAUTHENTICATED", ZK: zkURL, SchedZKPath: "/aurora/scheduler", - MesosZKPath: "/mesos", AgentRunDir: "latest", AgentRoot: "/var/lib/mesos", } diff --git a/clusters_test.go b/clusters_test.go index f0bfd54..48b2f03 100644 --- a/clusters_test.go +++ b/clusters_test.go @@ -32,7 +32,6 @@ func TestLoadClusters(t *testing.T) { assert.Equal(t, clusters["devcluster"].Name, "devcluster") assert.Equal(t, clusters["devcluster"].ZK, "192.168.33.7") assert.Equal(t, clusters["devcluster"].SchedZKPath, "/aurora/scheduler") - assert.Equal(t, clusters["devcluster"].MesosZKPath, "/mesos") assert.Equal(t, clusters["devcluster"].AuthMechanism, "UNAUTHENTICATED") assert.Equal(t, clusters["devcluster"].AgentRunDir, "latest") assert.Equal(t, clusters["devcluster"].AgentRoot, "/var/lib/mesos") diff --git a/docker-compose.yml b/docker-compose.yml index 6fd40ba..f103d35 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: ipv4_address: 192.168.33.2 master: - image: quay.io/aurorascheduler/mesos-master:1.9.0 + image: rdelvalle/mesos-master:1.6.2 restart: on-failure ports: - "5050:5050" @@ -32,7 +32,7 @@ services: - zk agent-one: - image: quay.io/aurorascheduler/mesos-agent:1.9.0 + image: rdelvalle/mesos-agent:1.6.2 pid: host restart: on-failure ports: @@ -41,11 +41,10 @@ services: MESOS_MASTER: zk://192.168.33.2:2181/mesos MESOS_CONTAINERIZERS: docker,mesos MESOS_PORT: 5051 - MESOS_HOSTNAME: agent-one + MESOS_HOSTNAME: localhost MESOS_RESOURCES: ports(*):[11000-11999] MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' MESOS_WORK_DIR: /tmp/mesos - MESOS_ATTRIBUTES: 'host:agent-one;rack:1;zone:west' networks: aurora_cluster: ipv4_address: 192.168.33.4 @@ -56,58 +55,8 @@ services: depends_on: - zk - agent-two: - image: quay.io/aurorascheduler/mesos-agent:1.9.0 - pid: host - restart: on-failure - ports: - - "5052:5051" - environment: - MESOS_MASTER: zk://192.168.33.2:2181/mesos - MESOS_CONTAINERIZERS: docker,mesos - MESOS_PORT: 5051 - MESOS_HOSTNAME: agent-two - MESOS_RESOURCES: ports(*):[11000-11999] - MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' - 
MESOS_WORK_DIR: /tmp/mesos - MESOS_ATTRIBUTES: 'host:agent-two;rack:2;zone:west' - networks: - aurora_cluster: - ipv4_address: 192.168.33.5 - - volumes: - - /sys/fs/cgroup:/sys/fs/cgroup - - /var/run/docker.sock:/var/run/docker.sock - depends_on: - - zk - - agent-three: - image: quay.io/aurorascheduler/mesos-agent:1.9.0 - pid: host - restart: on-failure - ports: - - "5053:5051" - environment: - MESOS_MASTER: zk://192.168.33.2:2181/mesos - MESOS_CONTAINERIZERS: docker,mesos - MESOS_PORT: 5051 - MESOS_HOSTNAME: agent-three - MESOS_RESOURCES: ports(*):[11000-11999] - MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' - MESOS_WORK_DIR: /tmp/mesos - MESOS_ATTRIBUTES: 'host:agent-three;rack:2;zone:west;dedicated:vagrant/bar' - networks: - aurora_cluster: - ipv4_address: 192.168.33.6 - - volumes: - - /sys/fs/cgroup:/sys/fs/cgroup - - /var/run/docker.sock:/var/run/docker.sock - depends_on: - - zk - aurora-one: - image: quay.io/aurorascheduler/scheduler:0.25.0 + image: rdelvalle/aurora:0.22.0 pid: host ports: - "8081:8081" @@ -121,7 +70,6 @@ services: -shiro_realm_modules=INI_AUTHNZ -shiro_ini_path=/etc/aurora/security.ini -min_required_instances_for_sla_check=1 - -thermos_executor_cpu=0.09 volumes: - ./.aurora-config:/etc/aurora networks: diff --git a/docs/getting-started.md b/docs/getting-started.md index b894d06..b16ac3f 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -247,9 +247,6 @@ job = realis.NewJob(). RAM(64). Disk(100). IsService(false). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1). AddLabel("fileName", "sample-app/docker-compose.yml"). @@ -294,9 +291,6 @@ job = realis.NewJob(). RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1) ``` diff --git a/docs/leveraging-the-library.md b/docs/leveraging-the-library.md index 2165ae1..464bddd 100644 --- a/docs/leveraging-the-library.md +++ b/docs/leveraging-the-library.md @@ -25,9 +25,6 @@ job = realis.NewJob(). RAM(64). Disk(100). IsService(false). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1). AddLabel("fileName", "sample-app/docker-compose.yml"). diff --git a/examples/client.go b/examples/client.go index 367f785..f598562 100644 --- a/examples/client.go +++ b/examples/client.go @@ -124,9 +124,6 @@ func main() { RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1). ThermosExecutor(thermosExec) @@ -141,9 +138,6 @@ func main() { RAM(512). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(4). AddLabel("fileName", "sample-app/docker-compose.yml"). @@ -157,9 +151,6 @@ func main() { RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1) default: diff --git a/examples/clusters.json b/examples/clusters.json index 33723a5..c456bd8 100644 --- a/examples/clusters.json +++ b/examples/clusters.json @@ -2,7 +2,6 @@ "name": "devcluster", "zk": "192.168.33.7", "scheduler_zk_path": "/aurora/scheduler", - "mesos_zk_path": "/mesos", "auth_mechanism": "UNAUTHENTICATED", "slave_run_directory": "latest", "slave_root": "/var/lib/mesos" diff --git a/examples/jsonClient.go b/examples/jsonClient.go index c53516b..68ea1ca 100644 --- a/examples/jsonClient.go +++ b/examples/jsonClient.go @@ -177,8 +177,6 @@ func main() { RAM(job.RAM). Disk(job.Disk). IsService(job.Service). - Tier("preemptible"). 
- Priority(0). InstanceCount(job.Instances). AddPorts(job.Ports) diff --git a/go.mod b/go.mod index c9c3dee..3b3095f 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/apache/thrift v0.14.0 github.com/pkg/errors v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.5.0 ) go 1.16 diff --git a/go.sum b/go.sum deleted file mode 100644 index d65a779..0000000 --- a/go.sum +++ /dev/null @@ -1,22 +0,0 @@ -github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs= -github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= -github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw= -github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go deleted file mode 100644 index f1983db..0000000 --- a/helpers.go +++ /dev/null @@ -1,23 +0,0 @@ -package realis - -import ( - "context" - - "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" -) - -func (r *Client) JobExists(key aurora.JobKey) (bool, error) { - resp, err := r.client.GetConfigSummary(context.TODO(), &key) - if err != nil { - return false, err - } - - return resp != nil && - resp.GetResult_() != nil && - resp.GetResult_().GetConfigSummaryResult_() != nil && - resp.GetResult_().GetConfigSummaryResult_().GetSummary() != nil && - resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() != nil && - len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) > 0 && - resp.GetResponseCode() == aurora.ResponseCode_OK, - nil -} diff --git a/job.go b/job.go index adf3958..b553520 100644 --- a/job.go +++ b/job.go @@ -156,16 +156,6 @@ func (j *AuroraJob) IsService(isService bool) *AuroraJob { return j } -func (j *AuroraJob) 
Priority(priority int32) *AuroraJob { - j.task.Priority(priority) - return j -} - -func (j *AuroraJob) Production(production bool) *AuroraJob { - j.task.Production(production) - return j -} - func (j *AuroraJob) TaskConfig() *aurora.TaskConfig { return j.task.TaskConfig() } diff --git a/jobUpdate.go b/jobUpdate.go index d880c29..3ef2cb4 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -77,7 +77,7 @@ func (j *JobUpdate) BatchSize(size int32) *JobUpdate { // Minimum number of seconds a shard must remain in RUNNING state before considered a success. func (j *JobUpdate) WatchTime(timeout time.Duration) *JobUpdate { - j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Milliseconds()) + j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Seconds() * 1000) return j } @@ -221,16 +221,6 @@ func (j *JobUpdate) IsService(isService bool) *JobUpdate { return j } -func (j *JobUpdate) Priority(priority int32) *JobUpdate { - j.task.Priority(priority) - return j -} - -func (j *JobUpdate) Production(production bool) *JobUpdate { - j.task.Production(production) - return j -} - func (j *JobUpdate) TaskConfig() *aurora.TaskConfig { return j.task.TaskConfig() } diff --git a/monitors.go b/monitors.go index 552d12e..963c017 100644 --- a/monitors.go +++ b/monitors.go @@ -245,7 +245,7 @@ func (c *Client) MonitorHostMaintenance(hosts []string, } } -// MonitorAutoPausedUpdate is a special monitor for auto pause enabled batch updates. This monitor ensures that the update +// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information, // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch // the update is in using information from the update configuration. @@ -294,9 +294,8 @@ func (c *Client) MonitorAutoPausedUpdate(key aurora.JobUpdateKey, interval, time return -1, err } - // Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is no summaries - if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED || - summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) { + // Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is Summaries + if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED { return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status) } diff --git a/offer.go b/offer.go deleted file mode 100644 index 6f5346f..0000000 --- a/offer.go +++ /dev/null @@ -1,434 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package realis - -import ( - "bytes" - "crypto/tls" - "encoding/json" - "fmt" - "net/http" - "strings" - - "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" -) - -// Offers on [aurora-scheduler]/offers endpoint -type Offer struct { - ID struct { - Value string `json:"value"` - } `json:"id"` - FrameworkID struct { - Value string `json:"value"` - } `json:"framework_id"` - AgentID struct { - Value string `json:"value"` - } `json:"agent_id"` - Hostname string `json:"hostname"` - URL struct { - Scheme string `json:"scheme"` - Address struct { - Hostname string `json:"hostname"` - IP string `json:"ip"` - Port int `json:"port"` - } `json:"address"` - Path string `json:"path"` - Query []interface{} `json:"query"` - } `json:"url"` - Resources []struct { - Name string `json:"name"` - Type string `json:"type"` - Ranges struct { - Range []struct { - Begin int `json:"begin"` - End int `json:"end"` - } `json:"range"` - } `json:"ranges,omitempty"` - Role string `json:"role"` - Reservations []interface{} `json:"reservations"` - Scalar struct { - Value float64 `json:"value"` - } `json:"scalar,omitempty"` - } `json:"resources"` - Attributes []struct { - Name string `json:"name"` - Type string `json:"type"` - Text struct { - Value string `json:"value"` - } `json:"text"` - } `json:"attributes"` - ExecutorIds []struct { - Value string `json:"value"` - } `json:"executor_ids"` -} - -// hosts on [aurora-scheduler]/maintenance endpoint -type MaintenanceList struct { - Drained []string `json:"DRAINED"` - Scheduled []string `json:"SCHEDULED"` - Draining map[string][]string `json:"DRAINING"` -} - -type OfferCount map[float64]int64 -type OfferGroupReport map[string]OfferCount -type OfferReport map[string]OfferGroupReport - -// MaintenanceHosts list all the hosts under maintenance -func (c *Client) MaintenanceHosts() ([]string, error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, - } - - request := &http.Client{Transport: tr} - - resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL())) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - buf := new(bytes.Buffer) - if _, err := buf.ReadFrom(resp.Body); err != nil { - return nil, err - } - - var list MaintenanceList - - if err := json.Unmarshal(buf.Bytes(), &list); err != nil { - return nil, err - } - - hosts := append(list.Drained, list.Scheduled...) - - for drainingHost := range list.Draining { - hosts = append(hosts, drainingHost) - } - - return hosts, nil -} - -// Offers pulls data from /offers endpoint -func (c *Client) Offers() ([]Offer, error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, - } - - request := &http.Client{Transport: tr} - - resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL())) - if err != nil { - return []Offer{}, err - } - defer resp.Body.Close() - - buf := new(bytes.Buffer) - if _, err := buf.ReadFrom(resp.Body); err != nil { - return nil, err - } - - var offers []Offer - - if err := json.Unmarshal(buf.Bytes(), &offers); err != nil { - return []Offer{}, err - } - - return offers, nil -} - -// AvailOfferReport returns a detailed summary of offers available for use. -// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus. 
-func (c *Client) AvailOfferReport() (OfferReport, error) { - maintHosts, err := c.MaintenanceHosts() - if err != nil { - return nil, err - } - - maintHostSet := map[string]bool{} - for _, h := range maintHosts { - maintHostSet[h] = true - } - - // Get a list of offers - offers, err := c.Offers() - if err != nil { - return nil, err - } - - report := OfferReport{} - - for _, o := range offers { - if maintHostSet[o.Hostname] { - continue - } - - group := "non-dedicated" - for _, a := range o.Attributes { - if a.Name == "dedicated" { - group = a.Text.Value - break - } - } - - if _, ok := report[group]; !ok { - report[group] = map[string]OfferCount{} - } - - for _, r := range o.Resources { - - if _, ok := report[group][r.Name]; !ok { - report[group][r.Name] = OfferCount{} - } - - val := 0.0 - switch r.Type { - case "SCALAR": - val = r.Scalar.Value - case "RANGES": - for _, pr := range r.Ranges.Range { - val += float64(pr.End - pr.Begin + 1) - } - default: - return nil, fmt.Errorf("%s is not supported", r.Type) - } - - report[group][r.Name][val]++ - } - } - - return report, nil -} - -// FitTasks computes the number tasks can be fit in a list of offer -func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) { - // count the number of tasks per limit contraint: limit.name -> limit.value -> count - limitCounts := map[string]map[string]int64{} - for _, c := range taskConfig.Constraints { - if c.Constraint.Limit != nil { - limitCounts[c.Name] = map[string]int64{} - } - } - - request := ResourcesToMap(taskConfig.Resources) - - // validate resource request - if len(request) == 0 { - return -1, fmt.Errorf("Resource request %v must not be empty", request) - } - - isValid := false - for _, resVal := range request { - if resVal > 0 { - isValid = true - break - } - } - - if !isValid { - return -1, fmt.Errorf("Resource request %v is not valid", request) - } - - // pull the list of hosts under maintenance - maintHosts, err := c.MaintenanceHosts() - if err != nil { - return -1, err - } - - maintHostSet := map[string]bool{} - for _, h := range maintHosts { - maintHostSet[h] = true - } - - numTasks := int64(0) - - for _, o := range offers { - // skip the hosts under maintenance - if maintHostSet[o.Hostname] { - continue - } - - numTasksPerOffer := int64(-1) - - for resName, resVal := range request { - // skip as we can fit a infinite number of tasks with 0 demand. - if resVal == 0 { - continue - } - - avail := 0.0 - for _, r := range o.Resources { - if r.Name != resName { - continue - } - - switch r.Type { - case "SCALAR": - avail = r.Scalar.Value - case "RANGES": - for _, pr := range r.Ranges.Range { - avail += float64(pr.End - pr.Begin + 1) - } - default: - return -1, fmt.Errorf("%s is not supported", r.Type) - } - } - - numTasksPerResource := int64(avail / resVal) - - if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 { - numTasksPerOffer = numTasksPerResource - } - } - - numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer) - } - - return numTasks, nil -} - -func fitConstraints(taskConfig *aurora.TaskConfig, - offer *Offer, - limitCounts map[string]map[string]int64, - numTasksPerOffer int64) int64 { - - // check dedicated attributes vs. 
constraints - if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) { - return 0 - } - - limitConstraints := []*aurora.Constraint{} - - for _, c := range taskConfig.Constraints { - // look for corresponding attribute - attFound := false - for _, a := range offer.Attributes { - if a.Name == c.Name { - attFound = true - } - } - - // constraint not found in offer's attributes - if !attFound { - return 0 - } - - if c.Constraint.Value != nil && !valueConstraint(offer, c) { - // value constraint is not satisfied - return 0 - } else if c.Constraint.Limit != nil { - limitConstraints = append(limitConstraints, c) - limit := limitConstraint(offer, c, limitCounts) - - if numTasksPerOffer > limit && limit >= 0 { - numTasksPerOffer = limit - } - } - } - - // update limitCounts - for _, c := range limitConstraints { - for _, a := range offer.Attributes { - if a.Name == c.Name { - limitCounts[a.Name][a.Text.Value] += numTasksPerOffer - } - } - } - - return numTasksPerOffer -} - -func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool { - // get all dedicated attributes of an offer - dedicatedAtts := map[string]bool{} - for _, a := range offer.Attributes { - if a.Name == "dedicated" { - dedicatedAtts[a.Text.Value] = true - } - } - - if len(dedicatedAtts) == 0 { - return true - } - - // check if constraints are matching dedicated attributes - matched := false - for _, c := range constraints { - if c.Name == "dedicated" && c.Constraint.Value != nil { - found := false - - for _, v := range c.Constraint.Value.Values { - if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) { - found = true - break - } - } - - if found { - matched = true - } else { - return false - } - } - } - - return matched -} - -// valueConstraint checks Value Contraints of task if the are matched by the offer. -// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ -func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool { - matched := false - - for _, a := range offer.Attributes { - if a.Name == constraint.Name { - for _, v := range constraint.Constraint.Value.Values { - matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) || - (a.Text.Value != v && constraint.Constraint.Value.Negated) - - if matched { - break - } - } - - if matched { - break - } - } - } - - return matched -} - -// limitConstraint limits the number of pods on a group which has the same attribute. 
-// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ -func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 { - limit := int64(-1) - for _, a := range offer.Attributes { - // limit constraint found - if a.Name == constraint.Name { - curr := limitCounts[a.Name][a.Text.Value] - currLimit := int64(constraint.Constraint.Limit.Limit) - - if curr >= currLimit { - return 0 - } - - if currLimit-curr < limit || limit < 0 { - limit = currLimit - curr - } - } - } - - return limit -} diff --git a/realis.go b/realis.go index c331918..6299986 100644 --- a/realis.go +++ b/realis.go @@ -36,7 +36,7 @@ import ( "github.com/pkg/errors" ) -const VERSION = "2.28.0" +const VERSION = "2.22.1" type Client struct { config *clientConfig @@ -147,8 +147,6 @@ func NewClient(options ...ClientOption) (*Client, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } - config.url = url - url, err = validateAuroraAddress(url) if err != nil { return nil, errors.Wrap(err, "unable to create realis object, invalid url") @@ -315,13 +313,11 @@ func (c *Client) GetInstanceIds(key aurora.JobKey, states []aurora.ScheduleStatu Statuses: states, } - c.logger.DebugPrintf("GetInstanceIds Thrift Payload: %+v\n", taskQ) + c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }, - nil, - ) + }) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -343,13 +339,8 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (* resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }, - nil, - ) + }) - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil, errors.New("unexpected response from scheduler") - } if retryErr != nil { return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } @@ -357,39 +348,23 @@ func (c *Client) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (* return resp.GetResult_().GetGetJobUpdateSummariesResult_(), nil } -func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.readonlyClient.GetJobSummary(context.TODO(), role) - }, - nil, - ) - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetJobSummaryResult_() == nil { - return nil, errors.New("unexpected response from scheduler") - } - if retryErr != nil { - return nil, errors.Wrap(retryErr, "error getting job summaries from Aurora Scheduler") - } - - return resp.GetResult_().GetJobSummaryResult_(), nil -} - func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { + var result *aurora.GetJobsResult_ + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobs(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") - } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + return result, 
errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") } - return resp.GetResult_().GetJobsResult_, nil + if resp.GetResult_() != nil { + result = resp.GetResult_().GetJobsResult_ + } + + return result, nil } // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. @@ -399,19 +374,19 @@ func (c *Client) KillInstances(key aurora.JobKey, instances ...int32) (bool, err resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.KillTasks(context.TODO(), &key, instances, "") - }, - nil, - ) + }) if retryErr != nil { return false, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } - if resp == nil || len(resp.GetDetails()) > 0 { + if len(resp.GetDetails()) > 0 { c.logger.Println("KillTasks was called but no tasks killed as a result.") return false, nil + } else { + return true, nil } - return true, nil + } func (c *Client) RealisConfig() *clientConfig { @@ -426,9 +401,7 @@ func (c *Client) KillJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return c.client.KillTasks(context.TODO(), &key, nil, "") - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -450,27 +423,9 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { return errors.Wrap(err, "unable to create Thermos payload") } - // Response is checked by the thrift retry code - _, retryErr := c.thriftCallWithRetries( - false, - func() (*aurora.Response, error) { - return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }, - // On a client timeout, attempt to verify that payload made to the Scheduler by - // trying to get the config summary for the job key - func() (*aurora.Response, bool) { - exists, err := c.JobExists(auroraJob.JobKey()) - if err != nil { - c.logger.Print("verification failed ", err) - } - - if exists { - return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true - } - - return nil, false - }, - ) + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.client.CreateJob(context.TODO(), auroraJob.JobConfig()) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") @@ -501,9 +456,7 @@ func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob Schedule message to Aurora Scheduler") @@ -517,9 +470,7 @@ func (c *Client) DescheduleCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.DescheduleCronJob(context.TODO(), &key) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Cron AuroraJob De-schedule message to Aurora Scheduler") @@ -535,9 +486,7 @@ func (c *Client) StartCronJob(key aurora.JobKey) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.StartCronJob(context.TODO(), &key) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Start Cron AuroraJob message to Aurora Scheduler") @@ -552,9 +501,7 @@ func (c 
*Client) RestartInstances(key aurora.JobKey, instances ...int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instances) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -575,17 +522,16 @@ func (c *Client) RestartJob(key aurora.JobKey) error { if len(instanceIds) > 0 { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RestartShards(context.TODO(), &key, instanceIds) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return nil + } else { + return errors.New("no tasks in the Active state") } - return errors.New("no tasks in the Active state") } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. @@ -597,80 +543,34 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := c.thriftCallWithRetries(false, - func() (*aurora.Response, error) { - return c.client.StartJobUpdate(context.TODO(), updateJob.request, message) - }, - func() (*aurora.Response, bool) { - key := updateJob.JobKey() - summariesResp, err := c.readonlyClient.GetJobUpdateSummaries( - context.TODO(), - &aurora.JobUpdateQuery{ - JobKey: &key, - UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, - Limit: 1, - }) - - if err != nil { - c.logger.Print("verification failed ", err) - return nil, false - } - - summaries := response.JobUpdateSummaries(summariesResp) - if len(summaries) == 0 { - return nil, false - } - - return &aurora.Response{ - ResponseCode: aurora.ResponseCode_OK, - Result_: &aurora.Result_{ - StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ - UpdateSummary: summaries[0], - Key: summaries[0].Key, - }, - }, - }, true - }, - ) + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.client.StartJobUpdate(nil, updateJob.request, message) + }) if retryErr != nil { - // A timeout took place when attempting this call, attempt to recover - if IsTimeout(retryErr) { - return nil, retryErr - } - return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + + if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { + return resp.GetResult_().GetStartJobUpdateResult_(), nil } - return resp.GetResult_().GetStartJobUpdateResult_(), nil + + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } -// AbortJobUpdate terminates a job update in the scheduler. -// It requires the updateId which can be obtained on the Aurora web UI. -// This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. -// However, if the job update does not transition to the ABORT state an error will be returned. +// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (c *Client) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error { c.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") } - // Make this call synchronous by blocking until it job has successfully transitioned to aborted - _, err := c.MonitorJobUpdateStatus( - updateKey, - []aurora.JobUpdateStatus{aurora.JobUpdateStatus_ABORTED}, - time.Second*5, - time.Minute) - return err + return nil } // Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. @@ -690,9 +590,7 @@ func (c *Client) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PauseJobUpdate(nil, updateKeyLocal, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -719,9 +617,7 @@ func (c *Client) ResumeJobUpdate(updateKey aurora.JobUpdateKey, message string) _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.ResumeJobUpdate(context.TODO(), &updateKey, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -742,19 +638,18 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.PulseJobUpdate(context.TODO(), &updateKey) - }, - nil, - ) + }) if retryErr != nil { return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil { - return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil { + return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil + } else { + return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly") } - return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -771,9 +666,7 @@ func (c *Client) AddInstances(instKey aurora.InstanceKey, count int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.AddInstances(context.TODO(), &instKey, count) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -818,16 +711,11 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } - if resp == nil { - return nil, errors.New("unexpected response from scheduler") - } return response.ScheduleStatusResult(resp).GetTasks(), nil } @@ -839,32 +727,29 @@ func (c *Client) 
GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetPendingReason(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + var result []*aurora.PendingReason + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() } - return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil + return result, nil } -// GetTasksWithoutConfigs gets information about task including without a task configuration object. -// This is a more lightweight version of GetTaskStatus but contains less information as a result. +// Get information about task including without a task configuration object func (c *Client) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { c.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksWithoutConfigs(context.TODO(), query) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -891,9 +776,7 @@ func (c *Client) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetTasksStatus(context.TODO(), taskQ) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -919,19 +802,17 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora. 
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil { + return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil + } else { + return nil, errors.New("unknown Thrift error, field is nil.") } - - return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil } func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error { @@ -940,16 +821,10 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.client.RollbackJobUpdate(context.TODO(), &key, message) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to roll back job update") } return nil } - -func (c *Client) GetSchedulerURL() string { - return c.config.url -} diff --git a/realis_admin.go b/realis_admin.go index f100f46..9530031 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -37,19 +37,17 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.DrainHosts(context.TODO(), drainList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } // Start SLA Aware Drain. @@ -61,18 +59,6 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. return nil, errors.New("no hosts provided to drain") } - if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 { - policy = &defaultSlaPolicy - c.logger.Printf("Warning: start draining with default sla policy %v", policy) - } - - if timeout < 0 { - c.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs", - timeout, - defaultSlaDrainTimeoutSecs) - timeout = defaultSlaDrainTimeoutSecs - } - drainList := aurora.NewHosts() drainList.HostNames = hosts @@ -80,19 +66,17 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. 
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -108,19 +92,17 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.StartMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil } func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -136,20 +118,24 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.EndMaintenance(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + + if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + } func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { + + var result *aurora.MaintenanceStatusResult_ + if len(hosts) == 0 { return nil, errors.New("no hosts provided to get maintenance status from") } @@ -163,18 +149,17 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe // and continue trying to resend command until we run out of retries. 
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.MaintenanceStatus(context.TODO(), hostList) - }, - nil, - ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + return result, errors.Wrap(retryErr, "unable to recover connection") } - return resp.GetResult_().GetMaintenanceStatusResult_(), nil + if resp.GetResult_() != nil { + result = resp.GetResult_().GetMaintenanceStatusResult_() + } + + return result, nil } // SetQuota sets a quota aggregate for the given role @@ -192,9 +177,7 @@ func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64 _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.SetQuota(context.TODO(), role, quota) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to set role quota") @@ -208,18 +191,17 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.GetQuota(context.TODO(), role) - }, - nil, - ) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get role quota") } - if resp == nil || resp.GetResult_() == nil { - return nil, errors.New("unexpected response from scheduler") + if resp.GetResult_() != nil { + return resp.GetResult_().GetGetQuotaResult_(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") } - return resp.GetResult_().GetGetQuotaResult_(), nil } // Force Aurora Scheduler to perform a snapshot and write to Mesos log @@ -227,9 +209,7 @@ func (c *Client) Snapshot() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.Snapshot(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -243,9 +223,7 @@ func (c *Client) PerformBackup() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.PerformBackup(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -259,9 +237,7 @@ func (c *Client) ForceImplicitTaskReconciliation() error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") @@ -282,9 +258,7 @@ func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }, - nil, - ) + }) if retryErr != nil { return errors.Wrap(retryErr, "unable to recover connection") diff --git a/realis_config.go b/realis_config.go index 4b2b96f..7bb09fc 100644 --- a/realis_config.go +++ b/realis_config.go @@ -19,7 +19,6 @@ import ( "time" "github.com/apache/thrift/lib/go/thrift" - "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" ) type clientConfig struct { @@ -49,15 +48,6 @@ var defaultBackoff = Backoff{ Jitter: 0.1, } -var defaultSlaPolicy = aurora.SlaPolicy{ - PercentageSlaPolicy: &aurora.PercentageSlaPolicy{ - Percentage: 66, - DurationSecs: 300, - }, -} - 
-const defaultSlaDrainTimeoutSecs = 900 - type TransportProtocol int const ( diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 5893491..38e39e7 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -94,15 +94,12 @@ func TestBadCredentials(t *testing.T) { job := realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("create_thermos_job_bad_creds_test"). + Name("create_thermos_job_test"). ThermosExecutor(thermosExec). CPU(.5). RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(2). AddPorts(1) @@ -180,35 +177,6 @@ func TestLeaderFromZK(t *testing.T) { assert.Equal(t, "http://192.168.33.7:8081", url) } - -func TestMasterFromZK(t *testing.T) { - cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") - masterNodesMap, err := realis.MasterNodesFromZK(*cluster) - - assert.NoError(t, err) - - for _, hostnames := range masterNodesMap { - for _, hostname := range hostnames { - assert.NoError(t, err) - assert.Equal(t, "192.168.33.7", hostname) - } - } -} - -func TestMesosMasterFromZK(t *testing.T) { - cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181") - masterNodesMap, err := realis.MesosMasterNodesFromZK(*cluster) - - assert.NoError(t, err) - - for _, hostnames := range masterNodesMap { - for _, hostname := range hostnames { - assert.NoError(t, err) - assert.Equal(t, "localhost", hostname) - } - } -} - func TestInvalidAuroraURL(t *testing.T) { for _, url := range []string{ "http://doesntexist.com:8081/apitest", @@ -238,6 +206,7 @@ func TestValidAuroraURL(t *testing.T) { } func TestRealisClient_ReestablishConn(t *testing.T) { + // Test that we're able to tear down the old connection and create a new one. err := r.ReestablishConn() @@ -248,9 +217,11 @@ func TestGetCACerts(t *testing.T) { certs, err := realis.GetCerts("./examples/certs") assert.NoError(t, err) assert.Equal(t, len(certs.Subjects()), 2) + } func TestRealisClient_CreateJob_Thermos(t *testing.T) { + role := "vagrant" job := realis.NewJob(). Environment("prod"). @@ -261,9 +232,6 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(2). AddPorts(1) @@ -277,7 +245,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { // Fetch all Jobs result, err := r.GetJobs(role) - fmt.Println("GetJobs length: ", len(result.Configs)) + fmt.Printf("GetJobs length: %+v \n", len(result.Configs)) assert.Len(t, result.Configs, 1) assert.NoError(t, err) @@ -298,7 +266,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { err := r.KillJob(job.JobKey()) assert.NoError(t, err) - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 60*time.Second) assert.True(t, success) assert.NoError(t, err) }) @@ -306,6 +274,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { // Test configuring an executor that doesn't exist for CreateJob API func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { + // Create a single job job := realis.NewJob(). Environment("prod"). 
@@ -324,6 +293,7 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { // Test configuring an executor that doesn't exist for CreateJob API func TestRealisClient_GetPendingReason(t *testing.T) { + env := "prod" role := "vagrant" name := "pending_reason_test" @@ -354,13 +324,10 @@ func TestRealisClient_GetPendingReason(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) - - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) } func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { + fmt.Println("Creating service") role := "vagrant" job := realis.NewJobUpdate(). @@ -372,9 +339,6 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { Disk(100). ThermosExecutor(thermosExec). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(2). AddPorts(1). AddLabel("currentTime", time.Now().String()). @@ -443,10 +407,6 @@ pulseLoop: err = r.KillJob(job.JobKey()) assert.NoError(t, err) - - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) } // Test configuring an executor that doesn't exist for CreateJob API @@ -464,9 +424,6 @@ func TestRealisClient_CreateService(t *testing.T) { InstanceCount(3). WatchTime(20 * time.Second). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). BatchSize(2) result, err := r.CreateService(job) @@ -477,15 +434,13 @@ func TestRealisClient_CreateService(t *testing.T) { var ok bool var mErr error - if result != nil { - if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { - // Update may already be in a terminal state so don't check for error - err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") + if ok, mErr = r.MonitorJobUpdate(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { + // Update may already be in a terminal state so don't check for error + err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") - err = r.KillJob(job.JobKey()) + err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - } + assert.NoError(t, err) } assert.True(t, ok) @@ -493,10 +448,7 @@ func TestRealisClient_CreateService(t *testing.T) { // Kill task test task after confirming it came up fine err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) assert.NoError(t, err) } @@ -536,9 +488,6 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(1). AddPorts(1). CronSchedule("* * * * *"). @@ -555,17 +504,10 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) { err := r.DescheduleCronJob(job.JobKey()) assert.NoError(t, err) - - err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) }) } func TestRealisClient_StartMaintenance(t *testing.T) { - hosts := []string{"agent-one"} + hosts := []string{"localhost"} _, err := r.StartMaintenance(hosts...) 
assert.NoError(t, err) @@ -575,7 +517,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -591,7 +533,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { } func TestRealisClient_DrainHosts(t *testing.T) { - hosts := []string{"agent-one"} + hosts := []string{"localhost"} _, err := r.DrainHosts(hosts...) assert.NoError(t, err) @@ -601,7 +543,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) assert.NoError(t, err) t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) { @@ -614,7 +556,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { // Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts assert.Error(t, err) - assert.Equal(t, map[string]bool{"agent-one": true, "IMAGINARY_HOST": false}, hostResults) + assert.Equal(t, map[string]bool{"localhost": true, "IMAGINARY_HOST": false}, hostResults) }) t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) { @@ -633,7 +575,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { } func TestRealisClient_SLADrainHosts(t *testing.T) { - hosts := []string{"agent-one"} + hosts := []string{"localhost"} policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}} _, err := r.SLADrainHosts(&policy, 30, hosts...) @@ -648,7 +590,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -661,39 +603,6 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { 5*time.Second, 10*time.Second) assert.NoError(t, err) - - // slaDrainHosts goes with default policy if no policy is specified - _, err = r.SLADrainHosts(nil, 30, hosts...) - if err != nil { - fmt.Printf("error: %+v\n", err.Error()) - os.Exit(1) - } - hostResults, err = r.MonitorHostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, - 1*time.Second, - 50*time.Second) - assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) - assert.NoError(t, err) - - _, err = r.EndMaintenance(hosts...) - assert.NoError(t, err) - - _, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...) - if err != nil { - fmt.Printf("error: %+v\n", err.Error()) - os.Exit(1) - } - hostResults, err = r.MonitorHostMaintenance( - hosts, - []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, - 1*time.Second, - 50*time.Second) - assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) - assert.NoError(t, err) - - _, err = r.EndMaintenance(hosts...) 
- assert.NoError(t, err) } // Test multiple go routines using a single connection @@ -730,9 +639,6 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) - success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) }() } @@ -798,9 +704,6 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { RAM(64). Disk(100). IsService(true). - Production(false). - Tier("preemptible"). - Priority(0). InstanceCount(2). BatchSize(2). PartitionPolicy(true, partitionDelay) @@ -819,12 +722,6 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { assert.NoError(t, err) } - err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) } func TestRealisClient_UpdateStrategies(t *testing.T) { @@ -837,10 +734,7 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { RAM(4). Disk(10). InstanceCount(6). - IsService(true). - Production(false). - Tier("preemptible"). - Priority(0) + IsService(true) // Needed to populate the task config correctly assert.NoError(t, job.BuildThermosPayload()) @@ -889,10 +783,6 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out.")) } assert.NoError(t, r.KillJob(strategy.jobUpdate.JobKey())) - - success, err := r.MonitorInstances(strategy.jobUpdate.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) }) } } @@ -902,15 +792,14 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { job := realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("batch_aware_auto_pause_test"). + Name("BatchAwareAutoPauseTest"). ThermosExecutor(thermosExec). CPU(.01). RAM(4). Disk(10). InstanceCount(6). IsService(true) - - updateGroups := []int32{1, 3} + updateGroups := []int32{1, 2, 3} strategy := realis.JobUpdateFromAuroraTask(job.AuroraTask()). VariableBatchStrategy(true, updateGroups...). InstanceCount(6). @@ -923,583 +812,14 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { key := *result.GetKey() for i := range updateGroups { - curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Minute*5) + curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Second*240) if mErr != nil { - fmt.Println(mErr) // Update may already be in a terminal state so don't check for error - _ = r.AbortJobUpdate(key, "Monitor timed out.") + assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out.")) } assert.Equal(t, i, curStep) - - if i != len(updateGroups)-1 { - require.NoError(t, err) - require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test")) - } + require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test")) } - assert.NoError(t, r.AbortJobUpdate(key, "")) assert.NoError(t, r.KillJob(strategy.JobKey())) - - success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) -} - -func TestRealisClient_GetJobSummary(t *testing.T) { - role := "vagrant" - env := "prod" - name := "test_get_job_summary" - // Create a single job - job := realis.NewJob(). - Environment(env). - Role(role). - Name(name). - ThermosExecutor(thermosExec). - CPU(.25). - RAM(4). - Disk(10). - InstanceCount(3). - IsService(true). - Production(false). - Tier("preemptible"). 
- Priority(0) - - err := r.CreateJob(job) - assert.NoError(t, err) - - success, err := r.MonitorScheduleStatus(job.JobKey(), - job.GetInstanceCount(), - aurora.ACTIVE_STATES, - 1*time.Second, - 150*time.Second) - assert.True(t, success) - assert.NoError(t, err) - - // get job summaries of the role - summary, err := r.GetJobSummary(role) - assert.NoError(t, err) - assert.NotNil(t, summary) - jobCount := 0 - for _, s := range summary.Summaries { - jobKey := s.Job.TaskConfig.Job - if jobKey.Environment == env && jobKey.Name == name { - jobCount++ - } - } - assert.Equal(t, 1, jobCount) - - err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - - success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) - assert.True(t, success) - assert.NoError(t, err) -} - -func TestRealisClient_Offers(t *testing.T) { - var offers []realis.Offer - - // since offers are being recycled, it take a few tries to get all of them. - i := 0 - for ; len(offers) < 3 && i < 5; i++ { - offers, _ = r.Offers() - time.Sleep(5 * time.Second) - } - - assert.NotEqual(t, i, 5) -} - -func TestRealisClient_MaintenanceHosts(t *testing.T) { - offers, err := r.Offers() - assert.NoError(t, err) - - for i := 0; i < len(offers); i++ { - _, err := r.DrainHosts(offers[i].Hostname) - assert.NoError(t, err) - - hosts, err := r.MaintenanceHosts() - assert.Equal(t, i+1, len(hosts)) - } - - // clean up - for i := 0; i < len(offers); i++ { - _, err := r.EndMaintenance(offers[i].Hostname) - assert.NoError(t, err) - - // Monitor change to DRAINING and DRAINED mode - _, err = r.MonitorHostMaintenance( - []string{offers[i].Hostname}, - []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, - 5*time.Second, - 10*time.Second) - assert.NoError(t, err) - } -} - -func TestRealisClient_AvailOfferReport(t *testing.T) { - var offers []realis.Offer - - i := 0 - for ; len(offers) < 3 && i < 5; i++ { - offers, _ = r.Offers() - time.Sleep(5 * time.Second) - } - - assert.NotEqual(t, i, 3) - - capacity, err := r.AvailOfferReport() - assert.NoError(t, err) - - // 2 groups for non-dedicated & dedicated - assert.Equal(t, 2, len(capacity)) - // 4 resources: cpus, disk, mem, ports - assert.Equal(t, 4, len(capacity["non-dedicated"])) -} - -func TestRealisClient_FitTasks(t *testing.T) { - var offers []realis.Offer - - i := 0 - for ; len(offers) < 3 && i < 5; i++ { - offers, _ = r.Offers() - time.Sleep(5 * time.Second) - } - - assert.NotEqual(t, i, 5) - - cpuPerOffer := 0.0 - for _, r := range offers[0].Resources { - if r.Name == "cpus" { - cpuPerOffer = r.Scalar.Value - } - } - - // make sure all offers have no running executor - for _, o := range offers { - assert.Equal(t, o.ExecutorIds[:0], o.ExecutorIds) - } - - validCpu := cpuPerOffer / 2 - inValidCpu := cpuPerOffer + 1 - gpu := int64(1) - - tests := []struct { - message string - role string - request aurora.Resource - constraints []*aurora.Constraint - expected int64 - isError bool - }{ - { - message: "task with gpu request", - role: "vagrant", - request: aurora.Resource{ - NumGpus: &gpu, - }, - expected: 0, - isError: false, - }, - { - message: "empty resource request", - role: "vagrant", - request: aurora.Resource{}, - expected: -1, - isError: true, - }, - { - message: "valid resource request", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - expected: 4, - isError: false, - }, - { - message: "invalid cpu request", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &inValidCpu, - }, - expected: 0, - isError: false, - }, - { - message: "dedicated 
constraint", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - - constraints: []*aurora.Constraint{ - { - Name: "dedicated", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"vagrant/bar"}, - }, - }, - }, - }, - expected: 2, - isError: false, - }, - { - message: "dedicated constraint with unauthorized role", - role: "unauthorized", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "dedicated", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"vagrant/bar"}, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "value constraint on zone", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "zone", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"west"}, - }, - }, - }, - }, - expected: 4, - isError: false, - }, - { - message: "negative value constraint on zone", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "zone", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: true, - Values: []string{"west"}, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "negative value constraint on host", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "host", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: true, - Values: []string{"agent-one"}, - }, - }, - }, - }, - expected: 2, - isError: false, - }, - { - message: "value constraint on unavailable zone", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "zone", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"east"}, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "value constraint on unavailable attribute", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "os", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"windows"}, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "1 value constraint with 2 values", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "host", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"agent-one", "agent-two"}, - }, - }, - }, - }, - expected: 4, - isError: false, - }, - { - message: "2 value constraints", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "host", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"agent-one"}, - }, - }, - }, - { - Name: "rack", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"2"}, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "limit constraint on host", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { 
- Name: "host", - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - }, - expected: 2, - isError: false, - }, - { - message: "limit constraint on zone", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "zone", - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - }, - expected: 1, - isError: false, - }, - { - message: "limit constraint on zone & host", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "host", - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - { - Name: "zone", - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - }, - expected: 1, - isError: false, - }, - { - message: "limit constraint on unavailable zone", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "gpu-host", // no host has gpu-host attribute - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - }, - expected: 0, - isError: false, - }, - { - message: "limit & dedicated constraint", - role: "vagrant", - request: aurora.Resource{ - NumCpus: &validCpu, - }, - constraints: []*aurora.Constraint{ - { - Name: "dedicated", - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: false, - Values: []string{"vagrant/bar"}, - }, - }, - }, - { - Name: "host", - Constraint: &aurora.TaskConstraint{ - Limit: &aurora.LimitConstraint{ - Limit: 1, - }, - }, - }, - }, - expected: 1, - isError: false, - }, - } - - for _, tc := range tests { - task := aurora.NewTaskConfig() - task.Resources = []*aurora.Resource{&tc.request} - task.Constraints = tc.constraints - task.Job = &aurora.JobKey{ - Role: tc.role, - } - - numTasks, err := r.FitTasks(task, offers) - - if !tc.isError { - assert.NoError(t, err) - assert.Equal(t, tc.expected, numTasks, tc.message) - } else { - assert.Error(t, err) - } - } -} - -func TestRealisClient_JobExists(t *testing.T) { - role := "vagrant" - env := "prod" - name := "test_job_exists" - // Create a good single job - job := realis.NewJob(). - Environment(env). - Role(role). - Name(name). - ThermosExecutor(thermosExec). - CPU(.25). - RAM(4). - Disk(10). - InstanceCount(3). - IsService(true). - Production(false). - Tier("preemptible"). - Priority(0) - - // Check if job exists before creating - exists, err := r.JobExists(job.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) - - err = r.CreateJob(job) - assert.NoError(t, err) - - exists, err = r.JobExists(job.JobKey()) - assert.NoError(t, err) - assert.True(t, exists) - - // Create a single bad job - badJob := realis.NewJob(). - Environment("prod"). - Role("vagrant"). - Name("executordoesntexist"). - ExecutorName("idontexist"). - ExecutorData(""). - CPU(.25). - RAM(4). - Disk(10). 
- InstanceCount(1) - - // Check if job exists before creating - exists, err = r.JobExists(badJob.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) - - err = r.CreateJob(badJob) - assert.Error(t, err) - - exists, err = r.JobExists(badJob.JobKey()) - assert.NoError(t, err) - assert.False(t, exists) } diff --git a/response/response.go b/response/response.go index 15081ec..1663b1b 100644 --- a/response/response.go +++ b/response/response.go @@ -35,10 +35,6 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { - if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { - return nil - } - return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index f5b0918..9860e42 100644 --- a/retry.go +++ b/retry.go @@ -17,7 +17,10 @@ package realis import ( "io" "math/rand" + "net/http" "net/url" + "strconv" + "strings" "time" "github.com/apache/thrift/lib/go/thrift" @@ -26,11 +29,9 @@ import ( "github.com/pkg/errors" ) -// Backoff determines how the retry mechanism should react after each failure and how many failures it should -// tolerate. type Backoff struct { Duration time.Duration // the base duration - Factor float64 // Duration is multiplied by a factor each iteration + Factor float64 // Duration is multipled by factor each iteration Jitter float64 // The amount of jitter applied each iteration Steps int // Exit with error after this many steps } @@ -52,15 +53,18 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// ExponentialBackoff is a modified version of the Kubernetes exponential-backoff code. -// It repeats a condition check with exponential backoff and checks the condition up to -// Steps times, increasing the wait by multiplying the previous duration by Factor. +// Modified version of the Kubernetes exponential-backoff code. +// ExponentialBackoff repeats a condition check with exponential backoff. +// +// It checks the condition up to Steps times, increasing the wait by multiplying +// the previous duration by Factor. // // If Jitter is greater than zero, a random amount of each duration is added // (between duration and duration*(1+jitter)). // // If the condition never returns true, ErrWaitTimeout is returned. Errors // do not cause the function to return. + func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) error { var err error var ok bool @@ -94,9 +98,10 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // If the error is temporary, continue retrying. if !IsTemporary(err) { return err + } else { + // Print out the temporary error we experienced. + logger.Println(err) } - // Print out the temporary error we experienced. 
- logger.Println(err) } } @@ -107,28 +112,19 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) // Provide more information to the user wherever possible if err != nil { return newRetryError(errors.Wrap(err, "ran out of retries"), curStep) + } else { + return newRetryError(errors.New("ran out of retries"), curStep) } - - return newRetryError(errors.New("ran out of retries"), curStep) } type auroraThriftCall func() (resp *aurora.Response, err error) -// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler -// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler -// through the very same Thrift API which previously encountered a time-out from the client. -// This means that the functions themselves should be kept to a minimum number of Thrift calls. -// It should also be noted that this is a best effort mechanism and -// is likely to fail for the same reasons that the original call failed. -type verifyOnTimeout func() (*aurora.Response, bool) - // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. -func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall, - verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { +func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int - timeouts := 0 + var timeouts int backoff := c.config.backoff duration := backoff.Duration @@ -142,10 +138,7 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh adjusted = Jitter(duration, backoff.Jitter) } - c.logger.Printf( - "A retryable error occurred during thrift call, backing off for %v before retry %v", - adjusted, - curStep) + c.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -160,132 +153,105 @@ func (c *Client) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraTh resp, clientErr = thriftCall() - c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) + c.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. if clientErr != nil { + // Print out the error to the user - c.logger.Printf("Client Error: %v", clientErr) + c.logger.Printf("Client Error: %v\n", clientErr) - temporary, timedout := isConnectionError(clientErr) - if !temporary && c.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } + // Determine if error is a temporary URL error by going up the stack + e, ok := clientErr.(thrift.TTransportException) + if ok { + c.logger.DebugPrint("Encountered a transport exception") - // There exists a corner case where thrift payload was received by Aurora but - // connection timed out before Aurora was able to reply. - // Users can take special action on a timeout by using IsTimedout and reacting accordingly - // if they have configured the client to return on a timeout. 
- if timedout && returnOnTimeout { - return resp, newTimedoutError(errors.New("client connection closed before server answer")) + // TODO(rdelvalle): Figure out a better way to obtain the error code as this is a very brittle solution + // 401 Unauthorized means the wrong username and password were provided + if strings.Contains(e.Error(), strconv.Itoa(http.StatusUnauthorized)) { + return nil, errors.Wrap(clientErr, "wrong username or password provided") + } + + e, ok := e.Err().(*url.Error) + if ok { + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and should be retried. All other errors that are permanent + // will not be retried. + if e.Err != io.EOF && !e.Temporary() && c.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } + // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was + // able to reply. In this case we will return whatever response was received and a TimedOut behaving + // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. + if e.Timeout() { + timeouts++ + c.logger.DebugPrintf( + "Client closed connection (timedout) %d times before server responded,"+ + " consider increasing connection timeout", + timeouts) + if returnOnTimeout { + return resp, + newTimedoutError(errors.New("client connection closed before server answer")) + } + } + } } // In the future, reestablish connection should be able to check if it is actually possible // to make a thrift call to Aurora. For now, a reconnect should always lead to a retry. // Ignoring error due to the fact that an error should be retried regardless - reestablishErr := c.ReestablishConn() - if reestablishErr != nil { - c.logger.DebugPrintf("error re-establishing connection ", reestablishErr) + _ = c.ReestablishConn() + + } else { + + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") } - // If users did not opt for a return on timeout in order to react to a timedout error, - // attempt to verify that the call made it to the scheduler after the connection was re-established. - if timedout { - timeouts++ - c.logger.DebugPrintf( - "Client closed connection %d times before server responded, "+ - "consider increasing connection timeout", - timeouts) + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // Allow caller to provide a function which checks if the original call was successful before - // it timed out. - if verifyOnTimeout != nil { - if verifyResp, ok := verifyOnTimeout(); ok { - c.logger.Print("verified that the call went through successfully after a client timeout") - // Response here might be different than the original as it is no longer constructed - // by the scheduler but mimicked. - // This is OK since the scheduler is very unlikely to change responses at this point in its - // development cycle but we must be careful to not return an incorrectly constructed response. 
- return verifyResp, nil - } - } + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil + + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + c.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } - - // Retry the thrift payload - continue } - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") - } - - // Check Response Code from thrift and make a decision to continue retrying or not. - switch responseCode := resp.GetResponseCode(); responseCode { - - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil - - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - c.logger.Println("Aurora replied with Transient error code, retrying") - continue - - // Failure scenarios, these indicate a bad payload or a bad clientConfig. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - c.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) - - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - c.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) - } } + c.logger.DebugPrintf("it took %v retries to complete this operation\n", curStep) + if curStep > 1 { - c.config.logger.Printf("this thrift call was retried %d time(s)", curStep) + c.config.logger.Printf("retried this thrift call %d time(s)", curStep) } // Provide more information to the user wherever possible. if clientErr != nil { return nil, newRetryError(errors.Wrap(clientErr, "ran out of retries, including latest error"), curStep) + } else { + return nil, newRetryError(errors.New("ran out of retries"), curStep) } - - return nil, newRetryError(errors.New("ran out of retries"), curStep) -} - -// isConnectionError processes the error received by the client. 
-// The return values indicate whether this was determined to be a temporary error -// and whether it was determined to be a timeout error -func isConnectionError(err error) (bool, bool) { - - // Determine if error is a temporary URL error by going up the stack - transportException, ok := err.(thrift.TTransportException) - if !ok { - return false, false - } - - urlError, ok := transportException.Err().(*url.Error) - if !ok { - return false, false - } - - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and we consider it temporary. - // All other which are not temporary as per the member function Temporary(), - // are considered not temporary (permanent). - if urlError.Err != io.EOF && !urlError.Temporary() { - return false, false - } - - return true, urlError.Timeout() } diff --git a/runTestsMac.sh b/runTestsMac.sh index 2791ada..c54994a 100644 --- a/runTestsMac.sh +++ b/runTestsMac.sh @@ -1,4 +1,4 @@ #!/bin/bash # Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues. -docker run --rm -t -w /gorealis -v $GOPATH/pkg:/go/pkg -v $(pwd):/gorealis --network gorealis_aurora_cluster golang:1.17-buster go test -v github.com/aurora-scheduler/gorealis/v2 $@ +docker run --rm -t -v $(pwd):/go/src/github.com/aurora-scheduler/gorealis --network gorealis_aurora_cluster golang:1.13-stretch go test -v github.com/aurora-scheduler/gorealis $@ diff --git a/task.go b/task.go index e9aaa98..e4037c3 100644 --- a/task.go +++ b/task.go @@ -78,17 +78,12 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { Role(config.Job.Role). Name(config.Job.Name). MaxFailure(config.MaxTaskFailures). - IsService(config.IsService). - Priority(config.Priority) + IsService(config.IsService) if config.Tier != nil { newTask.Tier(*config.Tier) } - if config.Production != nil { - newTask.Production(*config.Production) - } - if config.ExecutorConfig != nil { newTask. ExecutorName(config.ExecutorConfig.Name). @@ -292,17 +287,6 @@ func (t *AuroraTask) IsService(isService bool) *AuroraTask { return t } -//set priority for preemption or priority-queueing -func (t *AuroraTask) Priority(priority int32) *AuroraTask { - t.task.Priority = priority - return t -} - -func (t *AuroraTask) Production(production bool) *AuroraTask { - t.task.Production = &production - return t -} - // Add a list of URIs with the same extract and cache configuration. Scheduler must have // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. func (t *AuroraTask) AddURIs(extract bool, cache bool, values ...string) *AuroraTask { diff --git a/task_test.go b/task_test.go index 8c0a026..14d06c8 100644 --- a/task_test.go +++ b/task_test.go @@ -34,8 +34,6 @@ func TestAuroraTask_Clone(t *testing.T) { RAM(643). Disk(1000). IsService(true). - Priority(1). - Production(false). AddPorts(10). Tier("preferred"). MaxFailure(23). diff --git a/util.go b/util.go index a822b3f..90e3dcf 100644 --- a/util.go +++ b/util.go @@ -40,7 +40,7 @@ func init() { } } -// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. +// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. // This is a function in order to avoid having a slice that can be accidentally mutated. 
func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{ @@ -104,23 +104,3 @@ func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int { } return batchCount } - -func ResourcesToMap(resources []*aurora.Resource) map[string]float64 { - result := map[string]float64{} - - for _, resource := range resources { - if resource.NumCpus != nil { - result["cpus"] += *resource.NumCpus - } else if resource.RamMb != nil { - result["mem"] += float64(*resource.RamMb) - } else if resource.DiskMb != nil { - result["disk"] += float64(*resource.DiskMb) - } else if resource.NamedPort != nil { - result["ports"]++ - } else if resource.NumGpus != nil { - result["gpus"] += float64(*resource.NumGpus) - } - } - - return result -} diff --git a/zk.go b/zk.go index b1c4ad4..9d5b659 100644 --- a/zk.go +++ b/zk.go @@ -286,208 +286,3 @@ func MesosFromZKOpts(options ...ZKOpt) (string, error) { return mesosURL, nil } - -// Retrieves current Aurora master nodes from ZK. -func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) { - return MasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.SchedZKPath)) -} - -// Retrieves current Mesos master nodes/leader from ZK with a custom configuration. -func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { - result := make(map[string][]string) - - // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller. - config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} - for _, opt := range options { - opt(config) - } - - if len(config.endpoints) == 0 { - return nil, errors.New("no Zookeeper endpoints supplied") - } - - if config.path == "" { - return nil, errors.New("no Zookeeper path supplied") - } - - // Create a closure that allows us to use the ExponentialBackoff function. - retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { - - c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) - if err != nil { - return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) - } - - defer c.Close() - - // Open up descriptor for the ZK path given - children, _, _, err := c.ChildrenW(config.path) - if err != nil { - - // Sentinel error check as there is no other way to check. - if err == zk.ErrInvalidPath { - return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) - } - - return false, - NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) - } - - // Get all the master nodes through all the children in the given path - serviceInst := new(ServiceInstance) - var hosts []string - for _, child := range children { - childPath := config.path + "/" + child - data, _, err := c.Get(childPath) - if err != nil { - if err == zk.ErrInvalidPath { - return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) - } - - return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) - } - // Only leader is in json format. Have to parse data differently between member_ and not member_ - if strings.HasPrefix(child, "member_") { - err = json.Unmarshal([]byte(data), &serviceInst) - if err != nil { - return false, - NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) - } - // Should only be one endpoint. 
- // This should never be encountered as it would indicate Aurora - // writing bad info into Zookeeper but is kept here as a safety net. - if len(serviceInst.AdditionalEndpoints) > 1 { - return false, - NewTemporaryError( - errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK")) - } - - for _, v := range serviceInst.AdditionalEndpoints { - result["leader"] = append(result["leader"], v.Host) - } - } else { - // data is not in a json format - hosts = append(hosts, string(data)) - } - } - result["masterNodes"] = hosts - - // Master nodes data might not be available yet, try to fetch again. - if len(result["masterNodes"]) == 0 { - return false, NewTemporaryError(errors.New("no master nodes found")) - } - return true, nil - }) - - if retryErr != nil { - config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps) - return nil, retryErr - } - - return result, nil -} - -// Retrieves current Mesos Aurora master nodes from ZK. -func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) { - return MesosMasterNodesFromZKOpts(ZKEndpoints(strings.Split(cluster.ZK, ",")...), ZKPath(cluster.MesosZKPath)) -} - -// Retrieves current mesos master nodes/leader from ZK with a custom configuration. -func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) { - result := make(map[string][]string) - - // Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.] - config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}} - for _, opt := range options { - opt(config) - } - - if len(config.endpoints) == 0 { - return nil, errors.New("no Zookeeper endpoints supplied") - } - - if config.path == "" { - return nil, errors.New("no Zookeeper path supplied") - } - - // Create a closure that allows us to use the ExponentialBackoff function. - retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) { - - c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) - if err != nil { - return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) - } - - defer c.Close() - - // Open up descriptor for the ZK path given - children, _, _, err := c.ChildrenW(config.path) - if err != nil { - - // Sentinel error check as there is no other way to check. 
- if err == zk.ErrInvalidPath { - return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) - } - - return false, - NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) - } - - // Get all the master nodes through all the children in the given path - minScore := math.MaxInt64 - var mesosInstance MesosInstance - var hosts []string - for _, child := range children { - // Only the master nodes will start with json.info_ - if strings.HasPrefix(child, "json.info_") { - strs := strings.Split(child, "_") - if len(strs) < 2 { - config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child) - continue - } - score, err := strconv.Atoi(strs[1]) - if err != nil { - return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos.")) - } - - childPath := config.path + "/" + child - data, _, err := c.Get(childPath) - if err != nil { - if err == zk.ErrInvalidPath { - return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) - } - - return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader")) - } - - err = json.Unmarshal([]byte(data), &mesosInstance) - if err != nil { - config.logger.Printf("%s", err) - return false, - NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) - } - // Combine all master nodes into comma-separated - // Return hostname instead of ip to be consistent with aurora master nodes - hosts = append(hosts, mesosInstance.Address.Hostname) - // get the leader from the child with the smallest score. - if score < minScore { - minScore = score - result["leader"] = append(result["leader"], mesosInstance.Address.Hostname) - } - } - } - result["masterNodes"] = hosts - // Master nodes data might not be available yet, try to fetch again. - if len(result["masterNodes"]) == 0 { - return false, NewTemporaryError(errors.New("no mesos master nodes found")) - } - return true, nil - }) - - if retryErr != nil { - config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps) - return nil, retryErr - } - - return result, nil -}
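
Note: the retry.go hunks above leave the generic retry helper (Backoff, ConditionFunc, ExponentialBackoff) in place while reworking how thriftCallWithRetries classifies connection errors. For reference only, here is a minimal, hypothetical sketch of how a caller might drive that retained helper directly; it is not part of this patch. The unversioned import path simply mirrors the one used in runTestsMac.sh above, and NoopLogger plus the succeed-on-third-attempt condition are illustrative assumptions.

package main

import (
	"fmt"
	"time"

	realis "github.com/aurora-scheduler/gorealis"
)

func main() {
	// Mirrors the Backoff fields retained in retry.go: base duration, growth
	// factor, jitter fraction, and the number of attempts before giving up.
	backoff := realis.Backoff{
		Duration: 1 * time.Second,
		Factor:   2.0,
		Jitter:   0.1,
		Steps:    5,
	}

	attempts := 0

	// ExponentialBackoff keeps invoking the condition until it returns true,
	// returns a non-temporary error, or the Steps budget runs out.
	// NoopLogger is assumed here purely to satisfy the Logger parameter,
	// the same way the zk.go helpers above default to it.
	err := realis.ExponentialBackoff(backoff, realis.NoopLogger{}, func() (bool, error) {
		attempts++
		// Hypothetical condition: pretend the operation succeeds on attempt 3.
		return attempts >= 3, nil
	})
	if err != nil {
		fmt.Println("gave up after retries:", err)
		return
	}
	fmt.Printf("condition satisfied after %d attempt(s)\n", attempts)
}

Inside the client, thriftCallWithRetries layers this same loop over Aurora response codes, treating ERROR_TRANSIENT as retryable and the other codes as terminal, which matches the switch restored above.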