From e7f9c0cba96a7207d48eea61a351f5ed21c33c71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20Del=20Valle?= Date: Thu, 25 Feb 2021 17:58:21 -0800 Subject: [PATCH 01/15] Bumping up version to 1.23.1 --- CHANGELOG.md | 2 ++ realis.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97f5c82..a1ee217 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +1.23.1 (unreleased) + 1.23.0 * First release tested against Aurora Scheduler 0.23.0 diff --git a/realis.go b/realis.go index 32b36b1..ce1fc99 100644 --- a/realis.go +++ b/realis.go @@ -35,7 +35,7 @@ import ( "github.com/paypal/gorealis/response" ) -const version = "1.23.0" +const version = "1.23.1" // Realis is an interface that defines the various APIs that may be used to communicate with // the Apache Aurora scheduler. From a9d99067ee689d9833fd62e92f8a66aaa64c2da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Thu, 29 Apr 2021 10:48:43 -0700 Subject: [PATCH 02/15] Documentation fix (#130) Fixes documentation so that it is more compliant with godoc format. --- monitors.go | 4 ++-- realis.go | 64 ++++++++++++++++++++++++++++------------------------- retry.go | 2 +- 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/monitors.go b/monitors.go index d3de726..0d9423a 100644 --- a/monitors.go +++ b/monitors.go @@ -117,7 +117,7 @@ func (m *Monitor) JobUpdateQuery( } } -// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update +// AutoPausedUpdateMonitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update // being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information, // the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch // the update is in using information from the update configuration. @@ -183,7 +183,7 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil } -// Monitor a Job until all instances enter one of the LIVE_STATES +// Instances will monitor a Job until all instances enter one of the LIVE_STATES func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { return m.ScheduleStatus(key, instances, LiveStates, interval, timeout) } diff --git a/realis.go b/realis.go index ce1fc99..262f22d 100644 --- a/realis.go +++ b/realis.go @@ -386,7 +386,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } - config.logger.Println("Addresss obtained: ", url) + config.logger.Println("Address obtained: ", url) url, err = validateAuroraURL(url) if err != nil { return nil, errors.Wrap(err, "invalid Aurora url") @@ -428,7 +428,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), logger: LevelLogger{logger: config.logger, debug: config.debug, trace: config.trace}, lock: &sync.Mutex{}, - transport: config.transport}, nil + transport: config.transport, + }, nil } // GetDefaultClusterFromZKUrl creates a cluster object from a Zoookeper url. 
This is deprecated in favor of using @@ -485,11 +486,11 @@ func defaultTTransport(url string, timeoutMs int, config *config) (thrift.TTrans }) if err != nil { - return nil, errors.Wrap(err, "Error creating transport") + return nil, errors.Wrap(err, "error creating transport") } if err := trans.Open(); err != nil { - return nil, errors.Wrapf(err, "Error opening connection to %s", url) + return nil, errors.Wrapf(err, "error opening connection to %s", url) } return trans, nil @@ -532,16 +533,17 @@ func (r *realisClient) ReestablishConn() error { return nil } -// Releases resources associated with the realis client. +// Close releases resources associated with the realis client. func (r *realisClient) Close() { r.lock.Lock() defer r.lock.Unlock() - r.transport.Close() + // The return value of Close here is ignored on purpose because there's nothing that can be done if it fails. + _ = r.transport.Close() } -// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. +// GetInstanceIds uses a predefined set of states to retrieve a set of active jobs in the Aurora Scheduler. func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { taskQ := &aurora.TaskQuery{ JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}}, @@ -609,7 +611,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe return resp, result, nil } -// Kill specific instances of a job. +// KillInstances kills specific instances of a job. func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.debugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) @@ -629,7 +631,7 @@ func (r *realisClient) RealisConfig() *config { return r.config } -// Sends a kill message to the scheduler for all active tasks under a job. +// KillJob kills all instances of a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { r.logger.debugPrintf("KillTasks Thrift Payload: %+v\n", key) @@ -647,7 +649,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { return resp, nil } -// Sends a create job message to the scheduler with a specific job configuration. +// CreateJob sends a create job message to the scheduler with a specific job configuration. // Although this API is able to create service jobs, it is better to use CreateService instead // as that API uses the update thrift call which has a few extra features available. // Use this API to create ad-hoc jobs. @@ -667,7 +669,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { return resp, nil } -// This API uses an update thrift call to create the services giving a few more robust features. +// CreateService uses the scheduler's updating mechanism to create a job. func (r *realisClient) CreateService( auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { @@ -741,7 +743,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error } -// Restarts specific instances specified +// RestartInstances restarts the specified instances of a Job. 
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) @@ -757,12 +759,12 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) return resp, nil } -// Restarts all active tasks under a job configuration. +// RestartJob restarts all active instances of a Job. func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES) if err1 != nil { - return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs") + return nil, errors.Wrap(err1, "could not retrieve relevant task instance IDs") } r.logger.debugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) @@ -784,7 +786,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) return nil, errors.New("No tasks in the Active state") } -// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. +// StartJobUpdate updates all instances under a job configuration. func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { r.logger.debugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) @@ -806,7 +808,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au return resp, nil } -// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. +// AbortJobUpdate terminates a job update in the scheduler. +// It requires the updateId which can be obtained on the Aurora web UI. // This API is meant to be synchronous. It will attempt to wait until the update transitions to the aborted state. // However, if the job update does not transition to the ABORT state an error will be returned. func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { @@ -834,7 +837,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str return resp, err } -// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// PauseJobUpdate pauses the progress of an ongoing update. +// The UpdateID value needed for this function is returned from StartJobUpdate or can be obtained from the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.debugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -852,7 +856,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st return resp, nil } -// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// ResumeJobUpdate resumes a previously Paused Job update. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.debugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -870,7 +874,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s return resp, nil } -// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// PulseJobUpdate sends a pulse to an ongoing Job update. 
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { r.logger.debugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) @@ -888,8 +892,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R return resp, nil } -// Scale up the number of instances under a job configuration using the configuration for specific -// instance to scale up. +// AddInstances scales up the number of instances for a Job. func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { r.logger.debugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) @@ -907,15 +910,15 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a } -// Scale down the number of instances under a job configuration using the configuration of a specific instance +// RemoveInstances scales down the number of instances for a Job. func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) { instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES) if err != nil { - return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs") + return nil, errors.Wrap(err, "could not retrieve relevant instance IDs") } if len(instanceIds) < int(count) { - return nil, errors.Errorf("Insufficient active instances available for killing: "+ + return nil, errors.Errorf("insufficient active instances available for killing: "+ " Instances to be killed %d Active instances %d", count, len(instanceIds)) } @@ -928,7 +931,7 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora return r.KillInstances(key, instanceIds[:count]...) } -// Get information about task including a fully hydrated task configuration object +// GetTaskStatus gets information about task including a fully hydrated task configuration object. func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.debugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) @@ -946,7 +949,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul return response.ScheduleStatusResult(resp).GetTasks(), nil } -// Get pending reason +// GetPendingReason returns the reason why the an instance of a Job has not been scheduled. func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { r.logger.debugPrintf("GetPendingReason Thrift Payload: %+v\n", query) @@ -970,7 +973,8 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend return pendingReasons, nil } -// Get information about task including without a task configuration object +// GetTasksWithoutConfigs gets information about task including without a task configuration object. +// This is a more lightweight version of GetTaskStatus but contains less information as a result. func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.debugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) @@ -989,7 +993,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror } -// Get the task configuration from the aurora scheduler for a job +// FetchTaskConfig gets the task configuration from the aurora scheduler for a job. 
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { taskQ := &aurora.TaskQuery{ Role: &instKey.JobKey.Role, @@ -1014,7 +1018,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task tasks := response.ScheduleStatusResult(resp).GetTasks() if len(tasks) == 0 { - return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", + return nil, errors.Errorf("instance %d for jobkey %s/%s/%s doesn't exist", instKey.InstanceId, instKey.JobKey.Environment, instKey.JobKey.Role, @@ -1036,7 +1040,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Unable to get job update details") + return nil, errors.Wrap(retryErr, "unable to get job update details") } return resp, nil diff --git a/retry.go b/retry.go index dff5658..0491194 100644 --- a/retry.go +++ b/retry.go @@ -179,7 +179,7 @@ func (r *realisClient) thriftCallWithRetries( return nil, errors.Wrap(clientErr, "permanent connection error") } - // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was + // Corner case where thrift payload was received by Aurora but connection timed out before Aurora was // able to reply. In this case we will return whatever response was received and a TimedOut behaving // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. if e.Timeout() { From 82b40a53f01a84471a66c20bfe1b8c309407a5a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Tue, 11 May 2021 13:37:23 -0700 Subject: [PATCH 03/15] Add verification to retry mechanism (#131) CreateJob, CreateService, and StartJobUpdate now include a rudimentary verification function to check if the call made it to the Aurora Scheduler when the client experiences a timeout. 
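For illustration, the sketch below shows the shape of the timeout-verification pattern this patch introduces: the retry layer is handed an optional, read-only check that runs only after a client-side timeout, so a call whose payload actually reached the scheduler is not reported as failed. This is a simplified stand-in for the client's internal retry code, not the implementation itself; the callWithVerify name is hypothetical, and only realis.IsTimeout and the generated aurora.Response type are assumed from the existing gorealis v1 API.

package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// callWithVerify is a hypothetical helper that illustrates the pattern:
// run the thrift call, and on a client timeout run a cheap read-only check
// to decide whether the payload reached the scheduler before giving up.
func callWithVerify(
	call func() (*aurora.Response, error),
	verify func() (*aurora.Response, bool),
) (*aurora.Response, error) {
	resp, err := call()
	if err == nil {
		return resp, nil
	}

	// Only attempt verification when the failure was a client-side timeout.
	if realis.IsTimeout(err) && verify != nil {
		if verified, ok := verify(); ok {
			// The request made it to the scheduler; surface a mimicked OK response.
			return verified, nil
		}
	}
	return resp, err
}
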
--- go.mod | 2 +- helpers.go | 21 ++++++ realis.go | 158 ++++++++++++++++++++++++++++++++--------- realis_admin.go | 44 +++++++++--- response/response.go | 4 ++ retry.go | 163 +++++++++++++++++++++++++++---------------- util.go | 2 +- 7 files changed, 286 insertions(+), 108 deletions(-) create mode 100644 helpers.go diff --git a/go.mod b/go.mod index 506556b..9497185 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,5 @@ require ( github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a - github.com/stretchr/testify v1.2.0 + github.com/stretchr/testify v1.7.0 ) diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..56821c1 --- /dev/null +++ b/helpers.go @@ -0,0 +1,21 @@ +package realis + +import ( + "context" + + "github.com/paypal/gorealis/gen-go/apache/aurora" +) + +func (r *realisClient) jobExists(key aurora.JobKey) (bool, error) { + resp, err := r.client.GetConfigSummary(context.TODO(), &key) + if err != nil { + return false, err + } + + return resp == nil || + resp.GetResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_() == nil || + resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || + resp.GetResponseCode() != aurora.ResponseCode_OK, + nil +} diff --git a/realis.go b/realis.go index 262f22d..772e9ab 100644 --- a/realis.go +++ b/realis.go @@ -65,7 +65,6 @@ type Realis interface { RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) - PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) @@ -556,7 +555,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu false, func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(context.TODO(), taskQ) - }) + }, + nil, + ) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { @@ -581,10 +582,16 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobUpdateSummaries(context.TODO(), jobUpdateQuery) - }) + }, + nil, + ) if retryErr != nil { - return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") + return resp, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") + } + + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } return resp, nil @@ -598,7 +605,9 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe false, func() (*aurora.Response, error) { return r.readonlyClient.GetJobs(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") @@ -619,7 +628,9 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a false, func() (*aurora.Response, error) { return r.client.KillTasks(context.TODO(), key, instances, "") - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -641,7 +652,9 @@ func 
(r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards return r.client.KillTasks(context.TODO(), key, nil, "") - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") @@ -657,15 +670,32 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.debugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + // Response is checked by the thrift retry code resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.client.CreateJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + // On a client timeout, attempt to verify that payload made to the Scheduler by + // trying to get the config summary for the job key + func() (*aurora.Response, bool) { + exists, err := r.jobExists(*auroraJob.JobKey()) + if err != nil { + r.logger.Print("verification failed ", err) + } + + if exists { + return &aurora.Response{ResponseCode: aurora.ResponseCode_OK}, true + } + + return nil, false + }, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") } + return resp, nil } @@ -680,17 +710,12 @@ func (r *realisClient) CreateService( resp, err := r.StartJobUpdate(update, "") if err != nil { if IsTimeout(err) { - return resp, nil, err + return nil, nil, err } - return resp, nil, errors.Wrap(err, "unable to create service") } - if resp.GetResult_() != nil { - return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil - } - - return resp, nil, errors.New("results object is nil") + return resp, resp.GetResult_().StartJobUpdateResult_, nil } func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { @@ -700,7 +725,9 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.ScheduleCronJob(context.TODO(), auroraJob.JobConfig()) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") @@ -716,7 +743,9 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, false, func() (*aurora.Response, error) { return r.client.DescheduleCronJob(context.TODO(), key) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") @@ -734,7 +763,9 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error false, func() (*aurora.Response, error) { return r.client.StartCronJob(context.TODO(), key) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") @@ -751,7 +782,9 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instances) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") @@ -774,7 +807,9 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) false, func() (*aurora.Response, error) { return r.client.RestartShards(context.TODO(), key, instanceIds) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora 
Scheduler") @@ -795,16 +830,51 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au true, func() (*aurora.Response, error) { return r.client.StartJobUpdate(context.TODO(), updateJob.req, message) - }) + }, + func() (*aurora.Response, bool) { + summariesResp, err := r.readonlyClient.GetJobUpdateSummaries( + context.TODO(), + &aurora.JobUpdateQuery{ + JobKey: updateJob.JobKey(), + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1, + }) + + if err != nil { + r.logger.Print("verification failed ", err) + return nil, false + } + + summaries := response.JobUpdateSummaries(summariesResp) + if len(summaries) == 0 { + return nil, false + } + + return &aurora.Response{ + ResponseCode: aurora.ResponseCode_OK, + Result_: &aurora.Result_{ + StartJobUpdateResult_: &aurora.StartJobUpdateResult_{ + UpdateSummary: summaries[0], + Key: summaries[0].Key, + }, + }, + }, true + }, + ) if retryErr != nil { // A timeout took place when attempting this call, attempt to recover if IsTimeout(retryErr) { - return resp, retryErr + return nil, retryErr } return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } + + if resp.GetResult_() == nil { + return resp, errors.New("no result in response") + } + return resp, nil } @@ -820,7 +890,9 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str false, func() (*aurora.Response, error) { return r.client.AbortJobUpdate(context.TODO(), &updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") @@ -847,7 +919,9 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st false, func() (*aurora.Response, error) { return r.client.PauseJobUpdate(context.TODO(), updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") @@ -865,7 +939,9 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s false, func() (*aurora.Response, error) { return r.client.ResumeJobUpdate(context.TODO(), updateKey, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") @@ -883,7 +959,9 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R false, func() (*aurora.Response, error) { return r.client.PulseJobUpdate(context.TODO(), updateKey) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") @@ -901,7 +979,9 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a false, func() (*aurora.Response, error) { return r.client.AddInstances(context.TODO(), &instKey, count) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") @@ -940,7 +1020,9 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") @@ -958,7 +1040,9 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend false, func() (*aurora.Response, error) { return r.client.GetPendingReason(context.TODO(), query) - }) + }, + nil, + ) 
if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") @@ -983,7 +1067,9 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror false, func() (*aurora.Response, error) { return r.client.GetTasksWithoutConfigs(context.TODO(), query) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") @@ -1009,7 +1095,9 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task false, func() (*aurora.Response, error) { return r.client.GetTasksStatus(context.TODO(), taskQ) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") @@ -1037,7 +1125,9 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur false, func() (*aurora.Response, error) { return r.client.GetJobUpdateDetails(context.TODO(), &updateQuery) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to get job update details") @@ -1054,7 +1144,9 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string false, func() (*aurora.Response, error) { return r.client.RollbackJobUpdate(context.TODO(), &key, message) - }) + }, + nil, + ) if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to roll back job update") diff --git a/realis_admin.go b/realis_admin.go index 184ae55..cec92af 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -30,7 +30,9 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr false, func() (*aurora.Response, error) { return r.adminClient.DrainHosts(context.TODO(), drainList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -65,7 +67,9 @@ func (r *realisClient) SLADrainHosts( false, func() (*aurora.Response, error) { return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }) + }, + nil, + ) if retryErr != nil { return result, errors.Wrap(retryErr, "Unable to recover connection") @@ -95,7 +99,9 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur false, func() (*aurora.Response, error) { return r.adminClient.StartMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -125,7 +131,9 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror false, func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -157,7 +165,9 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au false, func() (*aurora.Response, error) { return r.adminClient.MaintenanceStatus(context.TODO(), hostList) - }) + }, + nil, + ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") @@ -182,7 +192,9 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb false, func() (*aurora.Response, error) { return r.adminClient.SetQuota(context.TODO(), role, quota) - }) + }, + nil, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to set role quota") @@ -198,7 +210,9 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { false, func() 
(*aurora.Response, error) { return r.adminClient.GetQuota(context.TODO(), role) - }) + }, + nil, + ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to get role quota") @@ -213,7 +227,9 @@ func (r *realisClient) Snapshot() error { false, func() (*aurora.Response, error) { return r.adminClient.Snapshot(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -229,7 +245,9 @@ func (r *realisClient) PerformBackup() error { false, func() (*aurora.Response, error) { return r.adminClient.PerformBackup(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -244,7 +262,9 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error { false, func() (*aurora.Response, error) { return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") @@ -265,7 +285,9 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { _, retryErr := r.thriftCallWithRetries(false, func() (*aurora.Response, error) { return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }) + }, + nil, + ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") diff --git a/response/response.go b/response/response.go index b77348d..4a67ca0 100644 --- a/response/response.go +++ b/response/response.go @@ -36,6 +36,10 @@ func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { } func JobUpdateSummaries(resp *aurora.Response) []*aurora.JobUpdateSummary { + if resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateSummariesResult_() == nil { + return nil + } + return resp.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries() } diff --git a/retry.go b/retry.go index 0491194..eefcf2a 100644 --- a/retry.go +++ b/retry.go @@ -114,10 +114,19 @@ func ExponentialBackoff(backoff Backoff, logger logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) +// verifyOntimeout defines the type of function that will be used to verify whether a Thirft call to the Scheduler +// made it to the scheduler or not. In general, these types of functions will have to interact with the scheduler +// through the very same Thrift API which previously encountered a time out from the client. +// This means that the functions themselves should be kept to a minimum number of Thrift calls. +// It should also be noted that this is a best effort mechanism and +// is likely to fail for the same reasons that the original call failed. +type verifyOnTimeout func() (*aurora.Response, bool) + // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. func (r *realisClient) thriftCallWithRetries( returnOnTimeout bool, - thriftCall auroraThriftCall) (*aurora.Response, error) { + thriftCall auroraThriftCall, + verifyOnTimeout verifyOnTimeout) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -157,42 +166,22 @@ func (r *realisClient) thriftCallWithRetries( r.logger.tracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v", resp, clientErr) }() - // Check if our thrift call is returning an error. This is a retryable event as we don't know - // if it was caused by network issues. + // Check if our thrift call is returning an error. 
if clientErr != nil { - // Print out the error to the user r.logger.Printf("Client Error: %v", clientErr) - // Determine if error is a temporary URL error by going up the stack - e, ok := clientErr.(thrift.TTransportException) - if ok { - r.logger.debugPrint("Encountered a transport exception") + temporary, timedout := isConnectionError(clientErr) + if !temporary && r.RealisConfig().failOnPermanentErrors { + return nil, errors.Wrap(clientErr, "permanent connection error") + } - e, ok := e.Err().(*url.Error) - if ok { - - // EOF error occurs when the server closes the read buffer of the client. This is common - // when the server is overloaded and should be retried. All other errors that are permanent - // will not be retried. - if e.Err != io.EOF && !e.Temporary() && r.RealisConfig().failOnPermanentErrors { - return nil, errors.Wrap(clientErr, "permanent connection error") - } - - // Corner case where thrift payload was received by Aurora but connection timed out before Aurora was - // able to reply. In this case we will return whatever response was received and a TimedOut behaving - // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. - if e.Timeout() { - timeouts++ - r.logger.debugPrintf( - "Client closed connection (timedout) %d times before server responded, "+ - "consider increasing connection timeout", - timeouts) - if returnOnTimeout { - return resp, newTimedoutError(errors.New("client connection closed before server answer")) - } - } - } + // There exists a corner case where thrift payload was received by Aurora but + // connection timed out before Aurora was able to reply. + // Users can take special action on a timeout by using IsTimedout and reacting accordingly + // if they have configured the client to return on a timeout. + if timedout && returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) } // In the future, reestablish connection should be able to check if it is actually possible @@ -202,48 +191,71 @@ func (r *realisClient) thriftCallWithRetries( if reestablishErr != nil { r.logger.debugPrintf("error re-establishing connection ", reestablishErr) } - } else { - // If there was no client error, but the response is nil, something went wrong. - // Ideally, we'll never encounter this but we're placing a safeguard here. - if resp == nil { - return nil, errors.New("response from aurora is nil") + // If users did not opt for a return on timeout in order to react to a timedout error, + // attempt to verify that the call made it to the scheduler after the connection was re-established. + if timedout { + timeouts++ + r.logger.debugPrintf( + "Client closed connection %d times before server responded, "+ + "consider increasing connection timeout", + timeouts) + + // Allow caller to provide a function which checks if the original call was successful before + // it timed out. + if verifyOnTimeout != nil { + if verifyResp, ok := verifyOnTimeout(); ok { + r.logger.Print("verified that the call went through successfully after a client timeout") + // Response here might be different than the original as it is no longer constructed + // by the scheduler but mimicked. + // This is OK since the scheduler is very unlikely to change responses at this point in its + // development cycle but we must be careful to not return an incorrectly constructed response. + return verifyResp, nil + } + } } - // Check Response Code from thrift and make a decision to continue retrying or not. 
- switch responseCode := resp.GetResponseCode(); responseCode { + // Retry the thrift payload + continue + } - // If the thrift call succeeded, stop retrying - case aurora.ResponseCode_OK: - return resp, nil + // If there was no client error, but the response is nil, something went wrong. + // Ideally, we'll never encounter this but we're placing a safeguard here. + if resp == nil { + return nil, errors.New("response from aurora is nil") + } - // If the response code is transient, continue retrying - case aurora.ResponseCode_ERROR_TRANSIENT: - r.logger.Println("Aurora replied with Transient error code, retrying") - continue + // Check Response Code from thrift and make a decision to continue retrying or not. + switch responseCode := resp.GetResponseCode(); responseCode { - // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. - case aurora.ResponseCode_INVALID_REQUEST, - aurora.ResponseCode_ERROR, - aurora.ResponseCode_AUTH_FAILED, - aurora.ResponseCode_JOB_UPDATING_ERROR: - r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) - return resp, errors.New(response.CombineMessage(resp)) + // If the thrift call succeeded, stop retrying + case aurora.ResponseCode_OK: + return resp, nil - // The only case that should fall down to here is a WARNING response code. - // It is currently not used as a response in the scheduler so it is unknown how to handle it. - default: - r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) - } + // If the response code is transient, continue retrying + case aurora.ResponseCode_ERROR_TRANSIENT: + r.logger.Println("Aurora replied with Transient error code, retrying") + continue + + // Failure scenarios, these indicate a bad payload or a bad config. Stop retrying. + case aurora.ResponseCode_INVALID_REQUEST, + aurora.ResponseCode_ERROR, + aurora.ResponseCode_AUTH_FAILED, + aurora.ResponseCode_JOB_UPDATING_ERROR: + r.logger.Printf("Terminal Response Code %v from Aurora, won't retry\n", resp.GetResponseCode().String()) + return resp, errors.New(response.CombineMessage(resp)) + + // The only case that should fall down to here is a WARNING response code. + // It is currently not used as a response in the scheduler so it is unknown how to handle it. + default: + r.logger.debugPrintf("unhandled response code %v received from Aurora\n", responseCode) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } } - r.logger.debugPrintf("it took %v retries to complete this operation\n", curStep) - if curStep > 1 { - r.config.logger.Printf("retried this thrift call %d time(s)", curStep) + r.config.logger.Printf("this thrift call was retried %d time(s)", curStep) } // Provide more information to the user wherever possible. @@ -253,3 +265,30 @@ func (r *realisClient) thriftCallWithRetries( return nil, newRetryError(errors.New("ran out of retries"), curStep) } + +// isConnectionError processes the error received by the client. 
+// The return values indicate weather this was determined to be a temporary error +// and weather it was determined to be a timeout error +func isConnectionError(err error) (bool, bool) { + + // Determine if error is a temporary URL error by going up the stack + transportException, ok := err.(thrift.TTransportException) + if !ok { + return false, false + } + + urlError, ok := transportException.Err().(*url.Error) + if !ok { + return false, false + } + + // EOF error occurs when the server closes the read buffer of the client. This is common + // when the server is overloaded and we consider it temporary. + // All other which are not temporary as per the member function Temporary(), + // are considered not temporary (permanent). + if urlError.Err != io.EOF && !urlError.Temporary() { + return false, false + } + + return true, urlError.Timeout() +} diff --git a/util.go b/util.go index 19930e2..4307d1c 100644 --- a/util.go +++ b/util.go @@ -29,7 +29,7 @@ var TerminalStates = make(map[aurora.ScheduleStatus]bool) // ActiveJobUpdateStates - States a Job Update may be in where it is considered active. var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool) -// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in. +// TerminalUpdateStates returns a slice containing all the terminal states an update may be in. // This is a function in order to avoid having a slice that can be accidentally mutated. func TerminalUpdateStates() []aurora.JobUpdateStatus { return []aurora.JobUpdateStatus{ From 49877b7d4159904f719c4aeed8c159514a46ad24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 10:00:57 -0700 Subject: [PATCH 04/15] Adds support for running CI on github actions. (#132) --- .github/main.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .github/main.yml diff --git a/.github/main.yml b/.github/main.yml new file mode 100644 index 0000000..0445266 --- /dev/null +++ b/.github/main.yml @@ -0,0 +1,25 @@ +name: CI + +on: [push] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Setup Go for use with actions + uses: actions/setup-go@v2 + with: + go-version: 1.16 + - name: Install goimports + run: go get golang.org/x/tools/cmd/goimports + - name: Set env with list of directories in repo containin go code + run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV + - name: Run goimports check + run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" + - name: Create aurora/mesos docker cluster + run: docker-compose up -d + - name: Run tests + run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis From c7e309f4219594e2edf19e295fe8245e1d76980d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 10:06:12 -0700 Subject: [PATCH 05/15] Actions fix (#133) * Moving main.yml to the right place. 
--- .github/workflows/main.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .github/workflows/main.yml diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..0445266 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,25 @@ +name: CI + +on: [push] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Setup Go for use with actions + uses: actions/setup-go@v2 + with: + go-version: 1.16 + - name: Install goimports + run: go get golang.org/x/tools/cmd/goimports + - name: Set env with list of directories in repo containin go code + run: echo GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") >> $GITHUB_ENV + - name: Run goimports check + run: test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" + - name: Create aurora/mesos docker cluster + run: docker-compose up -d + - name: Run tests + run: go test -timeout 35m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis From 86eb0458087a0a9d8a0af2bc2fcc61f822d8bcbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 10:57:45 -0700 Subject: [PATCH 06/15] Adds go.sum and removes dep files (#134) --- Gopkg.lock | 64 ------------------------------------------------------ Gopkg.toml | 16 -------------- go.sum | 30 +++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 80 deletions(-) delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml create mode 100644 go.sum diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 1994a30..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,64 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. 
- - -[[projects]] - digest = "1:89696c38cec777120b8b1bb5e2d363d655cf2e1e7d8c851919aaa0fd576d9b86" - name = "github.com/apache/thrift" - packages = ["lib/go/thrift"] - pruneopts = "" - revision = "384647d290e2e4a55a14b1b7ef1b7e66293a2c33" - version = "v0.12.0" - -[[projects]] - digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" - name = "github.com/davecgh/go-spew" - packages = ["spew"] - pruneopts = "" - revision = "346938d642f2ec3594ed81d874461961cd0faa76" - version = "v1.1.0" - -[[projects]] - digest = "1:df48fb76fb2a40edea0c9b3d960bc95e326660d82ff1114e1f88001f7a236b40" - name = "github.com/pkg/errors" - packages = ["."] - pruneopts = "" - revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc" - -[[projects]] - digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" - name = "github.com/pmezard/go-difflib" - packages = ["difflib"] - pruneopts = "" - revision = "792786c7400a136282c1664665ae0a8db921c6c2" - version = "v1.0.0" - -[[projects]] - digest = "1:78bea5e26e82826dacc5fd64a1013a6711b7075ec8072819b89e6ad76cb8196d" - name = "github.com/samuel/go-zookeeper" - packages = ["zk"] - pruneopts = "" - revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5" - -[[projects]] - digest = "1:381bcbeb112a51493d9d998bbba207a529c73dbb49b3fd789e48c63fac1f192c" - name = "github.com/stretchr/testify" - packages = [ - "assert", - "require", - ] - pruneopts = "" - revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053" - version = "v1.3.0" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/apache/thrift/lib/go/thrift", - "github.com/pkg/errors", - "github.com/samuel/go-zookeeper/zk", - "github.com/stretchr/testify/assert", - "github.com/stretchr/testify/require", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index 5d6c9f2..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,16 +0,0 @@ -[[constraint]] - name = "github.com/apache/thrift" - version = "0.12.0" - -[[constraint]] - name = "github.com/pkg/errors" - revision = "e881fd58d78e04cf6d0de1217f8707c8cc2249bc" - -[[constraint]] - name = "github.com/samuel/go-zookeeper" - revision = "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5" - -[[constraint]] - name = "github.com/stretchr/testify" - version = "1.3.0" - diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f3f5eb6 --- /dev/null +++ b/go.sum @@ -0,0 +1,30 @@ +github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= +github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs= +github.com/apache/thrift v0.14.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e h1:+RHxT/gm0O3UF7nLJbdNzAmULvCFt4XfXHWzh3XI/zs= +github.com/pkg/errors v0.0.0-20171216070316-e881fd58d78e/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/ridv/thrift v0.12.1 
h1:b80V1Oa2Mbd++jrlJZbJsIybO5/MCfbXKzd1A5v4aSo= +github.com/ridv/thrift v0.12.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= +github.com/ridv/thrift v0.13.1 h1:/8XnTRUqJJeiuqoL7mfnJQmXQa4GJn9tUCiP7+i6Y9o= +github.com/ridv/thrift v0.13.1/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= +github.com/ridv/thrift v0.13.2 h1:Q3Smr8poXd7VkWZPHvdJZzlQCJO+b5W37ECfoUL4qHc= +github.com/ridv/thrift v0.13.2/go.mod h1:yTMRF94RCZjO1fY1xt69yncvMbQCPdRL8BhbwIrjPx8= +github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a h1:EYL2xz/Zdo0hyqdZMXR4lmT2O11jDLTPCEqIe/FR6W4= +github.com/samuel/go-zookeeper v0.0.0-20171117190445-471cd4e61d7a/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg= +github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From dbc396b0dbed60d2a23311fd09e73cf9581c070e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 10:58:34 -0700 Subject: [PATCH 07/15] Disables Travis CI (#135) Travis CI is no longer needed as we have migrated to Github Actions --- .travis.yml | 33 --------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index f9afb52..0000000 --- a/.travis.yml +++ /dev/null @@ -1,33 +0,0 @@ - -os: linux -dist: xenial -language: go - -branches: - only: - - main - - future - -go: - - "1.15.x" - -env: - global: - - GO_USR_DIRS=$(go list -f {{.Dir}} ./... | grep -E -v "/gen-go/|/vendor/") - -services: - - docker - -before_install: - - go get golang.org/x/tools/cmd/goimports - - test -z "`for d in $GO_USR_DIRS; do goimports -d $d/*.go | tee /dev/stderr; done`" - -install: - - go mod download - - docker-compose up -d - -script: - - go test -timeout 30m -race -coverprofile=coverage.txt -covermode=atomic -v github.com/paypal/gorealis - -after_success: - - bash <(curl -s https://codecov.io/bash) From 5c39a23eb214cf14772f2b95def8ec8e0c535f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 15:52:46 -0700 Subject: [PATCH 08/15] Enable Github Actions for PRs Run CI on pull requests and when the branch is pushed. 
--- .github/workflows/main.yml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 0445266..7edfc51 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,7 +1,12 @@ name: CI -on: [push] - +on: + push: + branches: + - main + pull_request: + branches: + - main jobs: build: From 62df98a3c859334eff5becaa7b24ae8c761d160f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 16:02:52 -0700 Subject: [PATCH 09/15] Bug fix for auto paused update monitor (#136) Returns success if the update has finished updating successfully. --- monitors.go | 3 ++- realis_e2e_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/monitors.go b/monitors.go index 0d9423a..edbe4e4 100644 --- a/monitors.go +++ b/monitors.go @@ -168,7 +168,8 @@ func (m *Monitor) AutoPausedUpdateMonitor(key aurora.JobUpdateKey, interval, tim return -1, err } - if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED { + if !(summary[0].State.Status == aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED || + summary[0].State.Status == aurora.JobUpdateStatus_ROLLED_FORWARD) { return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status) } diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 27e8b03..99155ab 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -1059,8 +1059,10 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { assert.Equal(t, i, curStep) - _, err = r.ResumeJobUpdate(&key, "auto resuming test") - require.NoError(t, err) + if i != len(updateGroups)-1 { + _, err = r.ResumeJobUpdate(&key, "auto resuming test") + require.NoError(t, err) + } } _, err = r.KillJob(job.JobKey()) From c59d01ab519dd63274cc23ec8a76cc7bf4f9954c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Fri, 6 Aug 2021 18:16:06 -0700 Subject: [PATCH 10/15] Changes Travis CI badge to Github Actions badge (#137) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3d33bd8..0ce8bc1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) [![Build Status](https://travis-ci.org/paypal/gorealis.svg?branch=main)](https://travis-ci.org/paypal/gorealis) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis) +# gorealis [![GoDoc](https://godoc.org/github.com/paypal/gorealis?status.svg)](https://godoc.org/github.com/paypal/gorealis) ![CI Build Status](https://github.com/paypal/gorealis/actions/workflows/main.yml/badge.svg) [![codecov](https://codecov.io/gh/paypal/gorealis/branch/main/graph/badge.svg)](https://codecov.io/gh/paypal/gorealis) Version 1 of Go library for interacting with [Aurora Scheduler](https://github.com/aurora-scheduler/aurora). 
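For illustration, the sketch below shows how the batch-aware auto-pause flow touched by patch 09 above is typically driven from client code, mirroring the updated e2e test: resume the update after every batch except the final one, which now completes as ROLLED_FORWARD and is reported as a success by AutoPausedUpdateMonitor. The driveAutoPausedUpdate helper is hypothetical; the Monitor type with its Client field, ResumeJobUpdate, and the (int, error) return of AutoPausedUpdateMonitor are assumptions taken from the gorealis v1 API shown elsewhere in this series.

package example

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// driveAutoPausedUpdate resumes an auto-pausing batch update after every batch
// until the final one, which (after patch 09) is reported as a success rather
// than a terminal-state error. totalBatches is assumed to be known by the
// caller, e.g. derived from the update's configured batch sizes.
func driveAutoPausedUpdate(r realis.Realis, key aurora.JobUpdateKey, totalBatches int) error {
	monitor := &realis.Monitor{Client: r}

	for {
		// Blocks until the update pauses on a batch or reaches a terminal state.
		cur, err := monitor.AutoPausedUpdateMonitor(key, 5, 240)
		if err != nil {
			return err
		}
		if cur >= totalBatches-1 {
			// Final batch reached; no resume is needed (see the e2e test change above).
			return nil
		}
		if _, err := r.ResumeJobUpdate(&key, "resuming next batch"); err != nil {
			return err
		}
	}
}
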
From fff2c16751974fd8d7ec44547d509f8dcea4c6ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1n=20I=2E=20Del=20Valle?= Date: Wed, 1 Sep 2021 10:09:11 -0700 Subject: [PATCH 11/15] Enabling code analysis --- .github/workflows/codeql-analysis.yml | 57 +++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 .github/workflows/codeql-analysis.yml diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..6c1890f --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,57 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ main ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ main ] + schedule: + - cron: '34 4 * * 3' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v1 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + - run: go build examples/client.go + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v1 From db9bebb80270daa939d75244d7f1f5ec488abc65 Mon Sep 17 00:00:00 2001 From: "Tan N. 
Le" Date: Mon, 1 Nov 2021 18:17:49 -0700 Subject: [PATCH 12/15] enable default sla for slaDrain (#138) --- realis.go | 9 +++++++++ realis_admin.go | 12 ++++++++++++ realis_e2e_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/realis.go b/realis.go index 772e9ab..89de748 100644 --- a/realis.go +++ b/realis.go @@ -129,6 +129,15 @@ var defaultBackoff = Backoff{ Jitter: 0.1, } +var defaultSlaPolicy = aurora.SlaPolicy{ + PercentageSlaPolicy: &aurora.PercentageSlaPolicy{ + Percentage: 66, + DurationSecs: 300, + }, +} + +const defaultSlaDrainTimeoutSecs = 900 + // ClientOption is an alias for a function that modifies the realis config object type ClientOption func(*config) diff --git a/realis_admin.go b/realis_admin.go index cec92af..9c58081 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -58,6 +58,18 @@ func (r *realisClient) SLADrainHosts( return nil, errors.New("no hosts provided to drain") } + if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 { + policy = &defaultSlaPolicy + r.logger.Printf("Warning: start draining with default sla policy %v", policy) + } + + if timeout < 0 { + r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs", + timeout, + defaultSlaDrainTimeoutSecs) + timeout = defaultSlaDrainTimeoutSecs + } + drainList := aurora.NewHosts() drainList.HostNames = hosts diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 99155ab..78d8f0e 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -750,6 +750,53 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { 5, 10) assert.NoError(t, err) + + // slaDrainHosts goes with default policy if no policy is specified + _, err = r.SLADrainHosts(nil, 30, hosts...) + require.NoError(t, err, "unable to drain host with SLA policy") + + // Monitor change to DRAINING and DRAINED mode + hostResults, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1, + 50) + assert.NoError(t, err) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + + _, _, err = r.EndMaintenance(hosts...) + require.NoError(t, err) + + // Monitor change to DRAINING and DRAINED mode + _, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5, + 10) + assert.NoError(t, err) + + _, err = r.SLADrainHosts(&aurora.SlaPolicy{}, 30, hosts...) + require.NoError(t, err, "unable to drain host with SLA policy") + + // Monitor change to DRAINING and DRAINED mode + hostResults, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1, + 50) + assert.NoError(t, err) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + + _, _, err = r.EndMaintenance(hosts...) + require.NoError(t, err) + + // Monitor change to DRAINING and DRAINED mode + _, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5, + 10) + assert.NoError(t, err) } // Test multiple go routines using a single connection From c318042e96a9ef8e07c59acb6025c3ddea17de6b Mon Sep 17 00:00:00 2001 From: "Tan N. 
Le" Date: Tue, 9 Nov 2021 09:00:35 -0800 Subject: [PATCH 13/15] release 1.24.0 (#139) --- CHANGELOG.md | 9 ++++++++- realis.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1ee217..c16d735 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,11 @@ -1.23.1 (unreleased) +1.24.1 (unreleased) + +1.24.0 + +* enable default sla for slaDrain +* Changes Travis CI badge to Github Actions badge +* Bug fix for auto paused update monitor +* Adds support for running CI on github actions 1.23.0 diff --git a/realis.go b/realis.go index 89de748..0464908 100644 --- a/realis.go +++ b/realis.go @@ -35,7 +35,7 @@ import ( "github.com/paypal/gorealis/response" ) -const version = "1.23.1" +const version = "1.24.1" // Realis is an interface that defines the various APIs that may be used to communicate with // the Apache Aurora scheduler. From 8454a6ebf3574dae344adf00865f4ccdbc0533ee Mon Sep 17 00:00:00 2001 From: shivrsrivastava <115449917+shivrsrivastava@users.noreply.github.com> Date: Thu, 13 Oct 2022 10:16:07 +0530 Subject: [PATCH 14/15] Adding priority to the task (#140) --- job.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/job.go b/job.go index 1d85f33..8470546 100644 --- a/job.go +++ b/job.go @@ -62,6 +62,7 @@ type Job interface { PartitionPolicy(policy *aurora.PartitionPolicy) Job Tier(tier string) Job SlaPolicy(policy *aurora.SlaPolicy) Job + Priority(priority int32) Job } type resourceType int @@ -383,3 +384,8 @@ func (j *AuroraJob) SlaPolicy(policy *aurora.SlaPolicy) Job { return j } + +func (j *AuroraJob) Priority(priority int32) Job { + j.jobConfig.TaskConfig.Priority = priority + return j +} From db10285368cdbca6bed4ebf5a63390447589c9d8 Mon Sep 17 00:00:00 2001 From: "Tan N. Le" Date: Wed, 12 Oct 2022 21:47:07 -0700 Subject: [PATCH 15/15] Update CHANGELOG.md --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c16d735..9cfbaef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ -1.24.1 (unreleased) +1.25.1 (unreleased) + +1.25.0 + +* Add priority api 1.24.0