Adding support for PartitionPolicy.

Robert Allen 2018-12-20 16:38:06 -06:00 committed by Renan DelValle
parent 461b23400c
commit c553f67d4e
4 changed files with 77 additions and 3 deletions
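
The builders added below wrap Aurora's thrift-generated PartitionPolicy type. Inferred only from the fields this diff touches, its shape is roughly the following sketch (not the generated code itself, which ships with the aurora bindings):

// Sketch only: field names and types are taken from the usages in this commit.
package aurora

// PartitionPolicy controls what the scheduler does with an instance that has
// been partitioned away from it.
type PartitionPolicy struct {
    Reschedule bool   // whether to reschedule the partitioned instance
    DelaySecs  *int64 // seconds to wait before acting; optional, hence the pointer
}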

job.go (8 additions)

@@ -202,3 +202,11 @@ func (j *AuroraJob) ThermosExecutor(thermos ThermosExecutor) *AuroraJob {
func (j *AuroraJob) BuildThermosPayload() error {
    return j.task.BuildThermosPayload()
}

func (j *AuroraJob) PartitionPolicy(reschedule bool, delay int64) *AuroraJob {
    j.task.PartitionPolicy(aurora.PartitionPolicy{
        Reschedule: reschedule,
        DelaySecs:  &delay,
    })
    return j
}
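
As a usage illustration, a plain (non-update) job can now opt in to partition-aware rescheduling through the same fluent chain. This is only a sketch: NewJob, the job-level setters, and the import path are assumed from the rest of the client's builder API; only PartitionPolicy itself comes from this diff.

package main

import (
    realis "github.com/paypal/gorealis" // import path assumed
)

func main() {
    // Every call below except PartitionPolicy is assumed to exist on the
    // AuroraJob builder, mirroring the JobUpdate chain used in the test file.
    job := realis.NewJob().
        Environment("prod").
        Role("vagrant").
        Name("partition_policy_example").
        CPU(0.25).
        RAM(64).
        Disk(100).
        IsService(true).
        InstanceCount(1).
        PartitionPolicy(true, 60) // reschedule partitioned instances after a 60s delay

    // Hand the job to a realis client from here (client construction omitted);
    // the test added in this commit shows a full end-to-end flow via
    // CreateService and JobUpdateMonitor.
    _ = job
}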

JobUpdate builder (8 additions)

@@ -239,3 +239,11 @@ func (j *JobUpdate) ThermosExecutor(thermos ThermosExecutor) *JobUpdate {
func (j *JobUpdate) BuildThermosPayload() error {
    return j.task.BuildThermosPayload()
}

func (j *JobUpdate) PartitionPolicy(reschedule bool, delay int64) *JobUpdate {
    j.task.PartitionPolicy(aurora.PartitionPolicy{
        Reschedule: reschedule,
        DelaySecs:  &delay,
    })
    return j
}

end-to-end tests (34 additions)

@@ -685,3 +685,37 @@ func TestRealisClient_ForceExplicitTaskReconciliation(t *testing.T) {
    err = r.ForceExplicitTaskReconciliation(&batchSize)
    assert.NoError(t, err)
}

func TestRealisClient_PartitionPolicy(t *testing.T) {
    role := "vagrant"
    var partitionDelay int64 = 30

    job := realis.NewJobUpdate().
        Environment("prod").
        Role(role).
        Name("create_thermos_job_partition_policy_test").
        ExecutorName(aurora.AURORA_EXECUTOR_NAME).
        ThermosExecutor(thermosExec).
        CPU(.5).
        RAM(64).
        Disk(100).
        IsService(true).
        InstanceCount(2).
        BatchSize(2).
        PartitionPolicy(true, partitionDelay)

    result, err := r.CreateService(job)
    assert.NoError(t, err)

    var ok bool
    var mErr error
    if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil {
        // Update may already be in a terminal state so don't check for error
        err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
        err = r.KillJob(job.JobKey())
        assert.NoError(t, err)
    }
}

task.go (27 additions, 3 deletions)

@@ -90,6 +90,14 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
            ExecutorData(config.ExecutorConfig.Data)
    }

    if config.PartitionPolicy != nil {
        newTask.PartitionPolicy(
            aurora.PartitionPolicy{
                Reschedule: config.PartitionPolicy.Reschedule,
                DelaySecs:  thrift.Int64Ptr(*config.PartitionPolicy.DelaySecs),
            })
    }

    // Make a deep copy of the task's container
    if config.Container != nil {
        if config.Container.Mesos != nil {
@@ -125,7 +133,10 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
        // Copy only ports. Set CPU, RAM, DISK, and GPU
        if resource != nil {
            if resource.NamedPort != nil {
-               newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)})
+               newTask.task.Resources = append(
+                   newTask.task.Resources,
+                   &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)},
+               )
                newTask.portCount++
            }
@@ -155,7 +166,9 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
        taskConstraint := constraint.Constraint
        if taskConstraint.Limit != nil {
-           newConstraint.Constraint = &aurora.TaskConstraint{Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit}}
+           newConstraint.Constraint = &aurora.TaskConstraint{
+               Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit},
+           }
            newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint)
        } else if taskConstraint.Value != nil {
@@ -182,7 +195,11 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
    for _, uri := range config.MesosFetcherUris {
        newTask.task.MesosFetcherUris = append(
            newTask.task.MesosFetcherUris,
-           &aurora.MesosFetcherURI{Value: uri.Value, Extract: thrift.BoolPtr(*uri.Extract), Cache: thrift.BoolPtr(*uri.Cache)})
+           &aurora.MesosFetcherURI{
+               Value:   uri.Value,
+               Extract: thrift.BoolPtr(*uri.Extract),
+               Cache:   thrift.BoolPtr(*uri.Cache),
+           })
    }

    return newTask
@@ -423,3 +440,10 @@ func (t *AuroraTask) BuildThermosPayload() error {
    }
    return nil
}

// PartitionPolicy sets the partition policy the task configuration should use.
func (t *AuroraTask) PartitionPolicy(policy aurora.PartitionPolicy) *AuroraTask {
    t.task.PartitionPolicy = &policy
    return t
}
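
One detail worth keeping in mind when using these setters: DelaySecs is a pointer field, so the fluent builders take a plain int64 and store its address, while the deep-copy path in TaskFromThrift clones it with thrift.Int64Ptr (which assumes the scheduler always populates DelaySecs on the configs it returns). A minimal sketch of the two equivalent ways to populate the field, using only types and helpers that appear in this diff (the generated-bindings import path is assumed):

package main

import (
    "fmt"

    "github.com/apache/thrift/lib/go/thrift"          // provides Int64Ptr, as used in task.go
    "github.com/paypal/gorealis/gen-go/apache/aurora" // path to the generated bindings assumed
)

func main() {
    delay := int64(30)

    // Builder-style: take the address of a local, as the new PartitionPolicy setters do.
    direct := aurora.PartitionPolicy{Reschedule: true, DelaySecs: &delay}

    // Copy-style: allocate a fresh pointer, as TaskFromThrift does when deep-copying a config.
    copied := aurora.PartitionPolicy{Reschedule: true, DelaySecs: thrift.Int64Ptr(delay)}

    fmt.Println(*direct.DelaySecs, *copied.DelaySecs) // 30 30
}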