From c553f67d4e7d7ee9761d4aef70811eae30d09e38 Mon Sep 17 00:00:00 2001 From: Robert Allen Date: Thu, 20 Dec 2018 16:38:06 -0600 Subject: [PATCH] Adding support for PartitionPolicy. --- job.go | 8 ++++++++ jobUpdate.go | 8 ++++++++ realis_e2e_test.go | 34 ++++++++++++++++++++++++++++++++++ task.go | 30 +++++++++++++++++++++++++++--- 4 files changed, 77 insertions(+), 3 deletions(-) diff --git a/job.go b/job.go index b7a3c52..800f98f 100644 --- a/job.go +++ b/job.go @@ -202,3 +202,11 @@ func (j *AuroraJob) ThermosExecutor(thermos ThermosExecutor) *AuroraJob { func (j *AuroraJob) BuildThermosPayload() error { return j.task.BuildThermosPayload() } + +func (j *AuroraJob) PartitionPolicy(reschedule bool, delay int64) *AuroraJob { + j.task.PartitionPolicy(aurora.PartitionPolicy{ + Reschedule: reschedule, + DelaySecs: &delay, + }) + return j +} diff --git a/jobUpdate.go b/jobUpdate.go index c93a788..8176da1 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -239,3 +239,11 @@ func (j *JobUpdate) ThermosExecutor(thermos ThermosExecutor) *JobUpdate { func (j *JobUpdate) BuildThermosPayload() error { return j.task.BuildThermosPayload() } + +func (j *JobUpdate) PartitionPolicy(reschedule bool, delay int64) *JobUpdate { + j.task.PartitionPolicy(aurora.PartitionPolicy{ + Reschedule: reschedule, + DelaySecs: &delay, + }) + return j +} diff --git a/realis_e2e_test.go b/realis_e2e_test.go index f58d426..79fe25c 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -685,3 +685,37 @@ func TestRealisClient_ForceExplicitTaskReconciliation(t *testing.T) { err = r.ForceExplicitTaskReconciliation(&batchSize) assert.NoError(t, err) } + +func TestRealisClient_PartitionPolicy(t *testing.T) { + + role := "vagrant" + var partitionDelay int64 = 30 + job := realis.NewJobUpdate(). + Environment("prod"). + Role(role). + Name("create_thermos_job_partition_policy_test"). + ExecutorName(aurora.AURORA_EXECUTOR_NAME). + ThermosExecutor(thermosExec). + CPU(.5). + RAM(64). + Disk(100). + IsService(true). + InstanceCount(2). + BatchSize(2). + PartitionPolicy(true, partitionDelay) + + result, err := r.CreateService(job) + assert.NoError(t, err) + + var ok bool + var mErr error + + if ok, mErr = r.JobUpdateMonitor(*result.GetKey(), 5*time.Second, 4*time.Minute); !ok || mErr != nil { + // Update may already be in a terminal state so don't check for error + err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") + err = r.KillJob(job.JobKey()) + + assert.NoError(t, err) + } + +} diff --git a/task.go b/task.go index 139fa31..d4c2b7b 100644 --- a/task.go +++ b/task.go @@ -90,6 +90,14 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { ExecutorData(config.ExecutorConfig.Data) } + if config.PartitionPolicy != nil { + newTask.PartitionPolicy( + aurora.PartitionPolicy{ + Reschedule: config.PartitionPolicy.Reschedule, + DelaySecs: thrift.Int64Ptr(*config.PartitionPolicy.DelaySecs), + }) + } + // Make a deep copy of the task's container if config.Container != nil { if config.Container.Mesos != nil { @@ -125,7 +133,10 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { // Copy only ports. Set CPU, RAM, DISK, and GPU if resource != nil { if resource.NamedPort != nil { - newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)}) + newTask.task.Resources = append( + newTask.task.Resources, + &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)}, + ) newTask.portCount++ } @@ -155,7 +166,9 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { taskConstraint := constraint.Constraint if taskConstraint.Limit != nil { - newConstraint.Constraint = &aurora.TaskConstraint{Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit}} + newConstraint.Constraint = &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit}, + } newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint) } else if taskConstraint.Value != nil { @@ -182,7 +195,11 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { for _, uri := range config.MesosFetcherUris { newTask.task.MesosFetcherUris = append( newTask.task.MesosFetcherUris, - &aurora.MesosFetcherURI{Value: uri.Value, Extract: thrift.BoolPtr(*uri.Extract), Cache: thrift.BoolPtr(*uri.Cache)}) + &aurora.MesosFetcherURI{ + Value: uri.Value, + Extract: thrift.BoolPtr(*uri.Extract), + Cache: thrift.BoolPtr(*uri.Cache), + }) } return newTask @@ -423,3 +440,10 @@ func (t *AuroraTask) BuildThermosPayload() error { } return nil } + +// Set a partition policy for the job configuration to implement. +func (t *AuroraTask) PartitionPolicy(policy aurora.PartitionPolicy) *AuroraTask { + t.task.PartitionPolicy = &policy + + return t +}