Added a configuration option for failing on a non-temporary (permanent) connection error. By default this reverts the retry mechanism to its original behavior of retrying regardless of the error type, while letting the user opt in to failing fast when a non-temporary error is encountered.
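
For reference, a minimal sketch of how a caller would opt in to the new behavior using the options touched by this commit; the import path, scheduler address, and backoff values are illustrative placeholders, not something prescribed by the commit itself:

package main

import (
	"log"
	"time"

	realis "github.com/paypal/gorealis" // assumed import path for this client library
)

func main() {
	// Without FailOnPermanentErrors(), the client keeps retrying until the backoff is
	// exhausted. With it, a connection error the standard library reports as permanent
	// is returned to the caller immediately.
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://192.168.33.7:8081"), // placeholder scheduler address
		realis.TimeoutMS(20000),
		realis.BackOff(realis.Backoff{
			Steps:    3,
			Duration: 200 * time.Millisecond,
			Factor:   1.0,
			Jitter:   0.1,
		}),
		realis.FailOnPermanentErrors(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}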
parent 11db9daa3b
commit a00eb4ff39
5 changed files with 137 additions and 91 deletions
realis.go (41 changes)

@@ -37,10 +37,10 @@ import (
 	"github.com/pkg/errors"
 )

-const VERSION = "1.21.0"
+const VERSION = "1.21.1"

 // TODO(rdelvalle): Move documentation to interface in order to make godoc look better accessible
-// Or get rid of itnerface
+// Or get rid of the interface
 type Realis interface {
 	AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
@@ -112,13 +112,14 @@ type RealisConfig struct {
 	transport             thrift.TTransport
 	protoFactory          thrift.TProtocolFactory
 	logger                *LevelLogger
-	InsecureSkipVerify    bool
+	insecureSkipVerify    bool
 	certspath             string
 	clientKey, clientCert string
 	options               []ClientOption
 	debug                 bool
 	trace                 bool
 	zkOptions             []ZKOpt
+	failOnPermanentErrors bool
 }

 var defaultBackoff = Backoff{
@@ -128,11 +129,10 @@ var defaultBackoff = Backoff{
 	Jitter:   0.1,
 }

-const APIPath = "/api"
-
+// ClientOption - An alias for a function that modifies the realis config object
 type ClientOption func(*RealisConfig)

-//Config sets for options in RealisConfig.
+// BasicAuth - Set authentication used against Apache Shiro in the Aurora scheduler
 func BasicAuth(username, password string) ClientOption {
 	return func(config *RealisConfig) {
 		config.username = username
@@ -140,26 +140,29 @@ func BasicAuth(username, password string) ClientOption {
 	}
 }

+// SchedulerUrl - Set the immediate location of the current Aurora scheduler leader
 func SchedulerUrl(url string) ClientOption {
 	return func(config *RealisConfig) {
 		config.url = url
 	}
 }

+// TimeoutMS - Set the connection timeout for an HTTP post request in Miliseconds
 func TimeoutMS(timeout int) ClientOption {
 	return func(config *RealisConfig) {
 		config.timeoutms = timeout
 	}
 }

+// ZKCluster - Set a clusters.json provided cluster configuration to the client
 func ZKCluster(cluster *Cluster) ClientOption {
 	return func(config *RealisConfig) {
 		config.cluster = cluster
 	}
 }

+// ZKUrl - Set the direct location of a Zookeeper node on which the Aurora leader registers itself
 func ZKUrl(url string) ClientOption {
-
 	opts := []ZKOpt{ZKEndpoints(strings.Split(url, ",")...), ZKPath("/aurora/scheduler")}

 	return func(config *RealisConfig) {
@@ -171,6 +174,7 @@ func ZKUrl(url string) ClientOption {
 	}
 }

+// Retries - Configure the retry mechanism for the client
 func Retries(backoff Backoff) ClientOption {
 	return func(config *RealisConfig) {
 		config.backoff = backoff
@@ -195,9 +199,9 @@ func BackOff(b Backoff) ClientOption {
 	}
 }

-func InsecureSkipVerify(InsecureSkipVerify bool) ClientOption {
+func insecureSkipVerify(insecureSkipVerify bool) ClientOption {
 	return func(config *RealisConfig) {
-		config.InsecureSkipVerify = InsecureSkipVerify
+		config.insecureSkipVerify = insecureSkipVerify
 	}
 }

@@ -242,6 +246,14 @@ func Trace() ClientOption {
 	}
 }

+// FailOnPermanentErrors - If the client encounters a connection error the standard library
+// considers permanent, stop retrying and return an error to the user.
+func FailOnPermanentErrors() ClientOption {
+	return func(config *RealisConfig) {
+		config.failOnPermanentErrors = true
+	}
+}
+
 func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
@@ -344,15 +356,18 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
 		}
 		config.logger.Println("Scheduler URL from ZK: ", url)
 	} else if config.url != "" {
+		url = config.url
 		config.logger.Println("Scheduler URL: ", url)
 	} else {
 		return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
 	}

+	config.logger.Println("Addresss obtained: ", url)
 	url, err = validateAuroraURL(url)
 	if err != nil {
 		return nil, errors.Wrap(err, "invalid Aurora url")
 	}
+	config.logger.Println("Corrected address: ", url)

 	if config.jsonTransport {
 		trans, err := newTJSONTransport(url, config.timeoutms, config)
@@ -424,7 +439,7 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
 func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
 	var transport http.Transport
 	if config != nil {
-		tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify}
+		tlsConfig := &tls.Config{InsecureSkipVerify: config.insecureSkipVerify}

 		if config.certspath != "" {
 			rootCAs, err := GetCerts(config.certspath)
@@ -452,11 +467,13 @@ func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.
 	}

 	trans, err := thrift.NewTHttpClientWithOptions(
-		url+APIPath,
+		url,
 		thrift.THttpClientOptions{
 			Client: &http.Client{
 				Timeout:   time.Millisecond * time.Duration(timeoutMs),
-				Transport: &transport}})
+				Transport: &transport,
+			},
+		})

 	if err != nil {
 		return nil, errors.Wrap(err, "Error creating transport")

@@ -65,34 +65,58 @@ func TestMain(m *testing.M) {
 }

 func TestNonExistentEndpoint(t *testing.T) {
-	backoff := realis.Backoff{ // Reduce penalties for this test to make it quick
-		Steps:    5,
-		Duration: 1 * time.Second,
+	// Reduce penalties for this test to make it quick
+	backoff := realis.Backoff{
+		Steps:    3,
+		Duration: 200 * time.Millisecond,
 		Factor:   1.0,
 		Jitter:   0.1}

-	// Attempt to connect to a bad endpoint
-	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/doesntexist/"),
-		realis.TimeoutMS(200),
-		realis.BackOff(backoff),
-	)
-
-	assert.NoError(t, err)
-	defer r.Close()
-
 	taskQ := &aurora.TaskQuery{}
+	badEndpoint := "http://idontexist.local:8081/api"

-	_, err = r.GetTasksWithoutConfigs(taskQ)
+	t.Run("WithRetries", func(t *testing.T) {
+		// Attempt to connect to a bad endpoint
+		r, err := realis.NewRealisClient(
+			realis.SchedulerUrl(badEndpoint),
+			realis.TimeoutMS(200000),
+			realis.BackOff(backoff),
+		)

-	// Check that we do error out of retrying
-	assert.Error(t, err)
+		require.NoError(t, err)
+		require.NotNil(t, r)
+		defer r.Close()

-	// Check that the error before this one was a a retry error
-	// TODO: Consider bubbling up timeout behaving error all the way up to the user.
-	retryErr := realis.ToRetryCount(errors.Cause(err))
-	assert.NotNil(t, retryErr, "error passed in is not a retry error")
+		_, err = r.GetTasksWithoutConfigs(taskQ)

-	assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is incorrect")
+		// Check that we do error out of retrying
+		require.Error(t, err)
+		// Check that the error before this one was a a retry error
+		// TODO: Consider bubbling up timeout behaving error all the way up to the user.
+		retryErr := realis.ToRetryCount(errors.Cause(err))
+		require.NotNil(t, retryErr, "error passed in is not a retry error")
+
+		assert.Equal(t, backoff.Steps, retryErr.RetryCount(), "retry count is incorrect")
+	})
+
+	t.Run("FailOnLookup", func(t *testing.T) {
+		// Attempt to connect to a bad endpoint
+		r, err := realis.NewRealisClient(
+			realis.SchedulerUrl(badEndpoint),
+			realis.TimeoutMS(200000),
+			realis.BackOff(backoff),
+			realis.FailOnPermanentErrors(),
+		)
+
+		require.NoError(t, err)
+		require.NotNil(t, r)
+		defer r.Close()
+
+		_, err = r.GetTasksWithoutConfigs(taskQ)
+
+		// Check that we do error out of retrying
+		require.Error(t, err)
+	})

 }

@@ -102,7 +126,8 @@ func TestThriftBinary(t *testing.T) {
 		realis.TimeoutMS(20000),
 		realis.ThriftBinary())

-	assert.NoError(t, err)
+	require.NoError(t, err)
+	defer r.Close()

 	role := "all"
 	taskQ := &aurora.TaskQuery{
@@ -111,11 +136,7 @@ func TestThriftBinary(t *testing.T) {

 	// Perform a simple API call to test Thrift Binary
 	_, err = r.GetTasksWithoutConfigs(taskQ)
-
 	assert.NoError(t, err)
-
-	r.Close()
-
 }

 func TestThriftJSON(t *testing.T) {
@@ -124,7 +145,8 @@ func TestThriftJSON(t *testing.T) {
 		realis.TimeoutMS(20000),
 		realis.ThriftJSON())

-	assert.NoError(t, err)
+	require.NoError(t, err)
+	defer r.Close()

 	role := "all"
 	taskQ := &aurora.TaskQuery{
@@ -136,8 +158,6 @@ func TestThriftJSON(t *testing.T) {

 	assert.NoError(t, err)

-	r.Close()
-
 }

 func TestNoopLogger(t *testing.T) {
@@ -145,7 +165,8 @@ func TestNoopLogger(t *testing.T) {
 		realis.BasicAuth("aurora", "secret"),
 		realis.SetLogger(realis.NoopLogger{}))

-	assert.NoError(t, err)
+	require.NoError(t, err)
+	defer r.Close()

 	role := "all"
 	taskQ := &aurora.TaskQuery{
@@ -154,24 +175,20 @@ func TestNoopLogger(t *testing.T) {

 	// Perform a simple API call to test Thrift Binary
 	_, err = r.GetTasksWithoutConfigs(taskQ)
-
 	assert.NoError(t, err)
-
-	r.Close()
 }

 func TestLeaderFromZK(t *testing.T) {
 	cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.2:2181")
 	url, err := realis.LeaderFromZK(*cluster)

-	assert.NoError(t, err)
+	require.NoError(t, err)

 	// Address stored inside of ZK might be different than the one we connect to in our tests.
 	assert.Equal(t, "http://192.168.33.7:8081", url)
 }

 func TestRealisClient_ReestablishConn(t *testing.T) {
-
 	// Test that we're able to tear down the old connection and create a new one.
 	err := r.ReestablishConn()

@@ -180,9 +197,8 @@ func TestRealisClient_ReestablishConn(t *testing.T) {

 func TestGetCACerts(t *testing.T) {
 	certs, err := realis.GetCerts("./examples/certs")
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	assert.Equal(t, len(certs.Subjects()), 2)
-
 }

 func TestRealisClient_CreateJob_Thermos(t *testing.T) {
@@ -202,7 +218,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		AddPorts(1)

 	_, err := r.CreateJob(job)
-	assert.NoError(t, err)
+	require.NoError(t, err)

 	// Test Instances Monitor
 	success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1, 50)
@@ -230,7 +246,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		status, err := r.GetTaskStatus(&aurora.TaskQuery{
 			JobKeys:  []*aurora.JobKey{job.JobKey()},
 			Statuses: []aurora.ScheduleStatus{aurora.ScheduleStatus_RUNNING}})
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		assert.NotNil(t, status)
 		assert.Len(t, status, 2)

@@ -239,7 +255,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {

 	t.Run("AddInstances", func(t *testing.T) {
 		_, err := r.AddInstances(aurora.InstanceKey{JobKey: job.JobKey(), InstanceId: 0}, 2)
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		success, err := monitor.Instances(job.JobKey(), 4, 1, 50)
 		assert.True(t, success)
 		assert.NoError(t, err)
@@ -247,7 +263,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {

 	t.Run("KillInstances", func(t *testing.T) {
 		_, err := r.KillInstances(job.JobKey(), 2, 3)
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
 		assert.True(t, success)
 		assert.NoError(t, err)
@@ -255,7 +271,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {

 	t.Run("RestartInstances", func(t *testing.T) {
 		_, err := r.RestartInstances(job.JobKey(), 0)
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
 		assert.True(t, success)
 		assert.NoError(t, err)
@@ -263,7 +279,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {

 	t.Run("RestartJobs", func(t *testing.T) {
 		_, err := r.RestartJob(job.JobKey())
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
 		assert.True(t, success)
 		assert.NoError(t, err)
@@ -272,7 +288,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 	// Tasks must exist for it to, be killed
 	t.Run("KillJob", func(t *testing.T) {
 		_, err := r.KillJob(job.JobKey())
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		success, err := monitor.Instances(job.JobKey(), 0, 1, 50)
 		assert.True(t, success)
 		assert.NoError(t, err)
@@ -287,7 +303,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 			AddLabel("chips", "chips")

 		_, err := r.CreateJob(job)
-		assert.NoError(t, err)
+		require.NoError(t, err)

 		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
 		assert.True(t, success)
@@ -300,7 +316,6 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {

 // Test configuring an executor that doesn't exist for CreateJob API
 func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) {
-
 	// Create a single job
 	job := realis.NewJob().
 		Environment("prod").
@@ -338,7 +353,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
 		InstanceCount(1)

 	resp, err := r.CreateJob(job)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)

 	taskQ := &aurora.TaskQuery{
@@ -381,7 +396,7 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
 	job.InstanceCount(2)

 	_, result, err := r.CreateService(job, settings)
-	assert.NoError(t, err)
+	require.NoError(t, err)

 	updateQ := aurora.JobUpdateQuery{
 		Key: result.GetKey(),
@@ -401,13 +416,13 @@ pulseLoop:
 		case <-ticker.C:

 			_, err = r.PulseJobUpdate(result.GetKey())
-			assert.Nil(t, err, "unable to pulse job update")
+			assert.NoError(t, err, "unable to pulse job update")

 			respDetail, err := r.JobUpdateDetails(updateQ)
-			assert.Nil(t, err)
+			assert.NoError(t, err)

 			updateDetails = response.JobUpdateDetails(respDetail)
-			assert.Len(t, updateDetails, 1, "No update found")
+			require.Len(t, updateDetails, 1, "No update found")

 			status := updateDetails[0].Update.Summary.State.Status
 			if _, ok := realis.ActiveJobUpdateStates[status]; !ok {
@@ -426,7 +441,8 @@ pulseLoop:
 			_, err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
 			assert.NoError(t, err)
 			_, err = r.KillJob(job.JobKey())
-			require.NoError(t, err, "timed out during pulse update test")
+			assert.NoError(t, err, "timed out during pulse update test")
+			t.FailNow()
 		}
 	}

@@ -456,7 +472,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	job.InstanceCount(3)

 	_, result, err := r.CreateService(job, settings)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	assert.NotNil(t, result)

 	// Test asking the scheduler to backup a Snapshot
@@ -480,21 +496,24 @@ func TestRealisClient_CreateService(t *testing.T) {
 		assert.NoError(t, err)
 	}

+	require.NoError(t, mErr)
 	assert.True(t, ok)
-	assert.NoError(t, mErr)

 	// Kill task test task after confirming it came up fine
 	_, err = r.KillJob(job.JobKey())
 	assert.NoError(t, err)

 	success, err := monitor.Instances(job.JobKey(), 0, 1, 50)
+	require.NoError(t, mErr)
 	assert.True(t, success)

 	// Create a client which will timeout and close the connection before receiving an answer
-	timeoutClient, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL),
+	timeoutClient, err := realis.NewRealisClient(
+		realis.SchedulerUrl(auroraURL),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(10))
-	assert.NoError(t, err)
+		realis.TimeoutMS(10),
+	)
+	require.NoError(t, err)
 	defer timeoutClient.Close()

 	// Test case where http connection timeouts out.
@@ -503,7 +522,7 @@ func TestRealisClient_CreateService(t *testing.T) {

 		// Make sure a timedout error was returned
 		_, _, err = timeoutClient.CreateService(job, settings)
-		assert.Error(t, err)
+		require.Error(t, err)
 		assert.True(t, realis.IsTimeout(err))

 		updateReceivedQuery := aurora.JobUpdateQuery{
@@ -513,9 +532,9 @@ func TestRealisClient_CreateService(t *testing.T) {
 			Limit: 1}

 		updateSummaries, err := monitor.JobUpdateQuery(updateReceivedQuery, time.Second*1, time.Second*50)
-		assert.NoError(t, err)
+		require.NoError(t, err)

-		assert.Len(t, updateSummaries, 1)
+		require.Len(t, updateSummaries, 1)

 		r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up")
 		_, err = r.KillJob(job.JobKey())
@@ -531,7 +550,7 @@ func TestRealisClient_CreateService(t *testing.T) {

 		// Make sure a timedout error was returned
 		_, _, err = timeoutClient.CreateService(job, settings)
-		assert.Error(t, err)
+		require.Error(t, err)
 		assert.True(t, realis.IsTimeout(err))

 		summary, err := r.GetJobUpdateSummaries(
@@ -542,7 +561,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 		assert.NoError(t, err)

 		// Payload should have been rejected, no update should exist
-		assert.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0)
+		require.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0)
 	})
 }

@@ -565,9 +584,9 @@ func TestRealisClient_CreateService_ExecutorDoesNotExist(t *testing.T) {
 	job.InstanceCount(3)

 	resp, result, err := r.CreateService(job, settings)
-	assert.Error(t, err)
+	require.Error(t, err)
 	assert.Nil(t, result)
-	assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
+	require.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
 }

 func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
@@ -591,23 +610,23 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		IsService(false)

 	_, err = r.ScheduleCronJob(job)
-	assert.NoError(t, err)
+	require.NoError(t, err)

 	t.Run("Start", func(t *testing.T) {
 		_, err := r.StartCronJob(job.JobKey())
-		assert.NoError(t, err)
+		require.NoError(t, err)
 	})

 	t.Run("Deschedule", func(t *testing.T) {
 		_, err := r.DescheduleCronJob(job.JobKey())
-		assert.NoError(t, err)
+		require.NoError(t, err)
 	})
 }
 func TestRealisClient_StartMaintenance(t *testing.T) {
 	hosts := []string{"localhost"}

 	_, _, err := r.StartMaintenance(hosts...)
-	assert.NoError(t, err, "unable to start maintenance")
+	require.NoError(t, err, "unable to start maintenance")

 	// Monitor change to DRAINING and DRAINED mode
 	hostResults, err := monitor.HostMaintenance(
@@ -615,11 +634,11 @@ func TestRealisClient_StartMaintenance(t *testing.T) {
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED},
 		1,
 		50)
-	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
 	assert.NoError(t, err)
+	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)

 	_, _, err = r.EndMaintenance(hosts...)
-	assert.NoError(t, err)
+	require.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
 	_, err = monitor.HostMaintenance(
@@ -645,8 +664,8 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 			1,
 			50)
+		require.NoError(t, err)
 		assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
-		assert.NoError(t, err)
 	})

 	t.Run("MonitorNonExistentHost", func(t *testing.T) {
@@ -658,13 +677,13 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 			1)

 		// Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts
-		assert.Error(t, err)
+		require.Error(t, err)
 		assert.Equal(t, map[string]bool{"localhost": true, "IMAGINARY_HOST": false}, hostResults)
 	})

 	t.Run("EndMaintenance", func(t *testing.T) {
 		_, _, err := r.EndMaintenance(hosts...)
-		assert.NoError(t, err)
+		require.NoError(t, err)

 		// Monitor change to DRAINING and DRAINED mode
 		_, err = monitor.HostMaintenance(
@@ -682,7 +701,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}}

 	_, err := r.SLADrainHosts(&policy, 30, hosts...)
-	assert.NoError(t, err, "unable to drain host with SLA policy")
+	require.NoError(t, err, "unable to drain host with SLA policy")

 	// Monitor change to DRAINING and DRAINED mode
 	hostResults, err := monitor.HostMaintenance(
@@ -690,11 +709,11 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
 		1,
 		50)
-	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
 	assert.NoError(t, err)
+	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)

 	_, _, err = r.EndMaintenance(hosts...)
-	assert.NoError(t, err)
+	require.NoError(t, err)

 	// Monitor change to DRAINING and DRAINED mode
 	_, err = monitor.HostMaintenance(
@@ -879,7 +898,7 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) {
 	settings.MinWaitInInstanceRunningMs = 5 * 1000

 	_, result, err := r.CreateService(job, settings)
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	assert.NotNil(t, result)

 	var ok bool
@@ -893,8 +912,8 @@ func TestAuroraJob_UpdateSlaPolicy(t *testing.T) {

 		assert.NoError(t, err)
 	}
-	assert.True(t, ok)
-	assert.NoError(t, mErr)
+	require.NoError(t, mErr)
+	assert.True(t, ok)

 	// Kill task test task after confirming it came up fine
 	_, err = r.KillJob(job.JobKey())

retry.go (3 changes)

@@ -173,10 +173,11 @@ func (r *realisClient) thriftCallWithRetries(

 		e, ok := e.Err().(*url.Error)
 		if ok {
+
 			// EOF error occurs when the server closes the read buffer of the client. This is common
 			// when the server is overloaded and should be retried. All other errors that are permanent
 			// will not be retried.
-			if e.Err != io.EOF && !e.Temporary() {
+			if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors {
 				return nil, errors.Wrap(clientErr, "permanent connection error")
 			}

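To make the gate above concrete, here is a small self-contained sketch of the classification it performs; shouldGiveUp is a hypothetical helper written for illustration and is not part of gorealis or its retry code:

package main

import (
	"fmt"
	"io"
	"net/url"
)

// shouldGiveUp mirrors the condition added in retry.go: give up only when the caller
// opted in via failOnPermanentErrors and the wrapped error is neither io.EOF (the server
// closed the read side, which is worth retrying) nor temporary.
func shouldGiveUp(err error, failOnPermanentErrors bool) bool {
	urlErr, ok := err.(*url.Error)
	if !ok {
		return false
	}
	return urlErr.Err != io.EOF && !urlErr.Temporary() && failOnPermanentErrors
}

func main() {
	permanent := &url.Error{Op: "Post", URL: "http://example.invalid/api", Err: fmt.Errorf("no such host")}
	fmt.Println(shouldGiveUp(permanent, true))  // true: opted in, error is permanent
	fmt.Println(shouldGiveUp(permanent, false)) // false: default behavior keeps retrying
}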

util.go (5 changes)

@@ -8,6 +8,8 @@ import (
 	"github.com/pkg/errors"
 )

+const apiPath = "/api"
+
 var ActiveStates = make(map[aurora.ScheduleStatus]bool)
 var SlaveAssignedStates = make(map[aurora.ScheduleStatus]bool)
 var LiveStates = make(map[aurora.ScheduleStatus]bool)
@@ -67,7 +69,8 @@ func validateAuroraURL(urlStr string) (string, error) {
 		return "", errors.Errorf("only protocols http and https are supported %v\n", u.Scheme)
 	}

-	if u.Path != APIPath {
+	// This could theoretically be elsewhwere but we'll be strict for the sake of simplicty
+	if u.Path != apiPath {
 		return "", errors.Errorf("expected /api path %v\n", u.Path)
 	}


@@ -45,6 +45,12 @@ func TestAuroraURLValidator(t *testing.T) {
 		assert.NoError(t, err)
 	})

+	t.Run("ipAddrNoPath", func(t *testing.T) {
+		url, err := validateAuroraURL("http://192.168.1.33:8081")
+		assert.Equal(t, "http://192.168.1.33:8081/api", url)
+		assert.NoError(t, err)
+	})
+
 	t.Run("URLNoProtocol", func(t *testing.T) {
 		url, err := validateAuroraURL("goodurl.com:8081/api")
 		assert.Equal(t, "http://goodurl.com:8081/api", url)
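A brief usage note tied to these tests: per the NewRealisClient hunk in realis.go, the configured scheduler address is passed through validateAuroraURL before the transport is built, so an address without the /api suffix should be filled in for the caller. The snippet below is a sketch under that assumption, not part of the test suite; the import path and address are placeholders:

package main

import (
	"log"

	realis "github.com/paypal/gorealis" // assumed import path for this client library
)

func main() {
	// "http://192.168.1.33:8081" has no /api suffix; per the validateAuroraURL test above
	// it should be normalized to "http://192.168.1.33:8081/api" inside NewRealisClient.
	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.1.33:8081"))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
}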