V1 CreateService and StartJobUpdate Timeout signal and cleanup (#105)
* Bumped up version to 1.21.1 * Moving admin functions to a new file. They are still part of the same pointer receiver type. * Removing dead code and fixing some comments to add space between backslash and comment. * Adding set up and tear down to run tests script. It sets up a pod, runs all tests, and then tears down the pod. * Added `--rm` to run tests Mac script. * Removing cookie jar from transport layer as it's not needed. * Changing all error messages to start with a lower case letter. Changing some messages around to be more descriptive. * Adding an argument to allow the retry mechanism to stop if a timeout has been encountered. This is useful for mutating API calls. Only StartUpdate and CreateService have enabled by default stop at timeout. * Added 2 tests for when a call goes through despite the client timing out. One is with a good payload, one is with a bad payload. * Updating changelog with information about the error type returned. * Adding test for duplicate metadata. * Refactored JobUpdateStatus monitor to use a new monitor called JobUpdateQuery. Update monitor will now still continue if it does not find an update to monitor. Furthermore, it has been optimized to reduce returning payloads from the scheduler as much as possible. This is through using the GetJobUpdateSummaries API instead of JobUpdateDetails and by including a the statuses we're searching for as part of the query. * Added documentation as to how to handle a timeout on an API request. * Optimized GetInstancesIds to create a copy of the JobKey being passed down in order to avoid unexpected behavior. Instead of setting every variable name separately, now a JobKey array is being created.
This commit is contained in:
parent
e16e390afe
commit
1a15c4a5aa
12 changed files with 661 additions and 475 deletions
|
@ -1,3 +1,9 @@
|
|||
1.21.1 (unreleased)
|
||||
|
||||
* CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
|
||||
by the HTTP client. Instead they now return an error that conforms to the Timedout interface.
|
||||
Users can check for a Timedout error by using `realis.IsTimeout(err)`.
|
||||
|
||||
1.21.0
|
||||
|
||||
* Version numbering change. Future versions will be labled X.Y.Z where X is the major version, Y is the Aurora version the library has been tested against (e.g. 21 -> 0.21.0), and X is the minor revision.
|
||||
|
|
|
@ -57,4 +57,19 @@ updateJob := realis.NewUpdateJob(job)
|
|||
updateJob.InstanceCount(1)
|
||||
updateJob.Ram(128)
|
||||
msg, err := r.UpdateJob(updateJob, "")
|
||||
```
|
||||
```
|
||||
|
||||
|
||||
* Handling a timeout scenario:
|
||||
|
||||
When sending an API call to Aurora, the call may timeout at the client side.
|
||||
This means that the time limit has been reached while waiting for the scheduler
|
||||
to reply. In such a case it is recommended that the timeout is increased through
|
||||
the use of the `realis.TimeoutMS()` option.
|
||||
|
||||
As these timeouts cannot be totally avoided, there exists a mechanism to mitigate such
|
||||
scenarios. The `StartJobUpdate` and `CreateService` API will return an error that
|
||||
implements the Timeout interface.
|
||||
|
||||
An error can be checked to see if it is a Timeout error by using the `realis.IsTimeout()`
|
||||
function.
|
|
@ -31,7 +31,7 @@ var cmd, executor, url, clustersConfig, clusterName, updateId, username, passwor
|
|||
var caCertsPath string
|
||||
var clientKey, clientCert string
|
||||
|
||||
var CONNECTION_TIMEOUT = 20000
|
||||
var ConnectionTimeout = 20000
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&cmd, "cmd", "", "Job request type to send to Aurora Scheduler")
|
||||
|
@ -82,7 +82,7 @@ func main() {
|
|||
clientOptions := []realis.ClientOption{
|
||||
realis.BasicAuth(username, password),
|
||||
realis.ThriftJSON(),
|
||||
realis.TimeoutMS(CONNECTION_TIMEOUT),
|
||||
realis.TimeoutMS(ConnectionTimeout),
|
||||
realis.BackOff(realis.Backoff{
|
||||
Steps: 2,
|
||||
Duration: 10 * time.Second,
|
||||
|
@ -92,7 +92,6 @@ func main() {
|
|||
realis.Debug(),
|
||||
}
|
||||
|
||||
//check if zkUrl is available.
|
||||
if zkUrl != "" {
|
||||
fmt.Println("zkUrl: ", zkUrl)
|
||||
clientOptions = append(clientOptions, realis.ZKUrl(zkUrl))
|
||||
|
|
2
job.go
2
job.go
|
@ -123,7 +123,7 @@ func (j *AuroraJob) Environment(env string) Job {
|
|||
func (j *AuroraJob) Role(role string) Job {
|
||||
j.jobConfig.Key.Role = role
|
||||
|
||||
//Will be deprecated
|
||||
// Will be deprecated
|
||||
identity := &aurora.Identity{User: role}
|
||||
j.jobConfig.Owner = identity
|
||||
j.jobConfig.TaskConfig.Owner = identity
|
||||
|
|
80
monitors.go
80
monitors.go
|
@ -19,7 +19,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/paypal/gorealis/response"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
@ -30,16 +29,20 @@ type Monitor struct {
|
|||
// Polls the scheduler every certain amount of time to see if the update has succeeded
|
||||
func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) {
|
||||
|
||||
status, err := m.JobUpdateStatus(updateKey,
|
||||
map[aurora.JobUpdateStatus]bool{
|
||||
aurora.JobUpdateStatus_ROLLED_FORWARD: true,
|
||||
aurora.JobUpdateStatus_ROLLED_BACK: true,
|
||||
aurora.JobUpdateStatus_ABORTED: true,
|
||||
aurora.JobUpdateStatus_ERROR: true,
|
||||
aurora.JobUpdateStatus_FAILED: true,
|
||||
updateQ := aurora.JobUpdateQuery{
|
||||
Key: &updateKey,
|
||||
Limit: 1,
|
||||
UpdateStatuses: []aurora.JobUpdateStatus{
|
||||
aurora.JobUpdateStatus_ROLLED_FORWARD,
|
||||
aurora.JobUpdateStatus_ROLLED_BACK,
|
||||
aurora.JobUpdateStatus_ABORTED,
|
||||
aurora.JobUpdateStatus_ERROR,
|
||||
aurora.JobUpdateStatus_FAILED,
|
||||
},
|
||||
time.Duration(interval)*time.Second,
|
||||
time.Duration(timeout)*time.Second)
|
||||
}
|
||||
updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second)
|
||||
|
||||
status := updateSummaries[0].State.Status
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -52,22 +55,43 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
|
|||
switch status {
|
||||
case aurora.JobUpdateStatus_ROLLED_FORWARD:
|
||||
return true, nil
|
||||
case aurora.JobUpdateStatus_ROLLED_BACK, aurora.JobUpdateStatus_ABORTED, aurora.JobUpdateStatus_ERROR, aurora.JobUpdateStatus_FAILED:
|
||||
case aurora.JobUpdateStatus_ROLLED_BACK,
|
||||
aurora.JobUpdateStatus_ABORTED,
|
||||
aurora.JobUpdateStatus_ERROR,
|
||||
aurora.JobUpdateStatus_FAILED:
|
||||
return false, errors.Errorf("bad terminal state for update: %v", status)
|
||||
default:
|
||||
return false, errors.Errorf("unexpected update state: %v", status)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
|
||||
func (m *Monitor) JobUpdateStatus(
|
||||
updateKey aurora.JobUpdateKey,
|
||||
desiredStatuses map[aurora.JobUpdateStatus]bool,
|
||||
interval time.Duration,
|
||||
timeout time.Duration) (aurora.JobUpdateStatus, error) {
|
||||
|
||||
updateQ := aurora.JobUpdateQuery{
|
||||
Key: &updateKey,
|
||||
Limit: 1,
|
||||
desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0)
|
||||
|
||||
for k := range desiredStatuses {
|
||||
desiredStatusesSlice = append(desiredStatusesSlice, k)
|
||||
}
|
||||
|
||||
updateQ := aurora.JobUpdateQuery{
|
||||
Key: &updateKey,
|
||||
Limit: 1,
|
||||
UpdateStatuses: desiredStatusesSlice,
|
||||
}
|
||||
summary, err := m.JobUpdateQuery(updateQ, interval, timeout)
|
||||
|
||||
return summary[0].State.Status, err
|
||||
}
|
||||
|
||||
func (m *Monitor) JobUpdateQuery(
|
||||
updateQuery aurora.JobUpdateQuery,
|
||||
interval time.Duration,
|
||||
timeout time.Duration) ([]*aurora.JobUpdateSummary, error) {
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(timeout)
|
||||
|
@ -78,25 +102,18 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
|
|||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
|
||||
respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery)
|
||||
if cliErr != nil {
|
||||
return aurora.JobUpdateStatus(-1), cliErr
|
||||
return nil, cliErr
|
||||
}
|
||||
|
||||
updateDetail := response.JobUpdateDetails(respDetail)
|
||||
|
||||
if len(updateDetail) == 0 {
|
||||
m.Client.RealisConfig().logger.Println("No update found")
|
||||
return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String())
|
||||
}
|
||||
status := updateDetail[0].Update.Summary.State.Status
|
||||
|
||||
if _, ok := desiredStatuses[status]; ok {
|
||||
return status, nil
|
||||
updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries
|
||||
if len(updateSummaries) >= 1 {
|
||||
return updateSummaries, nil
|
||||
}
|
||||
|
||||
case <-timer.C:
|
||||
return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out"))
|
||||
return nil, newTimedoutError(errors.New("job update monitor timed out"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -109,7 +126,12 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeo
|
|||
// Monitor a Job until all instances enter a desired status.
|
||||
// Defaults sets of desired statuses provided by the thrift API include:
|
||||
// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
|
||||
func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) {
|
||||
func (m *Monitor) ScheduleStatus(
|
||||
key *aurora.JobKey,
|
||||
instanceCount int32,
|
||||
desiredStatuses map[aurora.ScheduleStatus]bool,
|
||||
interval int,
|
||||
timeout int) (bool, error) {
|
||||
|
||||
ticker := time.NewTicker(time.Second * time.Duration(interval))
|
||||
defer ticker.Stop()
|
||||
|
|
611
realis.go
611
realis.go
|
@ -23,7 +23,6 @@ import (
|
|||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
@ -111,7 +110,7 @@ type RealisConfig struct {
|
|||
logger *LevelLogger
|
||||
InsecureSkipVerify bool
|
||||
certspath string
|
||||
clientkey, clientcert string
|
||||
clientKey, clientCert string
|
||||
options []ClientOption
|
||||
debug bool
|
||||
trace bool
|
||||
|
@ -125,6 +124,8 @@ var defaultBackoff = Backoff{
|
|||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
const APIPath = "/api"
|
||||
|
||||
type ClientOption func(*RealisConfig)
|
||||
|
||||
//Config sets for options in RealisConfig.
|
||||
|
@ -204,7 +205,7 @@ func Certspath(certspath string) ClientOption {
|
|||
|
||||
func ClientCerts(clientKey, clientCert string) ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.clientkey, config.clientcert = clientKey, clientCert
|
||||
config.clientKey, config.clientCert = clientKey, clientCert
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -240,24 +241,24 @@ func Trace() ClientOption {
|
|||
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
|
||||
trans, err := defaultTTransport(url, timeout, config)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error creating realis")
|
||||
return nil, errors.Wrap(err, "unable to create transport")
|
||||
}
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Content-Type", "application/x-thrift")
|
||||
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
|
||||
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
|
||||
return trans, err
|
||||
}
|
||||
|
||||
func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
|
||||
trans, err := defaultTTransport(url, timeout, config)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error creating realis")
|
||||
return nil, errors.Wrap(err, "unable to create transport")
|
||||
}
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
|
||||
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
|
||||
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
|
||||
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
|
||||
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
|
||||
|
||||
return trans, err
|
||||
}
|
||||
|
@ -306,7 +307,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
|
||||
config.logger.DebugPrintln("Number of options applied to config: ", len(options))
|
||||
|
||||
//Set default Transport to JSON if needed.
|
||||
// Set default Transport to JSON if needed.
|
||||
if !config.jsonTransport && !config.binTransport {
|
||||
config.jsonTransport = true
|
||||
}
|
||||
|
@ -318,7 +319,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
if config.zkOptions != nil {
|
||||
url, err = LeaderFromZKOpts(config.zkOptions...)
|
||||
if err != nil {
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader"))
|
||||
}
|
||||
config.logger.Println("Scheduler URL from ZK: ", url)
|
||||
} else if config.cluster != nil {
|
||||
|
@ -327,20 +328,20 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
url, err = LeaderFromZK(*config.cluster)
|
||||
// If ZK is configured, throw an error if the leader is unable to be determined
|
||||
if err != nil {
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error"))
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader "))
|
||||
}
|
||||
config.logger.Println("Scheduler URL from ZK: ", url)
|
||||
} else if config.url != "" {
|
||||
url = config.url
|
||||
config.logger.Println("Scheduler URL: ", url)
|
||||
} else {
|
||||
return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required")
|
||||
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
|
||||
}
|
||||
|
||||
if config.jsonTransport {
|
||||
trans, err := newTJSONTransport(url, config.timeoutms, config)
|
||||
if err != nil {
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis"))
|
||||
return nil, NewTemporaryError(err)
|
||||
}
|
||||
config.transport = trans
|
||||
config.protoFactory = thrift.NewTJSONProtocolFactory()
|
||||
|
@ -348,7 +349,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
} else if config.binTransport {
|
||||
trans, err := newTBinTransport(url, config.timeoutms, config)
|
||||
if err != nil {
|
||||
return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis"))
|
||||
return nil, NewTemporaryError(err)
|
||||
}
|
||||
config.transport = trans
|
||||
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
|
||||
|
@ -383,15 +384,15 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
|||
}
|
||||
}
|
||||
|
||||
func GetCerts(certpath string) (*x509.CertPool, error) {
|
||||
func GetCerts(certPath string) (*x509.CertPool, error) {
|
||||
globalRootCAs := x509.NewCertPool()
|
||||
caFiles, err := ioutil.ReadDir(certpath)
|
||||
caFiles, err := ioutil.ReadDir(certPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, cert := range caFiles {
|
||||
capathfile := filepath.Join(certpath, cert.Name())
|
||||
caCert, err := ioutil.ReadFile(capathfile)
|
||||
caPathFile := filepath.Join(certPath, cert.Name())
|
||||
caCert, err := ioutil.ReadFile(caPathFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -401,35 +402,29 @@ func GetCerts(certpath string) (*x509.CertPool, error) {
|
|||
}
|
||||
|
||||
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
|
||||
func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thrift.TTransport, error) {
|
||||
jar, err := cookiejar.New(nil)
|
||||
if err != nil {
|
||||
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
|
||||
}
|
||||
func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
|
||||
var transport http.Transport
|
||||
if config != nil {
|
||||
tlsConfig := &tls.Config{}
|
||||
if config.InsecureSkipVerify {
|
||||
tlsConfig.InsecureSkipVerify = true
|
||||
}
|
||||
tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify}
|
||||
|
||||
if config.certspath != "" {
|
||||
rootCAs, err := GetCerts(config.certspath)
|
||||
if err != nil {
|
||||
config.logger.Println("error occured couldn't fetch certs")
|
||||
config.logger.Println("error occurred couldn't fetch certs")
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig.RootCAs = rootCAs
|
||||
}
|
||||
if config.clientkey != "" && config.clientcert == "" {
|
||||
return nil, fmt.Errorf("have to provide both client key,cert. Only client key provided ")
|
||||
if config.clientKey != "" && config.clientCert == "" {
|
||||
return nil, fmt.Errorf("have to provide both client key, cert. Only client key provided ")
|
||||
}
|
||||
if config.clientkey == "" && config.clientcert != "" {
|
||||
return nil, fmt.Errorf("have to provide both client key,cert. Only client cert provided ")
|
||||
if config.clientKey == "" && config.clientCert != "" {
|
||||
return nil, fmt.Errorf("have to provide both client key, cert. Only client cert provided ")
|
||||
}
|
||||
if config.clientkey != "" && config.clientcert != "" {
|
||||
cert, err := tls.LoadX509KeyPair(config.clientcert, config.clientkey)
|
||||
if config.clientKey != "" && config.clientCert != "" {
|
||||
cert, err := tls.LoadX509KeyPair(config.clientCert, config.clientKey)
|
||||
if err != nil {
|
||||
config.logger.Println("error occured loading client certs and keys")
|
||||
config.logger.Println("error occurred loading client certs and keys")
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
|
@ -437,58 +432,24 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
|
|||
transport.TLSClientConfig = tlsConfig
|
||||
}
|
||||
|
||||
trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
|
||||
thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Transport: &transport, Jar: jar}})
|
||||
trans, err := thrift.NewTHttpClientWithOptions(
|
||||
url+APIPath,
|
||||
thrift.THttpClientOptions{
|
||||
Client: &http.Client{
|
||||
Timeout: time.Millisecond * time.Duration(timeoutMs),
|
||||
Transport: &transport}})
|
||||
|
||||
if err != nil {
|
||||
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport")
|
||||
return nil, errors.Wrap(err, "Error creating transport")
|
||||
}
|
||||
|
||||
if err := trans.Open(); err != nil {
|
||||
return &thrift.THttpClient{}, errors.Wrapf(err, "Error opening connection to %s", urlstr)
|
||||
return nil, errors.Wrapf(err, "Error opening connection to %s", url)
|
||||
}
|
||||
|
||||
return trans, nil
|
||||
}
|
||||
|
||||
// Create a default configuration of the transport layer, requires a URL to test connection with.
|
||||
// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default.
|
||||
func newDefaultConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||
return newTJSONConfig(url, timeoutms, config)
|
||||
}
|
||||
|
||||
// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora.
|
||||
func newTJSONConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||
trans, err := defaultTTransport(url, timeoutms, config)
|
||||
if err != nil {
|
||||
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||
}
|
||||
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Content-Type", "application/x-thrift")
|
||||
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
|
||||
|
||||
return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil
|
||||
}
|
||||
|
||||
// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora.
|
||||
func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||
trans, err := defaultTTransport(url, timeoutms, config)
|
||||
if err != nil {
|
||||
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||
}
|
||||
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
|
||||
|
||||
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
|
||||
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
|
||||
httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
|
||||
|
||||
return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil
|
||||
|
||||
}
|
||||
|
||||
func basicAuth(username, password string) string {
|
||||
auth := username + ":" + password
|
||||
return base64.StdEncoding.EncodeToString([]byte(auth))
|
||||
|
@ -534,21 +495,21 @@ func (r *realisClient) Close() {
|
|||
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
|
||||
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
|
||||
taskQ := &aurora.TaskQuery{
|
||||
Role: &key.Role,
|
||||
Environment: &key.Environment,
|
||||
JobName: &key.Name,
|
||||
Statuses: states,
|
||||
JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}},
|
||||
Statuses: states,
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(nil, taskQ)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(nil, taskQ)
|
||||
})
|
||||
|
||||
// If we encountered an error we couldn't recover from by retrying, return an error to the user
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for active IDs")
|
||||
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for active IDs")
|
||||
}
|
||||
|
||||
// Construct instance id map to stay in line with thrift's representation of sets
|
||||
|
@ -565,12 +526,14 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
|
|||
|
||||
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
|
@ -580,12 +543,14 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
|
||||
var result *aurora.GetJobsResult_
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobs(nil, role)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobs(nil, role)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
|
||||
return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
|
@ -599,12 +564,14 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.KillTasks(nil, key, instances, "")
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.KillTasks(nil, key, instances, "")
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -618,13 +585,15 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
|||
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
|
||||
return r.client.KillTasks(nil, key, nil, "")
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
|
||||
return r.client.KillTasks(nil, key, nil, "")
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -637,12 +606,14 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
|||
|
||||
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
|
||||
return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -655,6 +626,10 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
|
|||
|
||||
resp, err := r.StartJobUpdate(update, "")
|
||||
if err != nil {
|
||||
if IsTimeout(err) {
|
||||
return resp, nil, err
|
||||
}
|
||||
|
||||
return resp, nil, errors.Wrap(err, "unable to create service")
|
||||
}
|
||||
|
||||
|
@ -668,12 +643,14 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
|
|||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -682,12 +659,14 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
|
|||
|
||||
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.DescheduleCronJob(nil, key)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.DescheduleCronJob(nil, key)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
|
||||
|
||||
}
|
||||
return resp, nil
|
||||
|
@ -698,12 +677,14 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
|
||||
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartCronJob(nil, key)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.StartCronJob(nil, key)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
|
@ -713,12 +694,14 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(nil, key, instances)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(nil, key, instances)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -734,12 +717,14 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
|
||||
|
||||
if len(instanceIds) > 0 {
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(nil, key, instanceIds)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(nil, key, instanceIds)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
|
@ -753,12 +738,19 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
|
||||
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(nil, updateJob.req, message)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
true,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(nil, updateJob.req, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
|
||||
// A timeout took place when attempting this call, attempt to recover
|
||||
if IsTimeout(retryErr) {
|
||||
return resp, retryErr
|
||||
} else {
|
||||
return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -770,12 +762,14 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
|
|||
|
||||
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AbortJobUpdate(nil, &updateKey, message)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.AbortJobUpdate(nil, &updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
// Make this call synchronous by blocking until it job has successfully transitioned to aborted
|
||||
|
@ -785,49 +779,55 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
|
|||
return resp, err
|
||||
}
|
||||
|
||||
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PauseJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.PauseJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ResumeJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.ResumeJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PulseJobUpdate(nil, updateKey)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.PulseJobUpdate(nil, updateKey)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
|
@ -839,12 +839,14 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
|
|||
|
||||
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AddInstances(nil, &instKey, count)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.AddInstances(nil, &instKey, count)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
|
||||
return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
|
@ -876,12 +878,14 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
|
|||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(nil, query)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status")
|
||||
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
|
||||
}
|
||||
|
||||
return response.ScheduleStatusResult(resp).GetTasks(), nil
|
||||
|
@ -892,12 +896,14 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
|
|||
|
||||
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetPendingReason(nil, query)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetPendingReason(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons")
|
||||
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
|
||||
}
|
||||
|
||||
var pendingReasons []*aurora.PendingReason
|
||||
|
@ -914,12 +920,14 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
|
|||
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(nil, query)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs")
|
||||
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
|
||||
}
|
||||
|
||||
return response.ScheduleStatusResult(resp).GetTasks(), nil
|
||||
|
@ -938,12 +946,14 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(nil, taskQ)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(nil, taskQ)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration")
|
||||
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
|
||||
}
|
||||
|
||||
tasks := response.ScheduleStatusResult(resp).GetTasks()
|
||||
|
@ -964,9 +974,11 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
|
|||
|
||||
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetJobUpdateDetails(nil, &updateQuery)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.GetJobUpdateDetails(nil, &updateQuery)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Unable to get job update details")
|
||||
|
@ -979,259 +991,14 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
|
|||
|
||||
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RollbackJobUpdate(nil, &key, message)
|
||||
})
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.client.RollbackJobUpdate(nil, &key, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Unable to roll back job update")
|
||||
return nil, errors.Wrap(retryErr, "unable to roll back job update")
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
/* Admin functions */
|
||||
// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change,
|
||||
// but would add safety.
|
||||
|
||||
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
|
||||
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
|
||||
// to return to running unless there is enough capacity in the cluster to run them.
|
||||
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
|
||||
|
||||
var result *aurora.DrainHostsResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to drain")
|
||||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.DrainHosts(nil, drainList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetDrainHostsResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
// Start SLA Aware Drain.
|
||||
// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
|
||||
// After timeoutSecs, tasks will be forcefully drained without checking SLA.
|
||||
func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
|
||||
var result *aurora.DrainHostsResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, errors.New("no hosts provided to drain")
|
||||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetDrainHostsResult_()
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
|
||||
|
||||
var result *aurora.StartMaintenanceResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to start maintenance on")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.StartMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetStartMaintenanceResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
|
||||
|
||||
var result *aurora.EndMaintenanceResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to end maintenance on")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.EndMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetEndMaintenanceResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
|
||||
|
||||
var result *aurora.MaintenanceStatusResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to get maintenance status from")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
|
||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||
// and continue trying to resend command until we run out of retries.
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.MaintenanceStatus(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetMaintenanceStatusResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
// SetQuota sets a quota aggregate for the given role
|
||||
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
|
||||
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
|
||||
ramRes := aurora.NewResource()
|
||||
ramRes.RamMb = ramMb
|
||||
cpuRes := aurora.NewResource()
|
||||
cpuRes.NumCpus = cpu
|
||||
diskRes := aurora.NewResource()
|
||||
diskRes.DiskMb = diskMb
|
||||
quota := aurora.NewResourceAggregate()
|
||||
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.SetQuota(nil, role, quota)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Unable to set role quota")
|
||||
}
|
||||
return resp, retryErr
|
||||
|
||||
}
|
||||
|
||||
// GetQuota returns the resource aggregate for the given role
|
||||
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.GetQuota(nil, role)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Unable to get role quota")
|
||||
}
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
|
||||
func (r *realisClient) Snapshot() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.Snapshot(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Force Aurora Scheduler to write backup file to a file in the backup directory
|
||||
func (r *realisClient) PerformBackup() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.PerformBackup(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *realisClient) ForceImplicitTaskReconciliation() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
|
||||
|
||||
if batchSize != nil && *batchSize < 1 {
|
||||
return errors.New("Invalid batch size.")
|
||||
}
|
||||
settings := aurora.NewExplicitReconciliationSettings()
|
||||
|
||||
settings.BatchSize = batchSize
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
269
realis_admin.go
Normal file
269
realis_admin.go
Normal file
|
@ -0,0 +1,269 @@
|
|||
package realis
|
||||
|
||||
import (
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change,
|
||||
// but would add safety.
|
||||
|
||||
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
|
||||
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
|
||||
// to return to running unless there is enough capacity in the cluster to run them.
|
||||
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
|
||||
|
||||
var result *aurora.DrainHostsResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to drain")
|
||||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.DrainHosts(nil, drainList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetDrainHostsResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
// Start SLA Aware Drain.
|
||||
// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
|
||||
// After timeoutSecs, tasks will be forcefully drained without checking SLA.
|
||||
func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
|
||||
var result *aurora.DrainHostsResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, errors.New("no hosts provided to drain")
|
||||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetDrainHostsResult_()
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
|
||||
|
||||
var result *aurora.StartMaintenanceResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to start maintenance on")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.StartMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetStartMaintenanceResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
|
||||
|
||||
var result *aurora.EndMaintenanceResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to end maintenance on")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.EndMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetEndMaintenanceResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
|
||||
|
||||
var result *aurora.MaintenanceStatusResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to get maintenance status from")
|
||||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
|
||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||
// and continue trying to resend command until we run out of retries.
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.MaintenanceStatus(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetMaintenanceStatusResult_()
|
||||
}
|
||||
|
||||
return resp, result, nil
|
||||
}
|
||||
|
||||
// SetQuota sets a quota aggregate for the given role
|
||||
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
|
||||
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
|
||||
quota := &aurora.ResourceAggregate{
|
||||
Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}},
|
||||
}
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.SetQuota(nil, role, quota)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Unable to set role quota")
|
||||
}
|
||||
return resp, retryErr
|
||||
|
||||
}
|
||||
|
||||
// GetQuota returns the resource aggregate for the given role
|
||||
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.GetQuota(nil, role)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return resp, errors.Wrap(retryErr, "Unable to get role quota")
|
||||
}
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
|
||||
func (r *realisClient) Snapshot() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.Snapshot(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Force Aurora Scheduler to write backup file to a file in the backup directory
|
||||
func (r *realisClient) PerformBackup() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.PerformBackup(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *realisClient) ForceImplicitTaskReconciliation() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(
|
||||
false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
|
||||
|
||||
if batchSize != nil && *batchSize < 1 {
|
||||
return errors.New("Invalid batch size.")
|
||||
}
|
||||
settings := aurora.NewExplicitReconciliationSettings()
|
||||
|
||||
settings.BatchSize = batchSize
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(false,
|
||||
func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -35,11 +35,13 @@ var r realis.Realis
|
|||
var monitor *realis.Monitor
|
||||
var thermosPayload []byte
|
||||
|
||||
const auroraURL = "http://192.168.33.7:8081"
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
var err error
|
||||
|
||||
// New configuration to connect to docker container
|
||||
r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
|
||||
r, err = realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.TimeoutMS(20000))
|
||||
|
||||
|
@ -93,7 +95,7 @@ func TestNonExistentEndpoint(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestThriftBinary(t *testing.T) {
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.TimeoutMS(20000),
|
||||
realis.ThriftBinary())
|
||||
|
@ -115,7 +117,7 @@ func TestThriftBinary(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestThriftJSON(t *testing.T) {
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.TimeoutMS(20000),
|
||||
realis.ThriftJSON())
|
||||
|
@ -137,7 +139,7 @@ func TestThriftJSON(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNoopLogger(t *testing.T) {
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
|
||||
r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.SetLogger(realis.NoopLogger{}))
|
||||
|
||||
|
@ -161,6 +163,8 @@ func TestLeaderFromZK(t *testing.T) {
|
|||
url, err := realis.LeaderFromZK(*cluster)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Address stored inside of ZK might be different than the one we connect to in our tests.
|
||||
assert.Equal(t, "http://192.168.33.7:8081", url)
|
||||
}
|
||||
|
||||
|
@ -271,6 +275,25 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
|
|||
assert.True(t, success)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Duplicate_Metadata", func(t *testing.T) {
|
||||
job.Name("thermos_duplicate_metadata").
|
||||
AddLabel("hostname", "cookie").
|
||||
AddLabel("hostname", "candy").
|
||||
AddLabel("hostname", "popcorn").
|
||||
AddLabel("hostname", "chips").
|
||||
AddLabel("chips", "chips")
|
||||
|
||||
_, err := r.CreateJob(job)
|
||||
assert.NoError(t, err)
|
||||
|
||||
success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
|
||||
assert.True(t, success)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = r.KillJob(job.JobKey())
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// Test configuring an executor that doesn't exist for CreateJob API
|
||||
|
@ -461,6 +484,64 @@ func TestRealisClient_CreateService(t *testing.T) {
|
|||
// Kill task test task after confirming it came up fine
|
||||
_, err = r.KillJob(job.JobKey())
|
||||
assert.NoError(t, err)
|
||||
|
||||
success, err := monitor.Instances(job.JobKey(), 0, 1, 50)
|
||||
assert.True(t, success)
|
||||
|
||||
// Create a client which will timeout and close the connection before receiving an answer
|
||||
timeoutClient, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
|
||||
realis.BasicAuth("aurora", "secret"),
|
||||
realis.TimeoutMS(10))
|
||||
assert.NoError(t, err)
|
||||
defer timeoutClient.Close()
|
||||
|
||||
// Test case where http connection timeouts out.
|
||||
t.Run("TimeoutError", func(t *testing.T) {
|
||||
job.Name("createService_timeout")
|
||||
|
||||
// Make sure a timedout error was returned
|
||||
_, _, err = timeoutClient.CreateService(job, settings)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, realis.IsTimeout(err))
|
||||
|
||||
updateReceivedQuery := aurora.JobUpdateQuery{
|
||||
Role: &job.JobKey().Role,
|
||||
JobKey: job.JobKey(),
|
||||
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
|
||||
Limit: 1}
|
||||
|
||||
updateSummaries, err := monitor.JobUpdateQuery(updateReceivedQuery, time.Second*1, time.Second*50)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, updateSummaries, 1)
|
||||
|
||||
_, err = r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up")
|
||||
_, err = r.KillJob(job.JobKey())
|
||||
assert.NoError(t, err)
|
||||
|
||||
})
|
||||
|
||||
// Test case where http connection timeouts out.
|
||||
t.Run("TimeoutError_BadPayload", func(t *testing.T) {
|
||||
// Illegal payload
|
||||
job.InstanceCount(-1)
|
||||
job.Name("createService_timeout_bad_payload")
|
||||
|
||||
// Make sure a timedout error was returned
|
||||
_, _, err = timeoutClient.CreateService(job, settings)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, realis.IsTimeout(err))
|
||||
|
||||
summary, err := r.GetJobUpdateSummaries(
|
||||
&aurora.JobUpdateQuery{
|
||||
Role: &job.JobKey().Role,
|
||||
JobKey: job.JobKey(),
|
||||
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Payload should have been rejected, no update should exist
|
||||
assert.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Test configuring an executor that doesn't exist for CreateJob API
|
||||
|
|
30
retry.go
30
retry.go
|
@ -28,7 +28,7 @@ import (
|
|||
|
||||
type Backoff struct {
|
||||
Duration time.Duration // the base duration
|
||||
Factor float64 // Duration is multipled by factor each iteration
|
||||
Factor float64 // Duration is multiplied by a factor each iteration
|
||||
Jitter float64 // The amount of jitter applied each iteration
|
||||
Steps int // Exit with error after this many steps
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
|
|||
adjusted = Jitter(duration, backoff.Jitter)
|
||||
}
|
||||
|
||||
logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted)
|
||||
logger.Printf("A retryable error occurred during function call, backing off for %v before retrying\n", adjusted)
|
||||
time.Sleep(adjusted)
|
||||
duration = time.Duration(float64(duration) * backoff.Factor)
|
||||
}
|
||||
|
@ -116,10 +116,11 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc)
|
|||
type auroraThriftCall func() (resp *aurora.Response, err error)
|
||||
|
||||
// Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls.
|
||||
func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) {
|
||||
func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) {
|
||||
var resp *aurora.Response
|
||||
var clientErr error
|
||||
var curStep int
|
||||
timeouts := 0
|
||||
|
||||
backoff := r.config.backoff
|
||||
duration := backoff.Duration
|
||||
|
@ -133,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
adjusted = Jitter(duration, backoff.Jitter)
|
||||
}
|
||||
|
||||
r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
|
||||
r.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep)
|
||||
|
||||
time.Sleep(adjusted)
|
||||
duration = time.Duration(float64(duration) * backoff.Factor)
|
||||
|
@ -151,7 +152,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr)
|
||||
}()
|
||||
|
||||
// Check if our thrift call is returning an error. This is a retriable event as we don't know
|
||||
// Check if our thrift call is returning an error. This is a retryable event as we don't know
|
||||
// if it was caused by network issues.
|
||||
if clientErr != nil {
|
||||
|
||||
|
@ -169,7 +170,20 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
// when the server is overloaded and should be retried. All other errors that are permanent
|
||||
// will not be retried.
|
||||
if e.Err != io.EOF && !e.Temporary() {
|
||||
return nil, errors.Wrap(clientErr, "Permanent connection error")
|
||||
return nil, errors.Wrap(clientErr, "permanent connection error")
|
||||
}
|
||||
|
||||
// Corner case where thrift payload was received by Aurora but connection timedout before Aurora was
|
||||
// able to reply. In this case we will return whatever response was received and a TimedOut behaving
|
||||
// error. Users can take special action on a timeout by using IsTimedout and reacting accordingly.
|
||||
if e.Timeout() {
|
||||
timeouts++
|
||||
r.logger.DebugPrintf(
|
||||
"Client closed connection (timedout) %d times before server responded, consider increasing connection timeout",
|
||||
timeouts)
|
||||
if returnOnTimeout {
|
||||
return resp, newTimedoutError(errors.New("client connection closed before server answer"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -183,7 +197,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
// If there was no client error, but the response is nil, something went wrong.
|
||||
// Ideally, we'll never encounter this but we're placing a safeguard here.
|
||||
if resp == nil {
|
||||
return nil, errors.New("Response from aurora is nil")
|
||||
return nil, errors.New("response from aurora is nil")
|
||||
}
|
||||
|
||||
// Check Response Code from thrift and make a decision to continue retrying or not.
|
||||
|
@ -210,7 +224,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro
|
|||
// It is currently not used as a response in the scheduler so it is unknown how to handle it.
|
||||
default:
|
||||
r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode)
|
||||
return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String())
|
||||
return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
13
runTests.sh
Executable file
13
runTests.sh
Executable file
|
@ -0,0 +1,13 @@
|
|||
#!/bin/bash
|
||||
|
||||
docker-compose up -d
|
||||
|
||||
# If running docker-compose up gives any error, don't do anything.
|
||||
if [ $? -ne 0 ]; then
|
||||
exit
|
||||
fi
|
||||
|
||||
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
|
||||
docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@
|
||||
|
||||
docker-compose down
|
2
runTestsMac.sh
Executable file → Normal file
2
runTestsMac.sh
Executable file → Normal file
|
@ -1,4 +1,4 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues.
|
||||
docker run -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@
|
||||
docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@
|
||||
|
|
12
zk.go
12
zk.go
|
@ -103,7 +103,7 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
|
|||
|
||||
c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
|
||||
if err != nil {
|
||||
return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
|
||||
return false, NewTemporaryError(errors.Wrap(err, "failed to connect to Zookeeper"))
|
||||
}
|
||||
|
||||
defer c.Close()
|
||||
|
@ -117,7 +117,7 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
|
|||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
|
||||
}
|
||||
|
||||
return false, NewTemporaryError(errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", config.path))
|
||||
return false, NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
|
||||
}
|
||||
|
||||
// Search for the leader through all the children in the given path
|
||||
|
@ -134,12 +134,12 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
|
|||
return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
|
||||
}
|
||||
|
||||
return false, NewTemporaryError(errors.Wrap(err, "Error fetching contents of leader"))
|
||||
return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader"))
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), serviceInst)
|
||||
if err != nil {
|
||||
return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader"))
|
||||
return false, NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
|
||||
}
|
||||
|
||||
// Should only be one endpoint.
|
||||
|
@ -162,11 +162,11 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
|
|||
}
|
||||
|
||||
// Leader data might not be available yet, try to fetch again.
|
||||
return false, NewTemporaryError(errors.New("No leader found"))
|
||||
return false, NewTemporaryError(errors.New("no leader found"))
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
|
||||
config.logger.Printf("failed to determine leader after %v attempts", config.backoff.Steps)
|
||||
return "", retryErr
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue