diff --git a/CHANGELOG.md b/CHANGELOG.md
index 551bae9..a65b6fe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
-1.22.0 (unreleased)
+1.22.1 (unreleased)
+
+* Adding safeguards against setting multiple constraints with the same name for a single task.
+
+1.22.0
 
 * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
 by the HTTP client. Instead they now return an error that conforms to the Timedout interface.
diff --git a/docker-compose.yml b/docker-compose.yml
index 4c45490..932053d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -38,6 +38,7 @@ services:
     ports:
       - "5051:5051"
     environment:
+      MESOS_ATTRIBUTES: 'zone:west'
       MESOS_MASTER: zk://192.168.33.2:2181/mesos
       MESOS_CONTAINERIZERS: docker,mesos
       MESOS_PORT: 5051
@@ -62,6 +63,7 @@ services:
     ports:
       - "5061:5061"
     environment:
+      MESOS_ATTRIBUTES: 'zone:east'
      MESOS_MASTER: zk://192.168.33.2:2181/mesos
       MESOS_CONTAINERIZERS: docker,mesos
       MESOS_HOSTNAME: localhost
diff --git a/job.go b/job.go
index b614f95..1d85f33 100644
--- a/job.go
+++ b/job.go
@@ -73,12 +73,15 @@ const (
     GPU
 )
 
+const portNamePrefix = "org.apache.aurora.port."
+
 // AuroraJob is a structure to collect all information pertaining to an Aurora job.
 type AuroraJob struct {
-    jobConfig *aurora.JobConfiguration
-    resources map[resourceType]*aurora.Resource
-    metadata  map[string]*aurora.Metadata
-    portCount int
+    jobConfig   *aurora.JobConfiguration
+    resources   map[resourceType]*aurora.Resource
+    metadata    map[string]*aurora.Metadata
+    constraints map[string]*aurora.Constraint
+    portCount   int
 }
 
 // NewJob is used to create a Job object with everything initialized.
@@ -109,10 +112,11 @@ func NewJob() Job {
     diskMb.DiskMb = new(int64)
 
     return &AuroraJob{
-        jobConfig: jobConfig,
-        resources: resources,
-        metadata:  make(map[string]*aurora.Metadata),
-        portCount: 0,
+        jobConfig:   jobConfig,
+        resources:   resources,
+        metadata:    make(map[string]*aurora.Metadata),
+        constraints: make(map[string]*aurora.Constraint),
+        portCount:   0,
     }
 }
 
@@ -258,12 +262,12 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
 // AddLabel adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) Job {
-    if _, ok := j.metadata[key]; ok {
-        j.metadata[key].Value = value
-    } else {
-        j.metadata[key] = &aurora.Metadata{Key: key, Value: value}
+    if _, ok := j.metadata[key]; !ok {
+        j.metadata[key] = &aurora.Metadata{Key: key}
         j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key])
     }
+
+    j.metadata[key].Value = value
     return j
 }
 
@@ -288,7 +292,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
     start := j.portCount
     j.portCount += num
     for i := start; i < j.portCount; i++ {
-        portName := "org.apache.aurora.port." + strconv.Itoa(i)
+        portName := portNamePrefix + strconv.Itoa(i)
         j.jobConfig.TaskConfig.Resources = append(
             j.jobConfig.TaskConfig.Resources,
             &aurora.Resource{NamedPort: &portName})
@@ -297,47 +301,56 @@ func (j *AuroraJob) AddPorts(num int) Job {
     return j
 }
 
-// AddValueConstraint allows the user to add a value constrain to the job to limiti which agents the job's
-// tasks can be run on.
+// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
+// tasks can be run on. If the name matches a constraint that was previously set, the previous value will be
+// overwritten. In case the previous constraint attached to the name was of type Limit, the constraint will be
+// clobbered by this new Value constraint.
 // From Aurora Docs:
 // Add a Value constraint
 // name - Mesos slave attribute that the constraint is matched against.
 // If negated = true , treat this as a 'not' - to avoid specific values.
 // Values - list of values we look for in attribute name
 func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
-    j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-        &aurora.Constraint{
-            Name: name,
-            Constraint: &aurora.TaskConstraint{
-                Value: &aurora.ValueConstraint{
-                    Negated: negated,
-                    Values:  values,
-                },
-                Limit: nil,
-            },
-        })
+    if _, ok := j.constraints[name]; !ok {
+        j.constraints[name] = &aurora.Constraint{Name: name}
+        j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+    }
+
+    j.constraints[name].Constraint = &aurora.TaskConstraint{
+        Value: &aurora.ValueConstraint{
+            Negated: negated,
+            Values:  values,
+        },
+        Limit: nil,
+    }
     return j
 }
 
 // AddLimitConstraint allows the user to limit how many tasks form the same Job are run on a single host.
+// If the name matches a constraint that was previously set, the previous value will be overwritten.
+// In case the previous constraint attached to the name was of type Value, the constraint will be
+// clobbered by this new Limit constraint.
 // From Aurora Docs:
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
 func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
-    j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-        &aurora.Constraint{
-            Name: name,
-            Constraint: &aurora.TaskConstraint{
-                Value: nil,
-                Limit: &aurora.LimitConstraint{Limit: limit},
-            },
-        })
+    if _, ok := j.constraints[name]; !ok {
+        j.constraints[name] = &aurora.Constraint{Name: name}
+        j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+    }
+
+    j.constraints[name].Constraint = &aurora.TaskConstraint{
+        Value: nil,
+        Limit: &aurora.LimitConstraint{Limit: limit},
+    }
     return j
 }
 
-// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
+// AddDedicatedConstraint is a convenience function that allows the user to
+// add a dedicated constraint to a Job configuration.
+// In case a previous dedicated constraint was set, it will be clobbered by this new value.
 func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
     j.AddValueConstraint("dedicated", false, role+"/"+name)
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 999f2b9..27e8b03 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -306,6 +306,40 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
         _, err = r.KillJob(job.JobKey())
         assert.NoError(t, err)
     })
+
+    t.Run("Duplicate_constraints", func(t *testing.T) {
+        job.Name("thermos_duplicate_constraints").
+            AddValueConstraint("zone", false, "east", "west").
+            AddValueConstraint("zone", false, "east").
+            AddValueConstraint("zone", true, "west")
+
+        _, err := r.CreateJob(job)
+        require.NoError(t, err)
+
+        success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
+        assert.True(t, success)
+        assert.NoError(t, err)
+
+        _, err = r.KillJob(job.JobKey())
+        assert.NoError(t, err)
+    })
+
+    t.Run("Overwrite_constraints", func(t *testing.T) {
+        job.Name("thermos_overwrite_constraints").
+            AddLimitConstraint("zone", 1).
+            AddValueConstraint("zone", true, "west", "east").
+            AddLimitConstraint("zone", 2)
+
+        _, err := r.CreateJob(job)
+        require.NoError(t, err)
+
+        success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
+        assert.True(t, success)
+        assert.NoError(t, err)
+
+        _, err = r.KillJob(job.JobKey())
+        assert.NoError(t, err)
+    })
 }
 
 // Test configuring an executor that doesn't exist for CreateJob API
@@ -505,7 +539,7 @@ func TestRealisClient_CreateService(t *testing.T) {
     timeoutClient, err := realis.NewRealisClient(
         realis.SchedulerUrl(auroraURL),
         realis.BasicAuth("aurora", "secret"),
-        realis.TimeoutMS(10),
+        realis.TimeoutMS(5),
     )
     require.NoError(t, err)
     defer timeoutClient.Close()
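
For reference, here is a minimal sketch (not part of the patch) of how the constraint deduplication in job.go above is expected to behave, using only the builder methods that appear in the diff. The import path shown (`github.com/aurora-scheduler/gorealis`) and the environment/role/name values are assumptions for illustration; substitute the module path of the checkout being patched.

```go
package main

import (
	"fmt"

	// Assumed module path; adjust to the gorealis fork being patched.
	realis "github.com/aurora-scheduler/gorealis"
)

func main() {
	// Repeated calls with the same constraint name now update the single stored
	// constraint instead of appending duplicates, and a later Value constraint
	// clobbers an earlier Limit constraint on the same name (and vice versa).
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("constraint_example").
		AddValueConstraint("zone", false, "east", "west").
		AddLimitConstraint("zone", 1).
		AddValueConstraint("zone", true, "west")

	constraints := job.JobConfig().TaskConfig.Constraints

	// Expected: exactly one "zone" entry, holding the last setting
	// (a negated Value constraint on "west"); its Limit field is nil.
	fmt.Println("constraint count:", len(constraints))
	for _, c := range constraints {
		fmt.Printf("%s -> value=%+v limit=%+v\n", c.Name, c.Constraint.Value, c.Constraint.Limit)
	}
}
```

This mirrors what the new Overwrite_constraints subtest in realis_e2e_test.go exercises end to end against the `zone` attributes added to docker-compose.yml.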