Thread safety, misc fixes, and refactoring (#51)

* Changed the incorrect license header in some source files.

* Changed CreateService to mimic CreateJob by setting the update's batch size to the instance count.

* Renamed Getcerts to GetCerts to match the naming style of the rest of the codebase.

* Overhauled error handling: backoff now recognizes temporary errors and keeps retrying when it encounters one (see the sketch after these notes).

* Changed the Thrift function call wrapper to be more explicitly named and to perform more safety checks.

* Moved Jitter function from realis to retry.

* API code is now more uniform and follows a common template.

* Added a lock around every Thrift call and every modification to the connection (illustrated in the sketch below). Note that calling ReestablishConn externally may still result in race conditions; we will make this function private in the near future.

* Added a test for Realis session thread safety, covering the ScheduleStatus monitor and the monitor timing out.

* Now return a nil response whenever an error is returned, so there is no ambiguity.

* Used defer to unlock so that the lock is still released if a panic occurs.
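
To make the new pattern concrete, here is a minimal, self-contained Go sketch of the behavior described above. It is an illustration, not the library's code: temporaryError, callScheduler, and retryTemporary are hypothetical stand-ins for gorealis' NewTemporaryError, thriftCallHelper, and ExponentialBackoff.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// temporaryError is a hypothetical stand-in for the error type produced by
// gorealis' NewTemporaryError: it marks failures that are worth retrying.
type temporaryError struct{ error }

func (t *temporaryError) Temporary() bool { return true }

type client struct {
	lock     sync.Mutex
	attempts int
}

// callScheduler mirrors the thriftCallHelper pattern from this change: only
// one goroutine at a time may use or modify the connection, and the lock is
// released via defer so that a panic inside the call cannot leave it held.
func (c *client) callScheduler(call func() (string, error)) (string, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	resp, err := call()
	if err != nil {
		// A failed send may be a network hiccup, so surface it as a
		// temporary error and let the retry loop decide to continue.
		return "", &temporaryError{err}
	}
	return resp, nil
}

// retryTemporary keeps retrying only while errors report themselves as
// temporary, which is the behavior the overhauled backoff adopts.
func retryTemporary(attempts int, wait time.Duration, f func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		tmp, ok := err.(interface{ Temporary() bool })
		if !ok || !tmp.Temporary() {
			return err // permanent error: stop retrying immediately
		}
		time.Sleep(wait)
		wait *= 2 // exponential backoff between attempts
	}
	return err
}

func main() {
	c := &client{}
	err := retryTemporary(5, 10*time.Millisecond, func() error {
		resp, err := c.callScheduler(func() (string, error) {
			c.attempts++
			if c.attempts < 3 {
				return "", errors.New("connection refused")
			}
			return "OK", nil
		})
		if err == nil {
			fmt.Println("scheduler response:", resp)
		}
		return err
	})
	fmt.Println("final error:", err)
}

In the realis.go diff below, thriftCallHelper plays the role of callScheduler, and every API method hands its errors to ExponentialBackoff exactly as retryTemporary does here.
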
Renan DelValle 2018-01-21 19:30:01 -08:00 committed by GitHub
parent b2ffb73183
commit a941bcb679
6 changed files with 256 additions and 245 deletions


@@ -1,5 +1,5 @@
-----------------------------------------
## Please read instrucitons below ##
## Please read instructions below ##
Before submitting, please make sure you run a vagrant box running Aurora with the latest version shown in .auroraversion and run go test from the project root.


@@ -1,18 +1,16 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis

realis.go

@@ -21,12 +21,13 @@ import (
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/cookiejar"
"path/filepath"
"time"
"sync"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
@@ -56,6 +57,7 @@ type Realis interface {
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
// TODO: Remove this method and make it private to avoid race conditions
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
@@ -72,6 +74,7 @@ type realisClient struct {
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
logger Logger
lock sync.Mutex
}
type RealisConfig struct {
@@ -285,19 +288,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{
Name: "defaultCluster",
@@ -309,7 +299,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
func Getcerts(certpath string) (*x509.CertPool, error) {
func GetCerts(certpath string) (*x509.CertPool, error) {
globalRootCAs := x509.NewCertPool()
caFiles, err := ioutil.ReadDir(certpath)
if err != nil {
@@ -339,7 +329,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
tlsConfig.InsecureSkipVerify = true
}
if config.certspath != "" {
rootCAs, err := Getcerts("examples/certs")
rootCAs, err := GetCerts("examples/certs")
if err != nil {
config.logger.Println("error occured couldn't fetch certs")
return nil, err
@@ -428,15 +418,51 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}
type auroraThriftCall func() (resp *aurora.Response, err error)
// Takes a Thrift API function call and returns the response and an error.
// If the error from the API call is retryable, the function re-establishes the connection to
// Aurora using the same configuration as the original client. Locks usage of, and changes to,
// the client connection in order to make realis sessions thread safe.
func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Response, error) {
// Only allow one goroutine at a time to use or modify the thrift client connection
r.lock.Lock()
defer r.lock.Unlock()
resp, cliErr := auroraCall()
if cliErr != nil {
// ReestablishConn returns a temporary error or nil,
// as we can always retry connecting to the scheduler.
retryConnErr := r.ReestablishConn()
return resp, retryConnErr
}
if resp == nil {
return nil, errors.New("Response is nil")
}
if resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
}
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(response.CombineMessage(resp))
}
return resp, nil
}
func (r *realisClient) ReestablishConn() error {
// Close existing connection
r.logger.Println("ReestablishConn begin ....")
r.logger.Println("Re-establishing Connection to Aurora")
r.Close()
// Recreate connection from scratch using original options
newRealis, err := NewRealisClient(r.config.options...)
if err != nil {
return err
// This could be a temporary network hiccup
return NewTemporaryError(err)
}
// If we are able to successfully re-connect, make receiver
@@ -471,25 +497,25 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(taskQ)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
// Pass error directly to backoff which makes exceptions for temporary errors
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
// If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
}
resp, clientErr = response.ResponseCodeCheck(resp)
if clientErr != nil {
return nil, clientErr
}
// Construct instance id map to stay in line with thrift's representation of sets
tasks := response.ScheduleStatusResult(resp).GetTasks()
jobInstanceIds := make(map[int32]bool)
for _, task := range tasks {
@@ -504,23 +530,22 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Kill specific instances of a job.
@@ -535,22 +560,21 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
func (r *realisClient) RealisConfig() *RealisConfig {
@@ -570,21 +594,21 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
if len(instanceIds) > 0 {
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(err, retryErr.Error()+"Error sending Kill command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(err, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
return nil, errors.New("No tasks in the Active state")
}
@@ -598,23 +622,21 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.CreateJob(auroraJob.JobConfig())
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Create command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// This API uses an update thrift call to create the services giving a few more robust features.
@@ -622,6 +644,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*a
// Create a new job update object and ship it to the StartJobUpdate api
update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings)
update.InstanceCount(auroraJob.GetInstanceCount())
update.BatchSize(auroraJob.GetInstanceCount())
resp, err := r.StartJobUpdate(update, "")
if err != nil {
@@ -640,22 +663,21 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(auroraJob.JobConfig())
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
@@ -664,22 +686,21 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(key)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@@ -688,22 +709,21 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.StartCronJob(key)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Start Cron Job message to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@@ -718,22 +738,21 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Restarts all active tasks under a job configuration.
@@ -747,22 +766,22 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
var clientErr error
if len(instanceIds) > 0 {
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler")
}
return resp, nil
} else {
return nil, errors.New("No tasks in the Active state")
}
@@ -775,22 +794,21 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(updateJob.req, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending StartJobUpdate command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
@@ -801,22 +819,21 @@ func (r *realisClient) AbortJobUpdate(
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(&updateKey, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AbortJobUpdate command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
@@ -827,22 +844,21 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.AddInstances(&instKey, count)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AddInstances command to Aurora Scheduler")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@@ -875,24 +891,19 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(query)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
@@ -904,24 +915,19 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(query)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status without configs")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
@@ -946,24 +952,19 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(taskQ)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task configuration")
}
//Check for response code..
if resp.GetResponseCode() != aurora.ResponseCode_OK {
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration")
}
tasks := response.ScheduleStatusResult(resp).GetTasks()
@@ -986,22 +987,21 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(&updateQuery)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to get job update details")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
@@ -1010,22 +1010,21 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
var clientErr error
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(&key, message)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to roll back job update")
if retryErr != nil {
return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update")
}
return response.ResponseCodeCheck(resp)
return resp, nil
}
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
@@ -1048,26 +1047,25 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(drainList)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
}
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetDrainHostsResult_()
}
if retryErr != nil {
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}
@@ -1088,26 +1086,25 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
}
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(hostList)
})
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
return false, nil
}
if clientErr != nil {
return false, clientErr
}
return true, nil
})
if clientErr != nil {
return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection")
}
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetEndMaintenanceResult_()
}
if retryErr != nil {
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}
@@ -1115,7 +1112,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
var resp *aurora.Response
var result *aurora.MaintenanceStatusResult_
var returnErr, clientErr, payloadErr error
var clientErr error
if len(hosts) == 0 {
return nil, nil, errors.New("no hosts provided to get maintenance status from")
@@ -1129,23 +1126,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
// Send thrift call, if we have a thrift send error, attempt to reconnect
// and continue trying to resend command
if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil {
// Experienced an connection error
err1 := r.ReestablishConn()
if err1 != nil {
r.logger.Println("error in re-establishing connection: ", err1)
}
return false, nil
}
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend the command until we run out of retries.
resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(hostList)
})
// If error is NOT due to connection
if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
// TODO(rdelvalle): an leader election may cause the response to have
// failed when it should have succeeded. Retry everything for now until
// we figure out a more concrete fix.
return false, nil
if clientErr != nil {
return false, clientErr
}
// Successful call
@@ -1157,20 +1145,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
result = resp.GetResult_().GetMaintenanceStatusResult_()
}
// Prioritize returning a bad payload error over a client error as a bad payload error indicates
// a deeper issue
if payloadErr != nil {
returnErr = payloadErr
} else {
returnErr = clientErr
}
// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
// this will can become either a timeout error or a payload error
if retryErr != nil {
return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
}
return resp, result, nil
}


@@ -21,6 +21,8 @@ import (
"testing"
"time"
"sync"
"github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/stretchr/testify/assert"
@@ -72,8 +74,8 @@ func TestRealisClient_ReestablishConn(t *testing.T) {
assert.NoError(t, err)
}
func TestGetCacerts(t *testing.T) {
certs, err := realis.Getcerts("./examples/certs")
func TestGetCACerts(t *testing.T) {
certs, err := realis.GetCerts("./examples/certs")
assert.NoError(t, err)
assert.Equal(t, len(certs.Subjects()), 2)
@@ -167,6 +169,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
})
}
func TestRealisClient_DrainHosts(t *testing.T) {
hosts := []string{"192.168.33.7"}
_, _, err := r.DrainHosts(hosts...)
@@ -214,3 +217,48 @@ func TestRealisClient_DrainHosts(t *testing.T) {
})
}
// Test multiple goroutines using a single connection
func TestRealisClient_SessionThreadSafety(t *testing.T) {
// Create a single job
job := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("create_thermos_job_test_multi").
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ExecutorData(string(thermosPayload)).
CPU(.25).
RAM(4).
Disk(10).
InstanceCount(100) // Impossible amount to go live in the current vagrant default settings
resp, err := r.CreateJob(job)
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
// Launch multiple monitors that will poll every second
go func() {
defer wg.Done()
// Test Schedule status monitor for terminal state and timing out after 30 seconds
success, err := monitor.ScheduleStatus(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
assert.False(t, success)
assert.Error(t, err)
resp, err := r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
}()
}
wg.Wait()
}


@@ -17,9 +17,9 @@ package response
import (
"bytes"
"errors"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/pkg/errors"
)
// Get key from a response created by a StartJobUpdate call
@@ -39,6 +39,7 @@ func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
}
// Deprecated: Replaced by checks done inside of thriftCallHelper
func ResponseCodeCheck(resp *aurora.Response) (*aurora.Response, error) {
if resp == nil {
return resp, errors.New("Response is nil")


@@ -20,22 +20,26 @@ import (
"errors"
"time"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"math/rand"
)
const (
ConnRefusedErr = "connection refused"
NoLeaderFoundErr = "No leader found"
)
var RetryConnErr = errors.New("error occured during with aurora retrying")
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
// ConditionFunc returns true if the condition is satisfied, or an error
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
type AuroraThriftCall func() (resp *aurora.Response, err error)
// Modified version of the Kubernetes exponential-backoff code.
// ExponentialBackoff repeats a condition check with exponential backoff.
//
@@ -76,20 +80,3 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
}
return NewTimeoutError(errors.New("Timed out while retrying"))
}
// CheckAndRetryConn function takes realis client and a trhift API function to call and returns response and error
// If Error from the APi call is Retry able . THe functions re establishes the connection with aurora by getting the latest aurora master from zookeeper.
// If Error is retyable return resp and RetryConnErr error.
func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) {
resp, cliErr := auroraCall()
// TODO: Return different error type based on the error that was returned by the API call
if cliErr != nil {
r.ReestablishConn()
return resp, NewPermamentError(RetryConnErr)
}
if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
}
return resp, nil
}