commit
364ee93202
3 changed files with 117 additions and 6 deletions
10
.github/workflows/main.yml
vendored
10
.github/workflows/main.yml
vendored
|
@ -10,17 +10,17 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- name: Setup Go for use with actions
|
- name: Setup Go for use with actions
|
||||||
uses: actions/setup-go@v1.0.0
|
uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
version: 1.13
|
go-version: 1.15
|
||||||
- name: Install goimports
|
- name: Install goimports
|
||||||
run: go get golang.org/x/tools/cmd/goimports
|
run: go get golang.org/x/tools/cmd/goimports
|
||||||
|
- name: Set env with list of directories in repo containin go code
|
||||||
|
run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV
|
||||||
- name: Run goimports check
|
- name: Run goimports check
|
||||||
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
|
run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`"
|
||||||
env:
|
|
||||||
GO_USR_DIRS: $(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/")
|
|
||||||
- name: Create aurora/mesos docker cluster
|
- name: Create aurora/mesos docker cluster
|
||||||
run: docker-compose up -d
|
run: docker-compose up -d
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: go test -timeout 30m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/aurora-scheduler/gorealis/v2
|
run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/aurora-scheduler/gorealis/v2
|
||||||
|
|
||||||
|
|
|
@ -137,10 +137,11 @@ func (j *JobUpdate) SlaAware(slaAware bool) *JobUpdate {
|
||||||
j.request.Settings.SlaAware = &slaAware
|
j.request.Settings.SlaAware = &slaAware
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddInstanceRange allows updates to only touch a certain specific range of instances
|
// AddInstanceRange allows updates to only touch a certain specific range of instances
|
||||||
func (j *JobUpdate) AddInstanceRange(first, last int32) *JobUpdate {
|
func (j *JobUpdate) AddInstanceRange(first, last int32) *JobUpdate {
|
||||||
j.request.Settings.UpdateOnlyTheseInstances = append(j.request.Settings.UpdateOnlyTheseInstances,
|
j.request.Settings.UpdateOnlyTheseInstances = append(j.request.Settings.UpdateOnlyTheseInstances,
|
||||||
&aurora.Range{First: first, Last: last})
|
&aurora.Range{First: first, Last: last})
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
110
zk.go
110
zk.go
|
@ -16,6 +16,7 @@ package realis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -35,6 +36,18 @@ type ServiceInstance struct {
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MesosAddress struct {
|
||||||
|
Hostname string `json:"hostname"`
|
||||||
|
IP string `json:"ip"`
|
||||||
|
Port uint16 `json:"port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MesosInstance is defined for mesos json stored in ZK.
|
||||||
|
type MesosInstance struct {
|
||||||
|
Address MesosAddress `json:"address"`
|
||||||
|
Version string `json:"version"`
|
||||||
|
}
|
||||||
|
|
||||||
type zkConfig struct {
|
type zkConfig struct {
|
||||||
endpoints []string
|
endpoints []string
|
||||||
path string
|
path string
|
||||||
|
@ -176,3 +189,100 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
|
||||||
|
|
||||||
return leaderURL, nil
|
return leaderURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retrieves current mesos leader from ZK with a custom configuration.
|
||||||
|
func MesosFromZKOpts(options ...ZKOpt) (string, error) {
|
||||||
|
var mesosURL string
|
||||||
|
|
||||||
|
// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
|
||||||
|
config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
|
||||||
|
for _, opt := range options {
|
||||||
|
opt(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(config.endpoints) == 0 {
|
||||||
|
return "", errors.New("no Zookeeper endpoints supplied")
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.path == "" {
|
||||||
|
return "", errors.New("no Zookeeper path supplied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a closure that allows us to use the ExponentialBackoff function.
|
||||||
|
retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
|
||||||
|
|
||||||
|
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
|
||||||
|
if err != nil {
|
||||||
|
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// Open up descriptor for the ZK path given
|
||||||
|
children, _, _, err := c.ChildrenW(config.path)
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
// Sentinel error check as there is no other way to check.
|
||||||
|
if err == zk.ErrInvalidPath {
|
||||||
|
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false,
|
||||||
|
NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search for the leader through all the children in the given path
|
||||||
|
minScore := math.MaxInt64
|
||||||
|
var mesosInstance MesosInstance
|
||||||
|
for _, child := range children {
|
||||||
|
// Only the leader will start with json.info_
|
||||||
|
if strings.HasPrefix(child, "json.info_") {
|
||||||
|
strs := strings.Split(child, "_")
|
||||||
|
if len(strs) < 2 {
|
||||||
|
config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
score, err := strconv.Atoi(strs[1])
|
||||||
|
if err != nil {
|
||||||
|
return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos."))
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the leader from the child with the smallest score.
|
||||||
|
if score < minScore {
|
||||||
|
minScore = score
|
||||||
|
childPath := config.path + "/" + child
|
||||||
|
data, _, err := c.Get(childPath)
|
||||||
|
if err != nil {
|
||||||
|
if err == zk.ErrInvalidPath {
|
||||||
|
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(data), &mesosInstance)
|
||||||
|
if err != nil {
|
||||||
|
config.logger.Printf("%s", err)
|
||||||
|
return false,
|
||||||
|
NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
|
||||||
|
}
|
||||||
|
|
||||||
|
mesosURL = mesosInstance.Address.IP + ":" + strconv.Itoa(int(mesosInstance.Address.Port))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(mesosURL) > 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Leader data might not be available yet, try to fetch again.
|
||||||
|
return false, NewTemporaryError(errors.New("no leader found"))
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
|
||||||
|
return "", retryErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return mesosURL, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue