Changing how constraints are handled internally (#115)

* Updating the Changelog to reflect what's changing in 1.22.1.

* Bug fix: setting the same constraint multiple times no longer creates duplicate constraint entries; the most recent call now overwrites the previous value (sketched below).

* A constraints map has been added to AuroraJob to track the constraints attached to an Aurora job.

* Lowering the HTTP client timeout to avoid flakiness in the bad-payload timeout test.

* Adding zone attributes to the Mesos agents in order to exercise limit and value constraints in tests.



* Make two instances schedulable per zone in order to avoid flaky behavior.
commit c6a2a23ddb (parent 9da3b96b1f)
Renan I. Del Valle, 2020-01-15 08:21:12 -08:00, committed by GitHub
4 changed files with 90 additions and 37 deletions
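
Before the per-file diffs, a minimal sketch of the new semantics. This assumes the v1 gorealis import path github.com/paypal/gorealis and the Job builder shown in job.go below; the environment, role, and job name are placeholders, not part of this commit. Repeating a constraint name now updates the existing entry instead of appending a duplicate:

package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
)

func main() {
	// Both calls target the attribute "zone"; after this change only a single
	// constraint named "zone" is attached to the task, holding the values
	// from the last call.
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("constraint_demo").
		AddValueConstraint("zone", false, "east", "west").
		AddValueConstraint("zone", false, "east") // overwrites the previous call

	fmt.Println(len(job.JobConfig().TaskConfig.Constraints)) // prints 1, not 2
}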

CHANGELOG.md

@@ -1,4 +1,8 @@
-1.22.0 (unreleased)
+1.22.1 (unreleased)
+
+* Adding safeguards against setting multiple constraints with the same name for a single task.
+
+1.22.0
 
 * CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered
 by the HTTP client. Instead they now return an error that conforms to the Timedout interface.

docker-compose.yml

@@ -38,6 +38,7 @@ services:
     ports:
       - "5051:5051"
     environment:
+      MESOS_ATTRIBUTES: 'zone:west'
       MESOS_MASTER: zk://192.168.33.2:2181/mesos
       MESOS_CONTAINERIZERS: docker,mesos
       MESOS_PORT: 5051
@@ -62,6 +63,7 @@ services:
     ports:
       - "5061:5061"
     environment:
+      MESOS_ATTRIBUTES: 'zone:east'
       MESOS_MASTER: zk://192.168.33.2:2181/mesos
       MESOS_CONTAINERIZERS: docker,mesos
       MESOS_HOSTNAME: localhost
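
These zone attributes are what the new constraint tests match against. As a hedged usage sketch (same assumed gorealis API and placeholder job key as above; the helper name is hypothetical), a job can be pinned to the west agent with a value constraint on the attribute, and spread across machines with a limit constraint on Aurora's implicit "host" attribute:

func zonePinnedJob() realis.Job {
	// Require placement on agents advertising zone:west (negated = false
	// means "must match") and allow at most one task per host.
	return realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("zone_pinned").
		InstanceCount(2).
		AddValueConstraint("zone", false, "west").
		AddLimitConstraint("host", 1)
}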

job.go

@@ -73,12 +73,15 @@ const (
 	GPU
 )
 
+const portNamePrefix = "org.apache.aurora.port."
+
 // AuroraJob is a structure to collect all information pertaining to an Aurora job.
 type AuroraJob struct {
 	jobConfig   *aurora.JobConfiguration
 	resources   map[resourceType]*aurora.Resource
 	metadata    map[string]*aurora.Metadata
+	constraints map[string]*aurora.Constraint
 	portCount   int
 }
 
 // NewJob is used to create a Job object with everything initialized.
@@ -109,10 +112,11 @@ func NewJob() Job {
 	diskMb.DiskMb = new(int64)
 
 	return &AuroraJob{
 		jobConfig:   jobConfig,
 		resources:   resources,
 		metadata:    make(map[string]*aurora.Metadata),
+		constraints: make(map[string]*aurora.Constraint),
 		portCount:   0,
 	}
 }
@@ -258,12 +262,12 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
 // AddLabel adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) Job {
-	if _, ok := j.metadata[key]; ok {
-		j.metadata[key].Value = value
-	} else {
-		j.metadata[key] = &aurora.Metadata{Key: key, Value: value}
+	if _, ok := j.metadata[key]; !ok {
+		j.metadata[key] = &aurora.Metadata{Key: key}
 		j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, j.metadata[key])
 	}
+
+	j.metadata[key].Value = value
 	return j
 }
@@ -288,7 +292,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	start := j.portCount
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
-		portName := "org.apache.aurora.port." + strconv.Itoa(i)
+		portName := portNamePrefix + strconv.Itoa(i)
 		j.jobConfig.TaskConfig.Resources = append(
 			j.jobConfig.TaskConfig.Resources,
 			&aurora.Resource{NamedPort: &portName})
@@ -297,47 +301,56 @@ func (j *AuroraJob) AddPorts(num int) Job {
 	return j
 }
 
-// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
-// tasks can be run on.
+// AddValueConstraint allows the user to add a value constraint to the job to limit which agents the job's
+// tasks can be run on. If the name matches a constraint that was previously set, the previous value will be
+// overwritten. In case the previous constraint attached to the name was of type Limit, the constraint will be
+// clobbered by this new Value constraint.
 // From Aurora Docs:
 // Add a Value constraint
 // name - Mesos slave attribute that the constraint is matched against.
 // If negated = true, treat this as a 'not' - to avoid specific values.
 // Values - list of values we look for in attribute name
 func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
-	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-		&aurora.Constraint{
-			Name: name,
-			Constraint: &aurora.TaskConstraint{
-				Value: &aurora.ValueConstraint{
-					Negated: negated,
-					Values:  values,
-				},
-				Limit: nil,
-			},
-		})
+	if _, ok := j.constraints[name]; !ok {
+		j.constraints[name] = &aurora.Constraint{Name: name}
+		j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+	}
+
+	j.constraints[name].Constraint = &aurora.TaskConstraint{
+		Value: &aurora.ValueConstraint{
+			Negated: negated,
+			Values:  values,
+		},
+		Limit: nil,
+	}
 
 	return j
 }
 
 // AddLimitConstraint allows the user to limit how many tasks from the same Job are run on a single host.
+// If the name matches a constraint that was previously set, the previous value will be
+// overwritten. In case the previous constraint attached to the name was of type Value, the constraint will be
+// clobbered by this new Limit constraint.
 // From Aurora Docs:
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
 func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
-	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
-		&aurora.Constraint{
-			Name: name,
-			Constraint: &aurora.TaskConstraint{
-				Value: nil,
-				Limit: &aurora.LimitConstraint{Limit: limit},
-			},
-		})
+	if _, ok := j.constraints[name]; !ok {
+		j.constraints[name] = &aurora.Constraint{Name: name}
+		j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, j.constraints[name])
+	}
+
+	j.constraints[name].Constraint = &aurora.TaskConstraint{
+		Value: nil,
+		Limit: &aurora.LimitConstraint{Limit: limit},
+	}
 
 	return j
 }
 
-// AddDedicatedConstraint allows the user to add a dedicated constraint to a Job configuration.
+// AddDedicatedConstraint is a convenience function that allows the user to
+// add a dedicated constraint to a Job configuration.
+// In case a previous dedicated constraint was set, it will be clobbered by this new value.
 func (j *AuroraJob) AddDedicatedConstraint(role, name string) Job {
 	j.AddValueConstraint("dedicated", false, role+"/"+name)

realis_e2e_test.go

@@ -306,6 +306,40 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		_, err = r.KillJob(job.JobKey())
 		assert.NoError(t, err)
 	})
+
+	t.Run("Duplicate_constraints", func(t *testing.T) {
+		job.Name("thermos_duplicate_constraints").
+			AddValueConstraint("zone", false, "east", "west").
+			AddValueConstraint("zone", false, "east").
+			AddValueConstraint("zone", true, "west")
+
+		_, err := r.CreateJob(job)
+		require.NoError(t, err)
+
+		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
+		assert.True(t, success)
+		assert.NoError(t, err)
+
+		_, err = r.KillJob(job.JobKey())
+		assert.NoError(t, err)
+	})
+
+	t.Run("Overwrite_constraints", func(t *testing.T) {
+		job.Name("thermos_overwrite_constraints").
+			AddLimitConstraint("zone", 1).
+			AddValueConstraint("zone", true, "west", "east").
+			AddLimitConstraint("zone", 2)
+
+		_, err := r.CreateJob(job)
+		require.NoError(t, err)
+
+		success, err := monitor.Instances(job.JobKey(), 2, 1, 50)
+		assert.True(t, success)
+		assert.NoError(t, err)
+
+		_, err = r.KillJob(job.JobKey())
+		assert.NoError(t, err)
+	})
 }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@@ -505,7 +539,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	timeoutClient, err := realis.NewRealisClient(
 		realis.SchedulerUrl(auroraURL),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(10),
+		realis.TimeoutMS(5),
 	)
 	require.NoError(t, err)
 	defer timeoutClient.Close()