From 5836ede37bc0521a14554b7bc4e516e3d46f2e2b Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 10 Dec 2018 18:57:16 -0800 Subject: [PATCH] Splitting off Aurora task from Aurora Job since Update mechanism only needs task. --- job.go | 222 ++++++++++++++---------------------------------- jobupdate.go | 30 +++---- realis.go | 4 +- task.go | 234 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 312 insertions(+), 178 deletions(-) create mode 100644 task.go diff --git a/job.go b/job.go index 0c8ac04..8b1e93b 100644 --- a/job.go +++ b/job.go @@ -15,16 +15,13 @@ package realis import ( - "strconv" - "github.com/paypal/gorealis/gen-go/apache/aurora" ) // Structure to collect all information pertaining to an Aurora job. type AuroraJob struct { jobConfig *aurora.JobConfiguration - resources map[string]*aurora.Resource - portCount int + task *Task } // Create a AuroraJob object with everything initialized. @@ -33,45 +30,23 @@ func NewJob() *AuroraJob { jobKey := &aurora.JobKey{} // Task clientConfig - taskConfig := &aurora.TaskConfig{ - Job: jobKey, - MesosFetcherUris: make([]*aurora.MesosFetcherURI, 0), - Metadata: make([]*aurora.Metadata, 0), - Constraints: make([]*aurora.Constraint, 0), - // Container is a Union so one container field must be set. Set Mesos by default. - Container: NewMesosContainer().Build(), - } + task := NewTask() + task.task.Job = jobKey // AuroraJob clientConfig jobConfig := &aurora.JobConfiguration{ Key: jobKey, - TaskConfig: taskConfig, + TaskConfig: task.TaskConfig(), } - // Resources - numCpus := &aurora.Resource{} - ramMb := &aurora.Resource{} - diskMb := &aurora.Resource{} - - numCpus.NumCpus = new(float64) - ramMb.RamMb = new(int64) - diskMb.DiskMb = new(int64) - - resources := make(map[string]*aurora.Resource) - resources["cpu"] = numCpus - resources["ram"] = ramMb - resources["disk"] = diskMb - - taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb} - return &AuroraJob{ jobConfig: jobConfig, - resources: resources, - portCount: 0, + task: task, } } -// Set AuroraJob Key environment. +// Set AuroraJob Key environment. Explicit changes to Task's job key are not needed +// because they share a pointer to the same JobKey. func (j *AuroraJob) Environment(env string) *AuroraJob { j.jobConfig.Key.Environment = env return j @@ -94,58 +69,6 @@ func (j *AuroraJob) Name(name string) *AuroraJob { return j } -// Set name of the executor that will the task will be configured to. -func (j *AuroraJob) ExecutorName(name string) *AuroraJob { - - if j.jobConfig.TaskConfig.ExecutorConfig == nil { - j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig() - } - - j.jobConfig.TaskConfig.ExecutorConfig.Name = name - return j -} - -// Will be included as part of entire task inside the scheduler that will be serialized. -func (j *AuroraJob) ExecutorData(data string) *AuroraJob { - - if j.jobConfig.TaskConfig.ExecutorConfig == nil { - j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig() - } - - j.jobConfig.TaskConfig.ExecutorConfig.Data = data - return j -} - -func (j *AuroraJob) CPU(cpus float64) *AuroraJob { - *j.resources["cpu"].NumCpus = cpus - - return j -} - -func (j *AuroraJob) RAM(ram int64) *AuroraJob { - *j.resources["ram"].RamMb = ram - - return j -} - -func (j *AuroraJob) Disk(disk int64) *AuroraJob { - *j.resources["disk"].DiskMb = disk - - return j -} - -func (j *AuroraJob) Tier(tier string) *AuroraJob { - *j.jobConfig.TaskConfig.Tier = tier - - return j -} - -// How many failures to tolerate before giving up. -func (j *AuroraJob) MaxFailure(maxFail int32) *AuroraJob { - j.jobConfig.TaskConfig.MaxTaskFailures = maxFail - return j -} - // How many instances of the job to run func (j *AuroraJob) InstanceCount(instCount int32) *AuroraJob { j.jobConfig.InstanceCount = instCount @@ -167,12 +90,6 @@ func (j *AuroraJob) GetInstanceCount() int32 { return j.jobConfig.InstanceCount } -// Restart the job's tasks if they fail -func (j *AuroraJob) IsService(isService bool) *AuroraJob { - j.jobConfig.TaskConfig.IsService = isService - return j -} - // Get the current job configurations key to use for some realis calls. func (j *AuroraJob) JobKey() *aurora.JobKey { return j.jobConfig.Key @@ -183,107 +100,90 @@ func (j *AuroraJob) JobConfig() *aurora.JobConfiguration { return j.jobConfig } +/* + Task specific API, see task.go for further documentation. + These functions are provided for the convenience of chaining API calls. +*/ + +func (j *AuroraJob) ExecutorName(name string) *AuroraJob { + j.task.ExecutorName(name) + return j +} + +func (j *AuroraJob) ExecutorData(data string) *AuroraJob { + j.task.ExecutorData(data) + return j +} + +func (j *AuroraJob) CPU(cpus float64) *AuroraJob { + j.task.CPU(cpus) + return j +} + +func (j *AuroraJob) RAM(ram int64) *AuroraJob { + j.task.RAM(ram) + return j +} + +func (j *AuroraJob) Disk(disk int64) *AuroraJob { + j.task.Disk(disk) + return j +} + +func (j *AuroraJob) Tier(tier string) *AuroraJob { + j.task.Tier(tier) + return j +} + +func (j *AuroraJob) MaxFailure(maxFail int32) *AuroraJob { + j.task.MaxFailure(maxFail) + return j +} + +func (j *AuroraJob) IsService(isService bool) *AuroraJob { + j.task.IsService(isService) + return j +} + func (j *AuroraJob) TaskConfig() *aurora.TaskConfig { - return j.jobConfig.TaskConfig + return j.task.TaskConfig() } -// Add a list of URIs with the same extract and cache configuration. Scheduler must have -// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJob { - for _, value := range values { - j.jobConfig.TaskConfig.MesosFetcherUris = append( - j.jobConfig.TaskConfig.MesosFetcherUris, - &aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache}) - } + j.task.AddURIs(extract, cache, values...) return j } -// Adds a Mesos label to the job. Note that Aurora will add the -// prefix "org.apache.aurora.metadata." to the beginning of each key. func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob { - j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, &aurora.Metadata{Key: key, Value: value}) + j.task.AddLabel(key, value) return j } -// Add a named port to the job configuration These are random ports as it's -// not currently possible to request specific ports using Aurora. func (j *AuroraJob) AddNamedPorts(names ...string) *AuroraJob { - j.portCount += len(names) - for _, name := range names { - j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name}) - } - + j.task.AddNamedPorts(names...) return j } -// Adds a request for a number of ports to the job configuration. The names chosen for these ports -// will be org.apache.aurora.port.X, where X is the current port count for the job configuration -// starting at 0. These are random ports as it's not currently possible to request -// specific ports using Aurora. func (j *AuroraJob) AddPorts(num int) *AuroraJob { - start := j.portCount - j.portCount += num - for i := start; i < j.portCount; i++ { - portName := "org.apache.aurora.port." + strconv.Itoa(i) - j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &portName}) - } - + j.task.AddPorts(num) return j } - -// From Aurora Docs: -// Add a Value constraint -// name - Mesos slave attribute that the constraint is matched against. -// If negated = true , treat this as a 'not' - to avoid specific values. -// Values - list of values we look for in attribute name func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) *AuroraJob { - j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, - &aurora.Constraint{ - Name: name, - Constraint: &aurora.TaskConstraint{ - Value: &aurora.ValueConstraint{ - Negated: negated, - Values: values, - }, - Limit: nil, - }, - }) - + j.task.AddValueConstraint(name, negated, values...) return j } -// From Aurora Docs: -// A constraint that specifies the maximum number of active tasks on a host with -// a matching attribute that may be scheduled simultaneously. func (j *AuroraJob) AddLimitConstraint(name string, limit int32) *AuroraJob { - j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, - &aurora.Constraint{ - Name: name, - Constraint: &aurora.TaskConstraint{ - Value: nil, - Limit: &aurora.LimitConstraint{Limit: limit}, - }, - }) - + j.task.AddLimitConstraint(name, limit) return j } -// From Aurora Docs: -// dedicated attribute. Aurora treats this specially, and only allows matching jobs -// to run on these machines, and will only schedule matching jobs on these machines. -// When a job is created, the scheduler requires that the $role component matches -// the role field in the job configuration, and will reject the job creation otherwise. -// A wildcard (*) may be used for the role portion of the dedicated attribute, which -// will allow any owner to elect for a job to run on the host(s) func (j *AuroraJob) AddDedicatedConstraint(role, name string) *AuroraJob { - j.AddValueConstraint("dedicated", false, role+"/"+name) - + j.task.AddDedicatedConstraint(role, name) return j } -// Set a container to run for the job configuration to run. func (j *AuroraJob) Container(container Container) *AuroraJob { - j.jobConfig.TaskConfig.Container = container.Build() - + j.task.Container(container) return j } diff --git a/jobupdate.go b/jobupdate.go index 68efbd3..7c57c9b 100644 --- a/jobupdate.go +++ b/jobupdate.go @@ -20,31 +20,31 @@ import ( // Structure to collect all information required to create job update type JobUpdate struct { - Job *AuroraJob + Task *Task request *aurora.JobUpdateRequest } // Create a default JobUpdate object. -func NewDefaultJobUpdate(job *AuroraJob) *JobUpdate { +func NewDefaultJobUpdate(task *Task) *JobUpdate { req := aurora.JobUpdateRequest{} - req.TaskConfig = job.jobConfig.TaskConfig + req.TaskConfig = task.task req.Settings = NewUpdateSettings() // Rebuild resource map from TaskConfig - for _, ptr := range job.jobConfig.TaskConfig.Resources { + for _, ptr := range task.task.Resources { if ptr.NumCpus != nil { - job.resources["cpu"].NumCpus = ptr.NumCpus + task.resources["cpu"].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce } if ptr.RamMb != nil { - job.resources["ram"].RamMb = ptr.RamMb + task.resources["ram"].RamMb = ptr.RamMb continue } if ptr.DiskMb != nil { - job.resources["disk"].DiskMb = ptr.DiskMb + task.resources["disk"].DiskMb = ptr.DiskMb continue } } @@ -58,8 +58,8 @@ func NewDefaultJobUpdate(job *AuroraJob) *JobUpdate { req.Settings.MaxFailedInstances = 0 req.Settings.RollbackOnFailure = true - //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior - return &JobUpdate{Job: job, request: &req} + //TODO(rdelvalle): Deep copy task struct to avoid unexpected behavior + return &JobUpdate{Task: task, request: &req} } func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) *JobUpdate { @@ -68,29 +68,29 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) req.TaskConfig = config req.Settings = settings - job := NewJob() - job.jobConfig.TaskConfig = config + task := NewTask() + task.task = config // Rebuild resource map from TaskConfig for _, ptr := range config.Resources { if ptr.NumCpus != nil { - job.resources["cpu"].NumCpus = ptr.NumCpus + task.resources["cpu"].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce } if ptr.RamMb != nil { - job.resources["ram"].RamMb = ptr.RamMb + task.resources["ram"].RamMb = ptr.RamMb continue } if ptr.DiskMb != nil { - job.resources["disk"].DiskMb = ptr.DiskMb + task.resources["disk"].DiskMb = ptr.DiskMb continue } } //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior - return &JobUpdate{Job: job, request: req} + return &JobUpdate{Task: task, request: req} } // Set instance count the job will have after the update. diff --git a/realis.go b/realis.go index 3a56311..75c66da 100644 --- a/realis.go +++ b/realis.go @@ -677,12 +677,12 @@ func (c *Client) RestartJob(key *aurora.JobKey) error { } // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. -func (c *Client) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) { +func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.StartJobUpdateResult_, error) { c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { - return c.client.StartJobUpdate(nil, updateJob.req, message) + return c.client.StartJobUpdate(nil, updateJob.request, message) }) if retryErr != nil { diff --git a/task.go b/task.go new file mode 100644 index 0000000..28efd22 --- /dev/null +++ b/task.go @@ -0,0 +1,234 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package realis + +import ( + "strconv" + + "github.com/paypal/gorealis/gen-go/apache/aurora" +) + +type Task struct { + task *aurora.TaskConfig + resources map[string]*aurora.Resource + portCount int +} + +func NewTask() *Task { + numCpus := &aurora.Resource{} + ramMb := &aurora.Resource{} + diskMb := &aurora.Resource{} + + numCpus.NumCpus = new(float64) + ramMb.RamMb = new(int64) + diskMb.DiskMb = new(int64) + + resources := make(map[string]*aurora.Resource) + resources["cpu"] = numCpus + resources["ram"] = ramMb + resources["disk"] = diskMb + + return &Task{task: &aurora.TaskConfig{ + Job: &aurora.JobKey{}, + MesosFetcherUris: make([]*aurora.MesosFetcherURI, 0), + Metadata: make([]*aurora.Metadata, 0), + Constraints: make([]*aurora.Constraint, 0), + // Container is a Union so one container field must be set. Set Mesos by default. + Container: NewMesosContainer().Build(), + Resources: []*aurora.Resource{numCpus, ramMb, diskMb}, + }, + resources: resources, + portCount: 0} +} + +// Set Task Key environment. +func (t *Task) Environment(env string) *Task { + t.task.Job.Environment = env + return t +} + +// Set Task Key Role. +func (t *Task) Role(role string) *Task { + t.task.Job.Role = role + return t +} + +// Set Task Key Name. +func (t *Task) Name(name string) *Task { + t.task.Job.Name = name + return t +} + +// Set name of the executor that will the task will be configured to. +func (t *Task) ExecutorName(name string) *Task { + + if t.task.ExecutorConfig == nil { + t.task.ExecutorConfig = aurora.NewExecutorConfig() + } + + t.task.ExecutorConfig.Name = name + return t +} + +// Will be included as part of entire task inside the scheduler that will be serialized. +func (t *Task) ExecutorData(data string) *Task { + + if t.task.ExecutorConfig == nil { + t.task.ExecutorConfig = aurora.NewExecutorConfig() + } + + t.task.ExecutorConfig.Data = data + return t +} + +func (t *Task) CPU(cpus float64) *Task { + *t.resources["cpu"].NumCpus = cpus + + return t +} + +func (t *Task) RAM(ram int64) *Task { + *t.resources["ram"].RamMb = ram + + return t +} + +func (t *Task) Disk(disk int64) *Task { + *t.resources["disk"].DiskMb = disk + + return t +} + +func (t *Task) Tier(tier string) *Task { + *t.task.Tier = tier + + return t +} + +// How many failures to tolerate before giving up. +func (t *Task) MaxFailure(maxFail int32) *Task { + t.task.MaxTaskFailures = maxFail + return t +} + +// Restart the job's tasks if they fail +func (t *Task) IsService(isService bool) *Task { + t.task.IsService = isService + return t +} + +// Add a list of URIs with the same extract and cache configuration. Scheduler must have +// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. +func (t *Task) AddURIs(extract bool, cache bool, values ...string) *Task { + for _, value := range values { + t.task.MesosFetcherUris = append( + t.task.MesosFetcherUris, + &aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache}) + } + return t +} + +// Adds a Mesos label to the job. Note that Aurora will add the +// prefix "org.apache.aurora.metadata." to the beginning of each key. +func (t *Task) AddLabel(key string, value string) *Task { + t.task.Metadata = append(t.task.Metadata, &aurora.Metadata{Key: key, Value: value}) + return t +} + +// Add a named port to the job configuration These are random ports as it's +// not currently possible to request specific ports using Aurora. +func (t *Task) AddNamedPorts(names ...string) *Task { + t.portCount += len(names) + for _, name := range names { + t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &name}) + } + + return t +} + +// Adds a request for a number of ports to the job configuration. The names chosen for these ports +// will be org.apache.aurora.port.X, where X is the current port count for the job configuration +// starting at 0. These are random ports as it's not currently possible to request +// specific ports using Aurora. +func (t *Task) AddPorts(num int) *Task { + start := t.portCount + t.portCount += num + for i := start; i < t.portCount; i++ { + portName := "org.apache.aurora.port." + strconv.Itoa(i) + t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &portName}) + } + + return t +} + +// From Aurora Docs: +// Add a Value constraint +// name - Mesos slave attribute that the constraint is matched against. +// If negated = true , treat this as a 'not' - to avoid specific values. +// Values - list of values we look for in attribute name +func (t *Task) AddValueConstraint(name string, negated bool, values ...string) *Task { + t.task.Constraints = append(t.task.Constraints, + &aurora.Constraint{ + Name: name, + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: negated, + Values: values, + }, + Limit: nil, + }, + }) + + return t +} + +// From Aurora Docs: +// A constraint that specifies the maximum number of active tasks on a host with +// a matching attribute that may be scheduled simultaneously. +func (t *Task) AddLimitConstraint(name string, limit int32) *Task { + t.task.Constraints = append(t.task.Constraints, + &aurora.Constraint{ + Name: name, + Constraint: &aurora.TaskConstraint{ + Value: nil, + Limit: &aurora.LimitConstraint{Limit: limit}, + }, + }) + + return t +} + +// From Aurora Docs: +// dedicated attribute. Aurora treats this specially, and only allows matching jobs +// to run on these machines, and will only schedule matching jobs on these machines. +// When a job is created, the scheduler requires that the $role component matches +// the role field in the job configuration, and will reject the job creation otherwise. +// A wildcard (*) may be used for the role portion of the dedicated attribute, which +// will allow any owner to elect for a job to run on the host(s) +func (t *Task) AddDedicatedConstraint(role, name string) *Task { + t.AddValueConstraint("dedicated", false, role+"/"+name) + + return t +} + +// Set a container to run for the job configuration to run. +func (t *Task) Container(container Container) *Task { + t.task.Container = container.Build() + + return t +} +func (t *Task) TaskConfig() *aurora.TaskConfig { + return t.task +}