diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 87265a3..0445c51 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,5 +1,5 @@ ----------------------------------------- -## Please read instrucitons below ## +## Please read instructions below ## Before submitting, please make sure you run a vagrant box running Aurora with the latest version shown in .auroraversion and run go test from the project root. diff --git a/errors.go b/errors.go index 242176e..08600ee 100644 --- a/errors.go +++ b/errors.go @@ -1,18 +1,16 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package realis diff --git a/realis.go b/realis.go index a1669c6..bcca881 100644 --- a/realis.go +++ b/realis.go @@ -21,12 +21,13 @@ import ( "encoding/base64" "fmt" "io/ioutil" - "math/rand" "net/http" "net/http/cookiejar" "path/filepath" "time" + "sync" + "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" @@ -56,6 +57,7 @@ type Realis interface { ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) + // TODO: Remove this method and make it private to avoid race conditions ReestablishConn() error RealisConfig() *RealisConfig Close() @@ -72,6 +74,7 @@ type realisClient struct { readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient logger Logger + lock sync.Mutex } type RealisConfig struct { @@ -285,19 +288,6 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { } -// Jitter returns a time.Duration between duration and duration + maxFactor * -// duration. -// -// This allows clients to avoid converging on periodic behavior. If maxFactor -// is 0.0, a suggested default value will be chosen. 
-func Jitter(duration time.Duration, maxFactor float64) time.Duration {
-	if maxFactor <= 0.0 {
-		maxFactor = 1.0
-	}
-	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
-	return wait
-}
-
 func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	return &Cluster{
 		Name:          "defaultCluster",
@@ -309,7 +299,7 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	}
 }
 
-func Getcerts(certpath string) (*x509.CertPool, error) {
+func GetCerts(certpath string) (*x509.CertPool, error) {
 	globalRootCAs := x509.NewCertPool()
 	caFiles, err := ioutil.ReadDir(certpath)
 	if err != nil {
@@ -339,7 +329,7 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri
 		tlsConfig.InsecureSkipVerify = true
 	}
 	if config.certspath != "" {
-		rootCAs, err := Getcerts("examples/certs")
+		rootCAs, err := GetCerts("examples/certs")
 		if err != nil {
 			config.logger.Println("error occured couldn't fetch certs")
 			return nil, err
@@ -428,15 +418,51 @@ func basicAuth(username, password string) string {
 	return base64.StdEncoding.EncodeToString([]byte(auth))
 }
 
+type auroraThriftCall func() (resp *aurora.Response, err error)
+
+// thriftCallHelper takes a Thrift API function call and returns its response and error.
+// If the error from the API call is retryable, the function re-establishes the connection to Aurora
+// using the same configuration as the original client. It locks usage of, and changes to, the client
+// connection in order to make realis sessions thread-safe.
+func (r *realisClient) thriftCallHelper(auroraCall auroraThriftCall) (*aurora.Response, error) {
+	// Only allow one goroutine at a time to use or modify the Thrift client connection
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	resp, cliErr := auroraCall()
+
+	if cliErr != nil {
+		// ReestablishConn returns a temporary error or nil,
+		// as we can always retry connecting to the scheduler.
+		retryConnErr := r.ReestablishConn()
+
+		return resp, retryConnErr
+	}
+
+	if resp == nil {
+		return nil, errors.New("Response is nil")
+	}
+
+	if resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT {
+		return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable"))
+	}
+
+	if resp.GetResponseCode() != aurora.ResponseCode_OK {
+		return nil, errors.New(response.CombineMessage(resp))
+	}
+
+	return resp, nil
+}
+
 func (r *realisClient) ReestablishConn() error {
 	// Close existing connection
-	r.logger.Println("ReestablishConn begin ....")
+	r.logger.Println("Re-establishing connection to Aurora")
 	r.Close()
 
 	// Recreate connection from scratch using original options
 	newRealis, err := NewRealisClient(r.config.options...)
 	if err != nil {
-		return err
+		// This could be a temporary network hiccup
+		return NewTemporaryError(err)
 	}
 
 	// If we are able to successfully re-connect, make receiver
@@ -471,25 +497,25 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 	var resp *aurora.Response
 	var clientErr error
 	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+
+		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
 			return r.client.GetTasksWithoutConfigs(taskQ)
 		})
-		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
-			return false, nil
-		}
+
+		// Pass the error directly to the backoff function, which makes exceptions for temporary errors
 		if clientErr != nil {
 			return false, clientErr
 		}
+
 		return true, nil
 	})
 
-	if clientErr != nil {
+	// If we encountered an error we couldn't recover from by retrying, return an error to the user
+	if retryErr != nil {
 		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for active IDs")
 	}
-	resp, clientErr = response.ResponseCodeCheck(resp)
-	if clientErr != nil {
-		return nil, clientErr
-	}
+
+	// Construct an instance ID map to stay in line with Thrift's representation of sets
 	tasks := response.ScheduleStatusResult(resp).GetTasks()
 	jobInstanceIds := make(map[int32]bool)
 	for _, task := range tasks {
@@ -504,23 +530,22 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 	var clientErr error
 
 	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
 			return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
 		})
-		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
-			return false, nil
-		}
+
 		if clientErr != nil {
 			return false, clientErr
 		}
+
 		return true, nil
 	})
 
-	if clientErr != nil {
+	if retryErr != nil {
 		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error getting job update summaries from Aurora Scheduler")
 	}
 
-	return response.ResponseCodeCheck(resp)
+	return resp, nil
 }
 
 // Kill specific instances of a job.
@@ -535,22 +560,21 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 	}
 
 	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
 			return r.client.KillTasks(key, instanceIds, "")
 		})
-		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
-			return false, nil
-		}
+
 		if clientErr != nil {
 			return false, clientErr
 		}
+
 		return true, nil
 	})
 
-	if clientErr != nil {
+	if retryErr != nil {
 		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
 	}
-	return response.ResponseCodeCheck(resp)
+	return resp, nil
 }
 
 func (r *realisClient) RealisConfig() *RealisConfig {
@@ -570,21 +594,21 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 
 	if len(instanceIds) > 0 {
 		retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-			resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+			resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
 				return r.client.KillTasks(key, instanceIds, "")
 			})
-			if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
-				return false, nil
-			}
+
 			if clientErr != nil {
 				return false, clientErr
 			}
+
 			return true, nil
 		})
 
-		if clientErr != nil {
-			return nil, errors.Wrap(err, retryErr.Error()+"Error sending Kill command to Aurora Scheduler")
+
+		if retryErr != nil {
+			return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Kill command to Aurora Scheduler")
 		}
-		return response.ResponseCodeCheck(resp)
+		return resp, nil
 	}
 	return nil, errors.New("No tasks in the Active state")
 }
@@ -598,23 +622,21 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 	var clientErr error
 
 	retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
-		resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
+		resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) {
 			return r.client.CreateJob(auroraJob.JobConfig())
 		})
-		if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
-			return false, nil
-		}
+
 		if clientErr != nil {
 			return false, clientErr
 		}
+
 		return true, nil
 	})
 
-	if clientErr != nil {
-		return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Create command to Aurora Scheduler")
+	if retryErr != nil {
+		return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Create command to Aurora Scheduler")
 	}
-	return response.ResponseCodeCheck(resp)
-
+	return resp, nil
 }
 
 // This API uses an update thrift call to create the services giving a few more robust features.
@@ -622,6 +644,7 @@ func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*a // Create a new job update object and ship it to the StartJobUpdate api update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings) update.InstanceCount(auroraJob.GetInstanceCount()) + update.BatchSize(auroraJob.GetInstanceCount()) resp, err := r.StartJobUpdate(update, "") if err != nil { @@ -640,22 +663,21 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.ScheduleCronJob(auroraJob.JobConfig()) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -664,22 +686,21 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.DescheduleCronJob(key) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } @@ -688,22 +709,21 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.StartCronJob(key) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Start Cron Job message to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Start Cron Job message to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } @@ -718,22 +738,21 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return 
r.client.RestartShards(key, instanceIds) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } // Restarts all active tasks under a job configuration. @@ -747,22 +766,22 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) var clientErr error if len(instanceIds) > 0 { retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.RestartShards(key, instanceIds) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Restart command to Aurora Scheduler") - } - return response.ResponseCodeCheck(resp) + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Restart command to Aurora Scheduler") + } + + return resp, nil } else { return nil, errors.New("No tasks in the Active state") } @@ -775,22 +794,21 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.StartJobUpdate(updateJob.req, message) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending StartJobUpdate command to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending StartJobUpdate command to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
@@ -801,22 +819,21 @@ func (r *realisClient) AbortJobUpdate( var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.AbortJobUpdate(&updateKey, message) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AbortJobUpdate command to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AbortJobUpdate command to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -827,22 +844,21 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.AddInstances(&instKey, count) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending AddInstances command to Aurora Scheduler") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending AddInstances command to Aurora Scheduler") } - return response.ResponseCodeCheck(resp) + return resp, nil } @@ -875,24 +891,19 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.GetTasksStatus(query) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status") - } - //Check for response code.. 
- if resp.GetResponseCode() != aurora.ResponseCode_OK { - return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp)) + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -904,24 +915,19 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(query) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task status without configs") - } - //Check for response code.. - if resp.GetResponseCode() != aurora.ResponseCode_OK { - return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp)) + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task status without configs") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -946,24 +952,19 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.GetTasksStatus(taskQ) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Error querying Aurora Scheduler for task configuration") - } - //Check for response code.. 
- if resp.GetResponseCode() != aurora.ResponseCode_OK { - return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp)) + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error querying Aurora Scheduler for task configuration") } tasks := response.ScheduleStatusResult(resp).GetTasks() @@ -986,22 +987,21 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(&updateQuery) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to get job update details") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to get job update details") } - return response.ResponseCodeCheck(resp) + return resp, nil } @@ -1010,22 +1010,21 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string var clientErr error retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(&key, message) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to roll back job update") + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Unable to roll back job update") } - return response.ResponseCodeCheck(resp) + return resp, nil } // Set a list of nodes to DRAINING. 
This means nothing will be able to be scheduled on them and any existing @@ -1048,26 +1047,25 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr } retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.adminClient.DrainHosts(drainList) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection") - } - if resp != nil && resp.GetResult_() != nil { result = resp.GetResult_().GetDrainHostsResult_() } + if retryErr != nil { + return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + } + return resp, result, nil } @@ -1088,26 +1086,25 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror } retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { - resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(hostList) }) - if clientErr != nil && clientErr.Error() == RetryConnErr.Error() { - return false, nil - } + if clientErr != nil { return false, clientErr } + return true, nil }) - if clientErr != nil { - return nil, nil, errors.Wrap(clientErr, retryErr.Error()+"Unable to recover connection") - } - if resp != nil && resp.GetResult_() != nil { result = resp.GetResult_().GetEndMaintenanceResult_() } + if retryErr != nil { + return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection") + } + return resp, result, nil } @@ -1115,7 +1112,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au var resp *aurora.Response var result *aurora.MaintenanceStatusResult_ - var returnErr, clientErr, payloadErr error + var clientErr error if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to get maintenance status from") @@ -1129,23 +1126,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) { - // Send thrift call, if we have a thrift send error, attempt to reconnect - // and continue trying to resend command - if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil { - // Experienced an connection error - err1 := r.ReestablishConn() - if err1 != nil { - r.logger.Println("error in re-establishing connection: ", err1) - } - return false, nil - } + // Make thrift call. If we encounter an error sending the call, attempt to reconnect + // and continue trying to resend command until we run out of retries. + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { + return r.adminClient.MaintenanceStatus(hostList) + }) - // If error is NOT due to connection - if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil { - // TODO(rdelvalle): an leader election may cause the response to have - // failed when it should have succeeded. Retry everything for now until - // we figure out a more concrete fix. 
-			return false, nil
+		if clientErr != nil {
+			return false, clientErr
 		}
 
 		// Successful call
@@ -1157,20 +1145,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 		result = resp.GetResult_().GetMaintenanceStatusResult_()
 	}
 
-	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
-	// a deeper issue
-	if payloadErr != nil {
-		returnErr = payloadErr
-	} else {
-		returnErr = clientErr
-	}
-
-	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
-	// this will can become either a timeout error or a payload error
 	if retryErr != nil {
-		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
+		return resp, result, errors.Wrap(clientErr, retryErr.Error()+": Unable to recover connection")
 	}
 
 	return resp, result, nil
-
 }
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 64f480f..d278ffc 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -21,6 +21,8 @@ import (
 	"testing"
 	"time"
 
+	"sync"
+
 	"github.com/paypal/gorealis"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/stretchr/testify/assert"
@@ -72,8 +74,8 @@ func TestRealisClient_ReestablishConn(t *testing.T) {
 	assert.NoError(t, err)
 }
 
-func TestGetCacerts(t *testing.T) {
-	certs, err := realis.Getcerts("./examples/certs")
+func TestGetCACerts(t *testing.T) {
+	certs, err := realis.GetCerts("./examples/certs")
 	assert.NoError(t, err)
 	assert.Equal(t, len(certs.Subjects()), 2)
 
@@ -167,6 +169,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
+
 func TestRealisClient_DrainHosts(t *testing.T) {
 	hosts := []string{"192.168.33.7"}
 	_, _, err := r.DrainHosts(hosts...)
@@ -214,3 +217,48 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 	})
 
 }
+
+// Test multiple goroutines using a single connection
+func TestRealisClient_SessionThreadSafety(t *testing.T) {
+
+	// Create a single job
+	job := realis.NewJob().
+		Environment("prod").
+		Role("vagrant").
+		Name("create_thermos_job_test_multi").
+		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
+		ExecutorData(string(thermosPayload)).
+		CPU(.25).
+		RAM(4).
+		Disk(10).
+		InstanceCount(100) // More instances than can go live under the default Vagrant settings
+
+	resp, err := r.CreateJob(job)
+	assert.NoError(t, err)
+
+	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < 20; i++ {
+
+		wg.Add(1)
+
+		// Launch multiple monitors that will poll every second
+		go func() {
+			defer wg.Done()
+
+			// Monitor schedule status for a terminal state, polling every second and timing out after 30 seconds
+			success, err := monitor.ScheduleStatus(job.JobKey(), job.GetInstanceCount(), aurora.LIVE_STATES, 1, 30)
+			assert.False(t, success)
+			assert.Error(t, err)
+
+			resp, err := r.KillJob(job.JobKey())
+			assert.NoError(t, err)
+
+			assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
+
+		}()
+	}
+
+	wg.Wait()
+}
diff --git a/response/response.go b/response/response.go
index 1cc0167..b77348d 100644
--- a/response/response.go
+++ b/response/response.go
@@ -17,9 +17,9 @@ package response
 
 import (
 	"bytes"
+	"errors"
 
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
-	"github.com/pkg/errors"
 )
 
 // Get key from a response created by a StartJobUpdate call
@@ -39,6 +39,7 @@ func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary {
 	return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries()
 }
 
+// Deprecated: Replaced by checks done inside thriftCallHelper
 func ResponseCodeCheck(resp *aurora.Response) (*aurora.Response, error) {
 	if resp == nil {
 		return resp, errors.New("Response is nil")
diff --git a/retry.go b/retry.go
index 72c1cd1..dd7b113 100644
--- a/retry.go
+++ b/retry.go
@@ -20,22 +20,26 @@ import (
 	"errors"
 	"time"
 
-	"github.com/paypal/gorealis/gen-go/apache/aurora"
+	"math/rand"
 )
 
-const (
-	ConnRefusedErr   = "connection refused"
-	NoLeaderFoundErr = "No leader found"
-)
-
-var RetryConnErr = errors.New("error occured during with aurora retrying")
+// Jitter returns a time.Duration between duration and duration + maxFactor *
+// duration.
+//
+// This allows clients to avoid converging on periodic behavior. If maxFactor
+// is 0.0, a suggested default value will be chosen.
+func Jitter(duration time.Duration, maxFactor float64) time.Duration {
+	if maxFactor <= 0.0 {
+		maxFactor = 1.0
+	}
+	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
+	return wait
+}
 
 // ConditionFunc returns true if the condition is satisfied, or an error
 // if the loop should be aborted.
 type ConditionFunc func() (done bool, err error)
 
-type AuroraThriftCall func() (resp *aurora.Response, err error)
-
 // Modified version of the Kubernetes exponential-backoff code.
 // ExponentialBackoff repeats a condition check with exponential backoff.
 //
@@ -76,20 +80,3 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
 	}
 	return NewTimeoutError(errors.New("Timed out while retrying"))
 }
-
-// CheckAndRetryConn function takes realis client and a trhift API function to call and returns response and error
-// If Error from the APi call is Retry able . THe functions re establishes the connection with aurora by getting the latest aurora master from zookeeper.
-// If Error is retyable return resp and RetryConnErr error.
-func CheckAndRetryConn(r Realis, auroraCall AuroraThriftCall) (*aurora.Response, error) { - resp, cliErr := auroraCall() - - // TODO: Return different error type based on the error that was returned by the API call - if cliErr != nil { - r.ReestablishConn() - return resp, NewPermamentError(RetryConnErr) - } - if resp != nil && resp.GetResponseCode() == aurora.ResponseCode_ERROR_TRANSIENT { - return resp, NewTemporaryError(errors.New("Aurora scheduler temporarily unavailable")) - } - return resp, nil -}
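
For illustration, a minimal sketch of the usage pattern this change enables: several goroutines sharing a single client, with each call serialized by the new mutex inside thriftCallHelper. This is not part of the diff; it assumes the existing SchedulerUrl and BasicAuth client options, and the scheduler address, credentials, and job key below are placeholders.

	package main

	import (
		"fmt"
		"sync"

		"github.com/paypal/gorealis"
		"github.com/paypal/gorealis/gen-go/apache/aurora"
	)

	func main() {
		// Placeholder scheduler address and credentials.
		r, err := realis.NewRealisClient(
			realis.SchedulerUrl("http://192.168.33.7:8081"),
			realis.BasicAuth("aurora", "secret"),
		)
		if err != nil {
			fmt.Println(err)
			return
		}
		defer r.Close()

		// Placeholder job key.
		key := &aurora.JobKey{Environment: "prod", Role: "vagrant", Name: "hello_world"}

		var wg sync.WaitGroup
		for i := 0; i < 5; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Each call funnels through thriftCallHelper, which locks the
				// shared Thrift transport, so concurrent use is now safe.
				if resp, err := r.StartCronJob(key); err != nil {
					fmt.Println(err)
				} else {
					fmt.Println(resp.GetResponseCode())
				}
			}()
		}
		wg.Wait()
	}

Retries still apply on top of this: each public method wraps its thriftCallHelper call in ExponentialBackoff, so a temporary error (a connection drop or an ERROR_TRANSIENT response) is retried rather than surfaced immediately.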