diff --git a/job.go b/job.go index 8b1e93b..1bc54b0 100644 --- a/job.go +++ b/job.go @@ -21,7 +21,7 @@ import ( // Structure to collect all information pertaining to an Aurora job. type AuroraJob struct { jobConfig *aurora.JobConfiguration - task *Task + task *AuroraTask } // Create a AuroraJob object with everything initialized. @@ -29,7 +29,7 @@ func NewJob() *AuroraJob { jobKey := &aurora.JobKey{} - // Task clientConfig + // AuroraTask clientConfig task := NewTask() task.task.Job = jobKey @@ -45,7 +45,7 @@ func NewJob() *AuroraJob { } } -// Set AuroraJob Key environment. Explicit changes to Task's job key are not needed +// Set AuroraJob Key environment. Explicit changes to AuroraTask's job key are not needed // because they share a pointer to the same JobKey. func (j *AuroraJob) Environment(env string) *AuroraJob { j.jobConfig.Key.Environment = env @@ -101,7 +101,7 @@ func (j *AuroraJob) JobConfig() *aurora.JobConfiguration { } /* - Task specific API, see task.go for further documentation. + AuroraTask specific API, see task.go for further documentation. These functions are provided for the convenience of chaining API calls. */ diff --git a/task.go b/task.go index 28efd22..f875cd5 100644 --- a/task.go +++ b/task.go @@ -17,16 +17,30 @@ package realis import ( "strconv" + "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" ) -type Task struct { +type ResourceType int + +const ( + CPU ResourceType = iota + RAM + DISK +) + +const ( + dedicated = "dedicated" + portPrefix = "org.apache.aurora.port." +) + +type AuroraTask struct { task *aurora.TaskConfig - resources map[string]*aurora.Resource + resources map[ResourceType]*aurora.Resource portCount int } -func NewTask() *Task { +func NewTask() *AuroraTask { numCpus := &aurora.Resource{} ramMb := &aurora.Resource{} diskMb := &aurora.Resource{} @@ -35,12 +49,12 @@ func NewTask() *Task { ramMb.RamMb = new(int64) diskMb.DiskMb = new(int64) - resources := make(map[string]*aurora.Resource) - resources["cpu"] = numCpus - resources["ram"] = ramMb - resources["disk"] = diskMb + resources := make(map[ResourceType]*aurora.Resource) + resources[CPU] = numCpus + resources[RAM] = ramMb + resources[DISK] = diskMb - return &Task{task: &aurora.TaskConfig{ + return &AuroraTask{task: &aurora.TaskConfig{ Job: &aurora.JobKey{}, MesosFetcherUris: make([]*aurora.MesosFetcherURI, 0), Metadata: make([]*aurora.Metadata, 0), @@ -53,27 +67,26 @@ func NewTask() *Task { portCount: 0} } -// Set Task Key environment. -func (t *Task) Environment(env string) *Task { +// Set AuroraTask Key environment. +func (t *AuroraTask) Environment(env string) *AuroraTask { t.task.Job.Environment = env return t } -// Set Task Key Role. -func (t *Task) Role(role string) *Task { +// Set AuroraTask Key Role. +func (t *AuroraTask) Role(role string) *AuroraTask { t.task.Job.Role = role return t } -// Set Task Key Name. -func (t *Task) Name(name string) *Task { +// Set AuroraTask Key Name. +func (t *AuroraTask) Name(name string) *AuroraTask { t.task.Job.Name = name return t } // Set name of the executor that will the task will be configured to. -func (t *Task) ExecutorName(name string) *Task { - +func (t *AuroraTask) ExecutorName(name string) *AuroraTask { if t.task.ExecutorConfig == nil { t.task.ExecutorConfig = aurora.NewExecutorConfig() } @@ -83,8 +96,7 @@ func (t *Task) ExecutorName(name string) *Task { } // Will be included as part of entire task inside the scheduler that will be serialized. -func (t *Task) ExecutorData(data string) *Task { - +func (t *AuroraTask) ExecutorData(data string) *AuroraTask { if t.task.ExecutorConfig == nil { t.task.ExecutorConfig = aurora.NewExecutorConfig() } @@ -93,45 +105,41 @@ func (t *Task) ExecutorData(data string) *Task { return t } -func (t *Task) CPU(cpus float64) *Task { - *t.resources["cpu"].NumCpus = cpus - +func (t *AuroraTask) CPU(cpus float64) *AuroraTask { + *t.resources[CPU].NumCpus = cpus return t } -func (t *Task) RAM(ram int64) *Task { - *t.resources["ram"].RamMb = ram - +func (t *AuroraTask) RAM(ram int64) *AuroraTask { + *t.resources[RAM].RamMb = ram return t } -func (t *Task) Disk(disk int64) *Task { - *t.resources["disk"].DiskMb = disk - +func (t *AuroraTask) Disk(disk int64) *AuroraTask { + *t.resources[DISK].DiskMb = disk return t } -func (t *Task) Tier(tier string) *Task { - *t.task.Tier = tier - +func (t *AuroraTask) Tier(tier string) *AuroraTask { + t.task.Tier = &tier return t } // How many failures to tolerate before giving up. -func (t *Task) MaxFailure(maxFail int32) *Task { +func (t *AuroraTask) MaxFailure(maxFail int32) *AuroraTask { t.task.MaxTaskFailures = maxFail return t } // Restart the job's tasks if they fail -func (t *Task) IsService(isService bool) *Task { +func (t *AuroraTask) IsService(isService bool) *AuroraTask { t.task.IsService = isService return t } // Add a list of URIs with the same extract and cache configuration. Scheduler must have // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. -func (t *Task) AddURIs(extract bool, cache bool, values ...string) *Task { +func (t *AuroraTask) AddURIs(extract bool, cache bool, values ...string) *AuroraTask { for _, value := range values { t.task.MesosFetcherUris = append( t.task.MesosFetcherUris, @@ -142,14 +150,14 @@ func (t *Task) AddURIs(extract bool, cache bool, values ...string) *Task { // Adds a Mesos label to the job. Note that Aurora will add the // prefix "org.apache.aurora.metadata." to the beginning of each key. -func (t *Task) AddLabel(key string, value string) *Task { +func (t *AuroraTask) AddLabel(key string, value string) *AuroraTask { t.task.Metadata = append(t.task.Metadata, &aurora.Metadata{Key: key, Value: value}) return t } // Add a named port to the job configuration These are random ports as it's // not currently possible to request specific ports using Aurora. -func (t *Task) AddNamedPorts(names ...string) *Task { +func (t *AuroraTask) AddNamedPorts(names ...string) *AuroraTask { t.portCount += len(names) for _, name := range names { t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &name}) @@ -162,11 +170,11 @@ func (t *Task) AddNamedPorts(names ...string) *Task { // will be org.apache.aurora.port.X, where X is the current port count for the job configuration // starting at 0. These are random ports as it's not currently possible to request // specific ports using Aurora. -func (t *Task) AddPorts(num int) *Task { +func (t *AuroraTask) AddPorts(num int) *AuroraTask { start := t.portCount t.portCount += num for i := start; i < t.portCount; i++ { - portName := "org.apache.aurora.port." + strconv.Itoa(i) + portName := portPrefix + strconv.Itoa(i) t.task.Resources = append(t.task.Resources, &aurora.Resource{NamedPort: &portName}) } @@ -178,7 +186,7 @@ func (t *Task) AddPorts(num int) *Task { // name - Mesos slave attribute that the constraint is matched against. // If negated = true , treat this as a 'not' - to avoid specific values. // Values - list of values we look for in attribute name -func (t *Task) AddValueConstraint(name string, negated bool, values ...string) *Task { +func (t *AuroraTask) AddValueConstraint(name string, negated bool, values ...string) *AuroraTask { t.task.Constraints = append(t.task.Constraints, &aurora.Constraint{ Name: name, @@ -197,7 +205,7 @@ func (t *Task) AddValueConstraint(name string, negated bool, values ...string) * // From Aurora Docs: // A constraint that specifies the maximum number of active tasks on a host with // a matching attribute that may be scheduled simultaneously. -func (t *Task) AddLimitConstraint(name string, limit int32) *Task { +func (t *AuroraTask) AddLimitConstraint(name string, limit int32) *AuroraTask { t.task.Constraints = append(t.task.Constraints, &aurora.Constraint{ Name: name, @@ -217,18 +225,122 @@ func (t *Task) AddLimitConstraint(name string, limit int32) *Task { // the role field in the job configuration, and will reject the job creation otherwise. // A wildcard (*) may be used for the role portion of the dedicated attribute, which // will allow any owner to elect for a job to run on the host(s) -func (t *Task) AddDedicatedConstraint(role, name string) *Task { - t.AddValueConstraint("dedicated", false, role+"/"+name) - +func (t *AuroraTask) AddDedicatedConstraint(role, name string) *AuroraTask { + t.AddValueConstraint(dedicated, false, role+"/"+name) return t } // Set a container to run for the job configuration to run. -func (t *Task) Container(container Container) *Task { +func (t *AuroraTask) Container(container Container) *AuroraTask { t.task.Container = container.Build() - return t } -func (t *Task) TaskConfig() *aurora.TaskConfig { + +func (t *AuroraTask) TaskConfig() *aurora.TaskConfig { return t.task } + +func (t *AuroraTask) Clone() *AuroraTask { + + newTask := NewTask() + + // Pass values using receivers as much as possible + newTask. + CPU(*t.resources[CPU].NumCpus). + RAM(*t.resources[RAM].RamMb). + Disk(*t.resources[DISK].DiskMb). + Environment(t.task.Job.Environment). + Role(t.task.Job.Role). + Name(t.task.Job.Name). + MaxFailure(t.task.MaxTaskFailures). + IsService(t.task.IsService) + + if t.task.Tier != nil { + newTask.Tier(*t.task.Tier) + } + + if t.task.ExecutorConfig != nil { + newTask. + ExecutorName(t.task.ExecutorConfig.Name). + ExecutorData(t.task.ExecutorConfig.Data) + } + + // Make a deep copy of the task's container + if t.task.Container != nil { + if t.task.Container.Mesos != nil { + mesosContainer := NewMesosContainer() + + if t.task.Container.Mesos.Image != nil { + if t.task.Container.Mesos.Image.Appc != nil { + mesosContainer.AppcImage(t.task.Container.Mesos.Image.Appc.Name, t.task.Container.Mesos.Image.Appc.ImageId) + } else if t.task.Container.Mesos.Image.Docker != nil { + mesosContainer.DockerImage(t.task.Container.Mesos.Image.Docker.Name, t.task.Container.Mesos.Image.Docker.Tag) + } + } + + for _, vol := range t.task.Container.Mesos.Volumes { + mesosContainer.AddVolume(vol.ContainerPath, vol.HostPath, vol.Mode) + } + + newTask.Container(mesosContainer) + } else if t.task.Container.Docker != nil { + dockerContainer := NewDockerContainer() + dockerContainer.Image(t.task.Container.Docker.Image) + + for _, param := range t.task.Container.Docker.Parameters { + dockerContainer.AddParameter(param.Name, param.Value) + } + + newTask.Container(dockerContainer) + } + } + + // Copy all ports + for _, resource := range t.task.Resources { + // Copy only ports, skip CPU, RAM, and DISK + if resource != nil && resource.NamedPort != nil { + newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)}) + newTask.portCount++ + } + } + + // Copy constraints + for _, constraint := range t.task.Constraints { + if constraint != nil && constraint.Constraint != nil { + + newConstraint := aurora.Constraint{Name: constraint.Name} + + taskConstraint := constraint.Constraint + if taskConstraint.Limit != nil { + newConstraint.Constraint = &aurora.TaskConstraint{Limit: &aurora.LimitConstraint{Limit: taskConstraint.Limit.Limit}} + newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint) + + } else if taskConstraint.Value != nil { + + values := make([]string, 0) + for _, val := range taskConstraint.Value.Values { + values = append(values, val) + } + + newConstraint.Constraint = &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{Negated: taskConstraint.Value.Negated, Values: values}} + + newTask.task.Constraints = append(newTask.task.Constraints, &newConstraint) + } + } + } + + // Copy labels + for _, label := range t.task.Metadata { + newTask.task.Metadata = append(newTask.task.Metadata, &aurora.Metadata{Key: label.Key, Value: label.Value}) + } + + // Copy Mesos fetcher URIs + for _, uri := range t.task.MesosFetcherUris { + newTask.task.MesosFetcherUris = append( + newTask.task.MesosFetcherUris, + &aurora.MesosFetcherURI{Value: uri.Value, Extract: thrift.BoolPtr(*uri.Extract), Cache: thrift.BoolPtr(*uri.Cache)}) + } + + return newTask +}