From 8a9a97c150331f411143b81ef8782bc85efc9b93 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Wed, 7 Nov 2018 19:09:16 -0800 Subject: [PATCH] Removing unnecessary interface from Aurora Job. --- examples/client.go | 10 ++--- job.go | 98 ++++++++++++++++------------------------------ monitors.go | 4 +- realis.go | 25 ++++++------ updatejob.go | 12 +++--- 5 files changed, 60 insertions(+), 89 deletions(-) diff --git a/examples/client.go b/examples/client.go index 7ab9af3..ca876c4 100644 --- a/examples/client.go +++ b/examples/client.go @@ -31,10 +31,10 @@ var cmd, executor, url, clustersConfig, clusterName, updateId, username, passwor var caCertsPath string var clientKey, clientCert string -var CONNECTION_TIMEOUT = 20000 +var CONNECTION_TIMEOUT = 20 * time.Second func init() { - flag.StringVar(&cmd, "cmd", "", "Job request type to send to Aurora Scheduler") + flag.StringVar(&cmd, "cmd", "", "Aurora Job request type to send to Aurora Scheduler") flag.StringVar(&executor, "executor", "thermos", "Executor to use") flag.StringVar(&url, "url", "", "URL at which the Aurora Scheduler exists as [url]:[port]") flag.StringVar(&clustersConfig, "clusters", "", "Location of the clusters.json file used by aurora.") @@ -74,7 +74,7 @@ func init() { func main() { - var job realis.Job + var job *realis.AuroraJob var err error var monitor *realis.Monitor var r *realis.RealisClient @@ -82,7 +82,7 @@ func main() { clientOptions := []realis.ClientOption{ realis.BasicAuth(username, password), realis.ThriftJSON(), - realis.TimeoutMS(CONNECTION_TIMEOUT), + realis.Timeout(CONNECTION_TIMEOUT), realis.BackOff(realis.Backoff{ Steps: 2, Duration: 10 * time.Second, @@ -101,7 +101,7 @@ func main() { } if caCertsPath != "" { - clientOptions = append(clientOptions, realis.Certspath(caCertsPath)) + clientOptions = append(clientOptions, realis.CertsPath(caCertsPath)) } if clientKey != "" && clientCert != "" { diff --git a/job.go b/job.go index 0ff2aac..44beb83 100644 --- a/job.go +++ b/job.go @@ -20,43 +20,6 @@ import ( "github.com/paypal/gorealis/gen-go/apache/aurora" ) -type Job interface { - // Set Job Key environment. - Environment(env string) Job - Role(role string) Job - Name(name string) Job - CPU(cpus float64) Job - CronSchedule(cron string) Job - CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job - Disk(disk int64) Job - RAM(ram int64) Job - ExecutorName(name string) Job - ExecutorData(data string) Job - AddPorts(num int) Job - AddLabel(key string, value string) Job - AddNamedPorts(names ...string) Job - AddLimitConstraint(name string, limit int32) Job - AddValueConstraint(name string, negated bool, values ...string) Job - - // From Aurora Docs: - // dedicated attribute. Aurora treats this specially, and only allows matching jobs - // to run on these machines, and will only schedule matching jobs on these machines. - // When a job is created, the scheduler requires that the $role component matches - // the role field in the job configuration, and will reject the job creation otherwise. - // A wildcard (*) may be used for the role portion of the dedicated attribute, which - // will allow any owner to elect for a job to run on the host(s) - AddDedicatedConstraint(role, name string) Job - AddURIs(extract bool, cache bool, values ...string) Job - JobKey() *aurora.JobKey - JobConfig() *aurora.JobConfiguration - TaskConfig() *aurora.TaskConfig - IsService(isService bool) Job - InstanceCount(instCount int32) Job - GetInstanceCount() int32 - MaxFailure(maxFail int32) Job - Container(container Container) Job -} - // Structure to collect all information pertaining to an Aurora job. type AuroraJob struct { jobConfig *aurora.JobConfiguration @@ -64,13 +27,13 @@ type AuroraJob struct { portCount int } -// Create a Job object with everything initialized. -func NewJob() Job { +// Create a AuroraJob object with everything initialized. +func NewJob() *AuroraJob { jobConfig := aurora.NewJobConfiguration() taskConfig := aurora.NewTaskConfig() jobKey := aurora.NewJobKey() - // Job Config + // AuroraJob Config jobConfig.Key = jobKey jobConfig.TaskConfig = taskConfig @@ -108,14 +71,14 @@ func NewJob() Job { } } -// Set Job Key environment. -func (j *AuroraJob) Environment(env string) Job { +// Set AuroraJob Key environment. +func (j *AuroraJob) Environment(env string) *AuroraJob { j.jobConfig.Key.Environment = env return j } -// Set Job Key Role. -func (j *AuroraJob) Role(role string) Job { +// Set AuroraJob Key Role. +func (j *AuroraJob) Role(role string) *AuroraJob { j.jobConfig.Key.Role = role //Will be deprecated @@ -125,14 +88,14 @@ func (j *AuroraJob) Role(role string) Job { return j } -// Set Job Key Name. -func (j *AuroraJob) Name(name string) Job { +// Set AuroraJob Key Name. +func (j *AuroraJob) Name(name string) *AuroraJob { j.jobConfig.Key.Name = name return j } // Set name of the executor that will the task will be configured to. -func (j *AuroraJob) ExecutorName(name string) Job { +func (j *AuroraJob) ExecutorName(name string) *AuroraJob { if j.jobConfig.TaskConfig.ExecutorConfig == nil { j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig() @@ -143,7 +106,7 @@ func (j *AuroraJob) ExecutorName(name string) Job { } // Will be included as part of entire task inside the scheduler that will be serialized. -func (j *AuroraJob) ExecutorData(data string) Job { +func (j *AuroraJob) ExecutorData(data string) *AuroraJob { if j.jobConfig.TaskConfig.ExecutorConfig == nil { j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig() @@ -153,42 +116,42 @@ func (j *AuroraJob) ExecutorData(data string) Job { return j } -func (j *AuroraJob) CPU(cpus float64) Job { +func (j *AuroraJob) CPU(cpus float64) *AuroraJob { *j.resources["cpu"].NumCpus = cpus return j } -func (j *AuroraJob) RAM(ram int64) Job { +func (j *AuroraJob) RAM(ram int64) *AuroraJob { *j.resources["ram"].RamMb = ram return j } -func (j *AuroraJob) Disk(disk int64) Job { +func (j *AuroraJob) Disk(disk int64) *AuroraJob { *j.resources["disk"].DiskMb = disk return j } // How many failures to tolerate before giving up. -func (j *AuroraJob) MaxFailure(maxFail int32) Job { +func (j *AuroraJob) MaxFailure(maxFail int32) *AuroraJob { j.jobConfig.TaskConfig.MaxTaskFailures = maxFail return j } // How many instances of the job to run -func (j *AuroraJob) InstanceCount(instCount int32) Job { +func (j *AuroraJob) InstanceCount(instCount int32) *AuroraJob { j.jobConfig.InstanceCount = instCount return j } -func (j *AuroraJob) CronSchedule(cron string) Job { +func (j *AuroraJob) CronSchedule(cron string) *AuroraJob { j.jobConfig.CronSchedule = &cron return j } -func (j *AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job { +func (j *AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) *AuroraJob { j.jobConfig.CronCollisionPolicy = policy return j } @@ -199,7 +162,7 @@ func (j *AuroraJob) GetInstanceCount() int32 { } // Restart the job's tasks if they fail -func (j *AuroraJob) IsService(isService bool) Job { +func (j *AuroraJob) IsService(isService bool) *AuroraJob { j.jobConfig.TaskConfig.IsService = isService return j } @@ -220,7 +183,7 @@ func (j *AuroraJob) TaskConfig() *aurora.TaskConfig { // Add a list of URIs with the same extract and cache configuration. Scheduler must have // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. -func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job { +func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJob { for _, value := range values { j.jobConfig.TaskConfig.MesosFetcherUris[&aurora.MesosFetcherURI{ Value: value, @@ -233,14 +196,14 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job { // Adds a Mesos label to the job. Note that Aurora will add the // prefix "org.apache.aurora.metadata." to the beginning of each key. -func (j *AuroraJob) AddLabel(key string, value string) Job { +func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob { j.jobConfig.TaskConfig.Metadata[&aurora.Metadata{Key: key, Value: value}] = true return j } // Add a named port to the job configuration These are random ports as it's // not currently possible to request specific ports using Aurora. -func (j *AuroraJob) AddNamedPorts(names ...string) Job { +func (j *AuroraJob) AddNamedPorts(names ...string) *AuroraJob { j.portCount += len(names) for _, name := range names { j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &name}] = true @@ -253,7 +216,7 @@ func (j *AuroraJob) AddNamedPorts(names ...string) Job { // will be org.apache.aurora.port.X, where X is the current port count for the job configuration // starting at 0. These are random ports as it's not currently possible to request // specific ports using Aurora. -func (j *AuroraJob) AddPorts(num int) Job { +func (j *AuroraJob) AddPorts(num int) *AuroraJob { start := j.portCount j.portCount += num for i := start; i < j.portCount; i++ { @@ -269,7 +232,7 @@ func (j *AuroraJob) AddPorts(num int) Job { // name - Mesos slave attribute that the constraint is matched against. // If negated = true , treat this as a 'not' - to avoid specific values. // Values - list of values we look for in attribute name -func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job { +func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) *AuroraJob { constraintValues := make(map[string]bool) for _, value := range values { constraintValues[value] = true @@ -291,7 +254,7 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri // From Aurora Docs: // A constraint that specifies the maximum number of active tasks on a host with // a matching attribute that may be scheduled simultaneously. -func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job { +func (j *AuroraJob) AddLimitConstraint(name string, limit int32) *AuroraJob { j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{ Name: name, Constraint: &aurora.TaskConstraint{ @@ -303,14 +266,21 @@ func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job { return j } -func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job { +// From Aurora Docs: +// dedicated attribute. Aurora treats this specially, and only allows matching jobs +// to run on these machines, and will only schedule matching jobs on these machines. +// When a job is created, the scheduler requires that the $role component matches +// the role field in the job configuration, and will reject the job creation otherwise. +// A wildcard (*) may be used for the role portion of the dedicated attribute, which +// will allow any owner to elect for a job to run on the host(s) +func (j *AuroraJob) AddDedicatedConstraint(role, name string) *AuroraJob { j.AddValueConstraint("dedicated", false, role+"/"+name) return j } // Set a container to run for the job configuration to run. -func (j *AuroraJob) Container(container Container) Job { +func (j *AuroraJob) Container(container Container) *AuroraJob { j.jobConfig.TaskConfig.Container = container.Build() return j diff --git a/monitors.go b/monitors.go index 170964e..d23426f 100644 --- a/monitors.go +++ b/monitors.go @@ -87,12 +87,12 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } } -// Monitor a Job until all instances enter one of the LIVE_STATES +// Monitor a AuroraJob until all instances enter one of the LIVE_STATES func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout) } -// Monitor a Job until all instances enter a desired status. +// Monitor a AuroraJob until all instances enter a desired status. // Defaults sets of desired statuses provided by the thrift API include: // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) { diff --git a/realis.go b/realis.go index 740f003..b29c899 100644 --- a/realis.go +++ b/realis.go @@ -172,7 +172,7 @@ func Debug() ClientOption { } } -func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { +func newTJSONTransport(url string, timeout time.Duration, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") @@ -183,7 +183,7 @@ func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TT return trans, err } -func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { +func newTBinTransport(url string, timeout time.Duration, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") @@ -567,7 +567,7 @@ func (r *RealisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Although this API is able to create service jobs, it is better to use CreateService instead // as that API uses the update thrift call which has a few extra features available. // Use this API to create ad-hoc jobs. -func (r *RealisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { +func (r *RealisClient) CreateJob(auroraJob *AuroraJob) (*aurora.Response, error) { r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) @@ -578,11 +578,12 @@ func (r *RealisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { if retryErr != nil { return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") } + return resp, nil } // This API uses an update thrift call to create the services giving a few more robust features. -func (r *RealisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { +func (r *RealisClient) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { // Create a new job update object and ship it to the StartJobUpdate api update := NewUpdateJob(auroraJob.TaskConfig(), settings) update.InstanceCount(auroraJob.GetInstanceCount()) @@ -599,7 +600,7 @@ func (r *RealisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe return nil, nil, errors.New("results object is nil") } -func (r *RealisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { +func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) (*aurora.Response, error) { r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -607,7 +608,7 @@ func (r *RealisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron AuroraJob Schedule message to Aurora Scheduler") } return resp, nil } @@ -621,7 +622,7 @@ func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Cron AuroraJob De-schedule message to Aurora Scheduler") } return resp, nil @@ -637,7 +638,7 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "Error sending Start Cron AuroraJob message to Aurora Scheduler") } return resp, nil @@ -703,7 +704,7 @@ func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au return resp, nil } -// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. +// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -718,7 +719,7 @@ func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str return resp, nil } -//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +//Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -734,7 +735,7 @@ func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st return resp, nil } -//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +//Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) @@ -750,7 +751,7 @@ func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s return resp, nil } -//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. +//Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) diff --git a/updatejob.go b/updatejob.go index 4ee4f14..9160694 100644 --- a/updatejob.go +++ b/updatejob.go @@ -20,8 +20,8 @@ import ( // Structure to collect all information required to create job update type UpdateJob struct { - Job // SetInstanceCount for job is hidden, access via full qualifier - req *aurora.JobUpdateRequest + *AuroraJob // SetInstanceCount for job is hidden, access via full qualifier + req *aurora.JobUpdateRequest } // Create a default UpdateJob object. @@ -31,7 +31,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req.TaskConfig = config req.Settings = NewUpdateSettings() - job := NewJob().(*AuroraJob) + job := NewJob() job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig @@ -62,7 +62,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req.Settings.RollbackOnFailure = true //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior - return &UpdateJob{Job: job, req: req} + return &UpdateJob{AuroraJob: job, req: req} } func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) *UpdateJob { @@ -71,7 +71,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) req.TaskConfig = config req.Settings = settings - job := NewJob().(*AuroraJob) + job := NewJob() job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig @@ -93,7 +93,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) } //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior - return &UpdateJob{Job: job, req: req} + return &UpdateJob{AuroraJob: job, req: req} } // Set instance count the job will have after the update.