Renamed Task to AuroraTask to avoid confusion with Mesos tasks. Added constants to access certain resources to avoid confusion and to ensure compile time safety.

This commit is contained in:
Renan DelValle 2018-12-11 16:51:10 -08:00
parent e00e0a0492
commit 98b4061513
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
2 changed files with 161 additions and 49 deletions

8
job.go
View file

@ -21,7 +21,7 @@ import (
// Structure to collect all information pertaining to an Aurora job. // Structure to collect all information pertaining to an Aurora job.
type AuroraJob struct { type AuroraJob struct {
jobConfig *aurora.JobConfiguration jobConfig *aurora.JobConfiguration
task *Task task *AuroraTask
} }
// Create a AuroraJob object with everything initialized. // Create a AuroraJob object with everything initialized.
@ -29,7 +29,7 @@ func NewJob() *AuroraJob {
jobKey := &aurora.JobKey{} jobKey := &aurora.JobKey{}
// Task clientConfig // AuroraTask clientConfig
task := NewTask() task := NewTask()
task.task.Job = jobKey task.task.Job = jobKey
@ -45,7 +45,7 @@ func NewJob() *AuroraJob {
} }
} }
// Set AuroraJob Key environment. Explicit changes to Task's job key are not needed // Set AuroraJob Key environment. Explicit changes to AuroraTask's job key are not needed
// because they share a pointer to the same JobKey. // because they share a pointer to the same JobKey.
func (j *AuroraJob) Environment(env string) *AuroraJob { func (j *AuroraJob) Environment(env string) *AuroraJob {
j.jobConfig.Key.Environment = env j.jobConfig.Key.Environment = env
@ -101,7 +101,7 @@ func (j *AuroraJob) JobConfig() *aurora.JobConfiguration {
} }
/* /*
Task specific API, see task.go for further documentation. AuroraTask specific API, see task.go for further documentation.
These functions are provided for the convenience of chaining API calls. These functions are provided for the convenience of chaining API calls.
*/ */

202
task.go
View file

@ -17,16 +17,30 @@ package realis
import ( import (
"strconv" "strconv"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/gen-go/apache/aurora"
) )
type Task struct { type ResourceType int
const (
CPU ResourceType = iota
RAM
DISK
)
const (
dedicated = "dedicated"
portPrefix = "org.apache.aurora.port."
)
type AuroraTask struct {
task *aurora.TaskConfig task *aurora.TaskConfig
resources map[string]*aurora.Resource resources map[ResourceType]*aurora.Resource
portCount int portCount int
} }
func NewTask() *Task { func NewTask() *AuroraTask {
numCpus := &aurora.Resource{} numCpus := &aurora.Resource{}
ramMb := &aurora.Resource{} ramMb := &aurora.Resource{}
diskMb := &aurora.Resource{} diskMb := &aurora.Resource{}
@ -35,12 +49,12 @@ func NewTask() *Task {
ramMb.RamMb = new(int64) ramMb.RamMb = new(int64)
diskMb.DiskMb = new(int64) diskMb.DiskMb = new(int64)
resources := make(map[string]*aurora.Resource) resources := make(map[ResourceType]*aurora.Resource)
resources["cpu"] = numCpus resources[CPU] = numCpus
resources["ram"] = ramMb resources[RAM] = ramMb
resources["disk"] = diskMb resources[DISK] = diskMb
return &Task{task: &aurora.TaskConfig{ return &AuroraTask{task: &aurora.TaskConfig{
Job: &aurora.JobKey{}, Job: &aurora.JobKey{},
MesosFetcherUris: make([]*aurora.MesosFetcherURI, 0), MesosFetcherUris: make([]*aurora.MesosFetcherURI, 0),
Metadata: make([]*aurora.Metadata, 0), Metadata: make([]*aurora.Metadata, 0),
@ -53,27 +67,26 @@ func NewTask() *Task {
portCount: 0} portCount: 0}
} }
// Set Task Key environment. // Set AuroraTask Key environment.
func (t *Task) Environment(env string) *Task { func (t *AuroraTask) Environment(env string) *AuroraTask {
t.task.Job.Environment = env t.task.Job.Environment = env
return t return t
} }
// Set Task Key Role. // Set AuroraTask Key Role.
func (t *Task) Role(role string) *Task { func (t *AuroraTask) Role(role string) *AuroraTask {
t.task.Job.Role = role t.task.Job.Role = role
return t return t
} }
// Set Task Key Name. // Set AuroraTask Key Name.
func (t *Task) Name(name string) *Task { func (t *AuroraTask) Name(name string) *AuroraTask {
t.task.Job.Name = name t.task.Job.Name = name
return t return t
} }
// Set name of the executor that will the task will be configured to. // Set name of the executor that will the task will be configured to.
func (t *Task) ExecutorName(name string) *Task { func (t *AuroraTask) ExecutorName(name string) *AuroraTask {
if t.task.ExecutorConfig == nil { if t.task.ExecutorConfig == nil {
t.task.ExecutorConfig = aurora.NewExecutorConfig() t.task.ExecutorConfig = aurora.NewExecutorConfig()
} }
@ -83,8 +96,7 @@ func (t *Task) ExecutorName(name string) *Task {
} }
// Will be included as part of entire task inside the scheduler that will be serialized. // Will be included as part of entire task inside the scheduler that will be serialized.
func (t *Task) ExecutorData(data string) *Task { func (t *AuroraTask) ExecutorData(data string) *AuroraTask {
if t.task.ExecutorConfig == nil { if t.task.ExecutorConfig == nil {
t.task.ExecutorConfig = aurora.NewExecutorConfig() t.task.ExecutorConfig = aurora.NewExecutorConfig()
} }
@ -93,45 +105,41 @@ func (t *Task) ExecutorData(data string) *Task {
return t return t
} }
func (t *Task) CPU(cpus float64) *Task { func (t *AuroraTask) CPU(cpus float64) *AuroraTask {
*t.resources["cpu"].NumCpus = cpus *t.resources[CPU].NumCpus = cpus
return t return t
} }
func (t *Task) RAM(ram int64) *Task { func (t *AuroraTask) RAM(ram int64) *AuroraTask {
*t.resources["ram"].RamMb = ram *t.resources[RAM].RamMb = ram
return t return t
} }
func (t *Task) Disk(disk int64) *Task { func (t *AuroraTask) Disk(disk int64) *AuroraTask {
*t.resources["disk"].DiskMb = disk *t.resources[DISK].DiskMb = disk
return t return t
} }
func (t *Task) Tier(tier string) *Task { func (t *AuroraTask) Tier(tier string) *AuroraTask {
*t.task.Tier = tier t.task.Tier = &tier
return t return t
} }
// How many failures to tolerate before giving up. // How many failures to tolerate before giving up.
func (t *Task) MaxFailure(maxFail int32) *Task { func (t *AuroraTask) MaxFailure(maxFail int32) *AuroraTask {
t.task.MaxTaskFailures = maxFail t.task.MaxTaskFailures = maxFail
return t return t
} }
// Restart the job's tasks if they fail // Restart the job's tasks if they fail
func (t *Task) IsService(isService bool) *Task { func (t *AuroraTask) IsService(isService bool) *AuroraTask {
t.task.IsService = isService t.task.IsService = isService
return t return t
} }
// Add a list of URIs with the same extract and cache configuration. Scheduler must have // Add a list of URIs with the same extract and cache configuration. Scheduler must have
// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
func (t *Task) AddURIs(extract bool, cache bool, values ...string) *Task { func (t *AuroraTask) AddURIs(extract bool, cache bool, values ...string) *AuroraTask {
for _, value := range values { for _, value := range values {
t.task.MesosFetcherUris = append( t.task.MesosFetcherUris = append(
t.task.MesosFetcherUris, t.task.MesosFetcherUris,
@ -142,14 +150,14 @@ func (t *Task) AddURIs(extract bool, cache bool, values ...string) *Task {
// Adds a Mesos label to the job. Note that Aurora will add the // Adds a Mesos label to the job. Note that Aurora will add the
// prefix "org.apache.aurora.metadata." to the beginning of each key. // prefix "org.apache.aurora.metadata." to the beginning of each key.
func (t *Task) AddLabel(key string, value string) *Task { func (t *AuroraTask) AddLabel(key string, value string) *AuroraTask {
t.task.Metadata = append(t.task.Metadata, &aurora.Metadata{Key: key, Value: value}) t.task.Metadata = append(t.task.Metadata, &aurora.Metadata{Key: key, Value: value})
return t return t
} }
// Add a named port to the job configuration These are random ports as it's // Add a named port to the job configuration These are random ports as it's
// not currently possible to request specific ports using Aurora. // not currently possible to request specific ports using Aurora.
func (t *Task) AddNamedPorts(names ...string) *Task { func (t *AuroraTask) AddNamedPorts(names ...string) *AuroraTask {
t.portCount += len(names) t.portCount += len(names)
for _, name := range names { for _, name := range names {
t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &name}) t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &name})
@ -162,11 +170,11 @@ func (t *Task) AddNamedPorts(names ...string) *Task {
// will be org.apache.aurora.port.X, where X is the current port count for the job configuration // will be org.apache.aurora.port.X, where X is the current port count for the job configuration
// starting at 0. These are random ports as it's not currently possible to request // starting at 0. These are random ports as it's not currently possible to request
// specific ports using Aurora. // specific ports using Aurora.
func (t *Task) AddPorts(num int) *Task { func (t *AuroraTask) AddPorts(num int) *AuroraTask {
start := t.portCount start := t.portCount
t.portCount += num t.portCount += num
for i := start; i < t.portCount; i++ { for i := start; i < t.portCount; i++ {
portName := "org.apache.aurora.port." + strconv.Itoa(i) portName := portPrefix + strconv.Itoa(i)
t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &portName}) t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &portName})
} }
@ -178,7 +186,7 @@ func (t *Task) AddPorts(num int) *Task {
// name - Mesos slave attribute that the constraint is matched against. // name - Mesos slave attribute that the constraint is matched against.
// If negated = true , treat this as a 'not' - to avoid specific values. // If negated = true , treat this as a 'not' - to avoid specific values.
// Values - list of values we look for in attribute name // Values - list of values we look for in attribute name
func (t *Task) AddValueConstraint(name string, negated bool, values ...string) *Task { func (t *AuroraTask) AddValueConstraint(name string, negated bool, values ...string) *AuroraTask {
t.task.Constraints = append(t.task.Constraints, t.task.Constraints = append(t.task.Constraints,
&aurora.Constraint{ &aurora.Constraint{
Name: name, Name: name,
@ -197,7 +205,7 @@ func (t *Task) AddValueConstraint(name string, negated bool, values ...string) *
// From Aurora Docs: // From Aurora Docs:
// A constraint that specifies the maximum number of active tasks on a host with // A constraint that specifies the maximum number of active tasks on a host with
// a matching attribute that may be scheduled simultaneously. // a matching attribute that may be scheduled simultaneously.
func (t *Task) AddLimitConstraint(name string, limit int32) *Task { func (t *AuroraTask) AddLimitConstraint(name string, limit int32) *AuroraTask {
t.task.Constraints = append(t.task.Constraints, t.task.Constraints = append(t.task.Constraints,
&aurora.Constraint{ &aurora.Constraint{
Name: name, Name: name,
@ -217,18 +225,122 @@ func (t *Task) AddLimitConstraint(name string, limit int32) *Task {
// the role field in the job configuration, and will reject the job creation otherwise. // the role field in the job configuration, and will reject the job creation otherwise.
// A wildcard (*) may be used for the role portion of the dedicated attribute, which // A wildcard (*) may be used for the role portion of the dedicated attribute, which
// will allow any owner to elect for a job to run on the host(s) // will allow any owner to elect for a job to run on the host(s)
func (t *Task) AddDedicatedConstraint(role, name string) *Task { func (t *AuroraTask) AddDedicatedConstraint(role, name string) *AuroraTask {
t.AddValueConstraint("dedicated", false, role+"/"+name) t.AddValueConstraint(dedicated, false, role+"/"+name)
return t return t
} }
// Set a container to run for the job configuration to run. // Set a container to run for the job configuration to run.
func (t *Task) Container(container Container) *Task { func (t *AuroraTask) Container(container Container) *AuroraTask {
t.task.Container = container.Build() t.task.Container = container.Build()
return t return t
} }
func (t *Task) TaskConfig() *aurora.TaskConfig {
func (t *AuroraTask) TaskConfig() *aurora.TaskConfig {
return t.task return t.task
} }
func (t *AuroraTask) Clone() *AuroraTask {
newTask := NewTask()
// Pass values using receivers as much as possible
newTask.
CPU(*t.resources[CPU].NumCpus).
RAM(*t.resources[RAM].RamMb).
Disk(*t.resources[DISK].DiskMb).
Environment(t.task.Job.Environment).
Role(t.task.Job.Role).
Name(t.task.Job.Name).
MaxFailure(t.task.MaxTaskFailures).
IsService(t.task.IsService)
if t.task.Tier != nil {
newTask.Tier(*t.task.Tier)
}
if t.task.ExecutorConfig != nil {
newTask.
ExecutorName(t.task.ExecutorConfig.Name).
ExecutorData(t.task.ExecutorConfig.Data)
}
// Make a deep copy of the task's container
if t.task.Container != nil {
if t.task.Container.Mesos != nil {
mesosContainer := NewMesosContainer()
if t.task.Container.Mesos.Image != nil {
if t.task.Container.Mesos.Image.Appc != nil {
mesosContainer.AppcImage(t.task.Container.Mesos.Image.Appc.Name, t.task.Container.Mesos.Image.Appc.ImageId)
} else if t.task.Container.Mesos.Image.Docker != nil {
mesosContainer.DockerImage(t.task.Container.Mesos.Image.Docker.Name, t.task.Container.Mesos.Image.Docker.Tag)
}
}
for _, vol := range t.task.Container.Mesos.Volumes {
mesosContainer.AddVolume(vol.ContainerPath, vol.HostPath, vol.Mode)
}
newTask.Container(mesosContainer)
} else if t.task.Container.Docker != nil {
dockerContainer := NewDockerContainer()
dockerContainer.Image(t.task.Container.Docker.Image)
for _, param := range t.task.Container.Docker.Parameters {
dockerContainer.AddParameter(param.Name, param.Value)
}
newTask.Container(dockerContainer)
}
}
// Copy all ports
for _, resource := range t.task.Resources {
// Copy only ports, skip CPU, RAM, and DISK
if resource != nil && resource.NamedPort != nil {
newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)})
newTask.portCount++
}
}
// Copy constraints
for _, constraint := range t.task.Constraints {
if constraint != nil && constraint.Constraint != nil {
newConstraint := aurora.Constraint{Name: constraint.Name}
taskConstraint := constraint.Constraint
if taskConstraint.Limit != nil {
newConstraint.Constraint = &aurora.TaskConstraint{Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit}}
newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint)
} else if taskConstraint.Value != nil {
values := make([]string, 0)
for _, val := range taskConstraint.Value.Values {
values = append(values, val)
}
newConstraint.Constraint = &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{Negated: taskConstraint.Value.Negated, Values: values}}
newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint)
}
}
}
// Copy labels
for _, label := range t.task.Metadata {
newTask.task.Metadata = append(newTask.task.Metadata, &aurora.Metadata{Key: label.Key, Value: label.Value})
}
// Copy Mesos fetcher URIs
for _, uri := range t.task.MesosFetcherUris {
newTask.task.MesosFetcherUris = append(
newTask.task.MesosFetcherUris,
&aurora.MesosFetcherURI{Value: uri.Value, Extract: thrift.BoolPtr(*uri.Extract), Cache: thrift.BoolPtr(*uri.Cache)})
}
return newTask
}