Initial support for Thermos and GPU resources.

This commit is contained in:
Renan DelValle 2019-01-07 14:38:08 -08:00
parent afcdaa84b8
commit e13349db26
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
9 changed files with 359 additions and 176 deletions

View file

@ -17,7 +17,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"strings" "strings"
"time" "time"
@ -114,23 +113,21 @@ func main() {
switch executor { switch executor {
case "thermos": case "thermos":
payload, err := ioutil.ReadFile("examples/thermos_payload.json") thermosExec := realis.ThermosExecutor{}
if err != nil { thermosExec.AddProcess(realis.NewThermosProcess("boostrap", "echo bootsrapping")).
log.Fatalln("Error reading json config file: ", err) AddProcess(realis.NewThermosProcess("hello_gorealis", "while true; do echo hello world from gorealis; sleep 10; done"))
}
job = realis.NewJob(). job = realis.NewJob().
Environment("prod"). Environment("prod").
Role("vagrant"). Role("vagrant").
Name("hello_world_from_gorealis"). Name("hello_world_from_gorealis").
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ExecutorData(string(payload)).
CPU(1). CPU(1).
RAM(64). RAM(64).
Disk(100). Disk(100).
IsService(true). IsService(true).
InstanceCount(1). InstanceCount(1).
AddPorts(1) AddPorts(1).
ThermosExecutor(thermosExec)
case "compose": case "compose":
job = realis.NewJob(). job = realis.NewJob().
Environment("prod"). Environment("prod").
@ -180,7 +177,8 @@ func main() {
case "createService": case "createService":
// Create a service with three instances using the update API instead of the createJob API // Create a service with three instances using the update API instead of the createJob API
fmt.Println("Creating service") fmt.Println("Creating service")
settings := realis.JobUpdateFromConfig(job.TaskConfig()).InstanceCount(3) settings := realis.JobUpdateFromAuroraTask(job.AuroraTask()).InstanceCount(3)
result, err := r.CreateService(settings) result, err := r.CreateService(settings)
if err != nil { if err != nil {
log.Fatal("error: ", err) log.Fatal("error: ", err)

View file

@ -1,62 +0,0 @@
{
"environment": "prod",
"health_check_config": {
"initial_interval_secs": 15.0,
"health_checker": {
"http": {
"expected_response_code": 0,
"endpoint": "/health",
"expected_response": "ok"
}
},
"interval_secs": 10.0,
"timeout_secs": 1.0,
"max_consecutive_failures": 0
},
"name": "hello_world_from_gorealis",
"service": false,
"max_task_failures": 1,
"cron_collision_policy": "KILL_EXISTING",
"enable_hooks": false,
"cluster": "devcluster",
"task": {
"processes": [
{
"daemon": false,
"name": "hello",
"ephemeral": false,
"max_failures": 1,
"min_duration": 5,
"cmdline": "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ",
"final": false
}
],
"name": "hello",
"finalization_wait": 30,
"max_failures": 1,
"max_concurrency": 0,
"resources": {
"gpu": 0,
"disk": 134217728,
"ram": 134217728,
"cpu": 1.0
},
"constraints": [
{
"order": [
"hello"
]
}
]
},
"production": false,
"role": "vagrant",
"lifecycle": {
"http": {
"graceful_shutdown_endpoint": "/quitquitquit",
"port": "health",
"shutdown_endpoint": "/abortabortabort"
}
},
"priority": 0
}

15
job.go
View file

@ -60,6 +60,7 @@ func (j *AuroraJob) Role(role string) *AuroraJob {
identity := &aurora.Identity{User: role} identity := &aurora.Identity{User: role}
j.jobConfig.Owner = identity j.jobConfig.Owner = identity
j.jobConfig.TaskConfig.Owner = identity j.jobConfig.TaskConfig.Owner = identity
return j return j
} }
@ -100,6 +101,11 @@ func (j *AuroraJob) JobConfig() *aurora.JobConfiguration {
return j.jobConfig return j.jobConfig
} }
// AuroraTask returns the task held by this job. The pointer stored in j.task
// is returned directly, not a copy.
func (j *AuroraJob) AuroraTask() *AuroraTask {
return j.task
}
/* /*
AuroraTask specific API, see task.go for further documentation. AuroraTask specific API, see task.go for further documentation.
These functions are provided for the convenience of chaining API calls. These functions are provided for the convenience of chaining API calls.
@ -187,3 +193,12 @@ func (j *AuroraJob) Container(container Container) *AuroraJob {
j.task.Container(container) j.task.Container(container)
return j return j
} }
// ThermosExecutor forwards the given Thermos executor configuration to the
// job's underlying task and returns the job so calls can be chained.
func (j *AuroraJob) ThermosExecutor(thermos ThermosExecutor) *AuroraJob {
j.task.ThermosExecutor(thermos)
return j
}
// BuildThermosPayload delegates to the underlying task's BuildThermosPayload
// and returns its error unchanged.
func (j *AuroraJob) BuildThermosPayload() error {
return j.task.BuildThermosPayload()
}

View file

@ -129,19 +129,19 @@ func newUpdateSettings() *aurora.JobUpdateSettings {
See task.go for further documentation. See task.go for further documentation.
*/ */
func (t *JobUpdate) Environment(env string) *JobUpdate { func (j *JobUpdate) Environment(env string) *JobUpdate {
t.task.Environment(env) j.task.Environment(env)
return t return j
} }
func (t *JobUpdate) Role(role string) *JobUpdate { func (j *JobUpdate) Role(role string) *JobUpdate {
t.task.Role(role) j.task.Role(role)
return t return j
} }
func (t *JobUpdate) Name(name string) *JobUpdate { func (j *JobUpdate) Name(name string) *JobUpdate {
t.task.Name(name) j.task.Name(name)
return t return j
} }
func (j *JobUpdate) ExecutorName(name string) *JobUpdate { func (j *JobUpdate) ExecutorName(name string) *JobUpdate {
@ -230,3 +230,12 @@ func (j *JobUpdate) Container(container Container) *JobUpdate {
func (j *JobUpdate) JobKey() aurora.JobKey { func (j *JobUpdate) JobKey() aurora.JobKey {
return j.task.JobKey() return j.task.JobKey()
} }
// ThermosExecutor forwards the given Thermos executor configuration to the
// update's underlying task and returns the update so calls can be chained.
func (j *JobUpdate) ThermosExecutor(thermos ThermosExecutor) *JobUpdate {
j.task.ThermosExecutor(thermos)
return j
}
// BuildThermosPayload delegates to the underlying task's BuildThermosPayload
// and returns its error unchanged.
func (j *JobUpdate) BuildThermosPayload() error {
return j.task.BuildThermosPayload()
}

View file

@ -566,9 +566,15 @@ func (c *Client) KillJob(key aurora.JobKey) error {
// as that API uses the update thrift call which has a few extra features available. // as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs. // Use this API to create ad-hoc jobs.
func (c *Client) CreateJob(auroraJob *AuroraJob) error { func (c *Client) CreateJob(auroraJob *AuroraJob) error {
// If no thermos configuration has been set this will result in a NOOP
err := auroraJob.BuildThermosPayload()
c.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) c.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
if err != nil {
return errors.Wrap(err, "Unable to create Thermos payload")
}
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.CreateJob(nil, auroraJob.JobConfig()) return c.client.CreateJob(nil, auroraJob.JobConfig())
}) })
@ -587,16 +593,19 @@ func (c *Client) CreateService(update *JobUpdate) (*aurora.StartJobUpdateResult_
return nil, errors.Wrap(err, "unable to create service") return nil, errors.Wrap(err, "unable to create service")
} }
if updateResult != nil { return updateResult, err
return updateResult, nil
}
return nil, errors.New("results object is nil")
} }
func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error {
// If no thermos configuration has been set this will result in a NOOP
err := auroraJob.BuildThermosPayload()
c.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) c.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
if err != nil {
return errors.Wrap(err, "Unable to create Thermos payload")
}
_, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
return c.client.ScheduleCronJob(nil, auroraJob.JobConfig()) return c.client.ScheduleCronJob(nil, auroraJob.JobConfig())
}) })
@ -680,6 +689,10 @@ func (c *Client) RestartJob(key aurora.JobKey) error {
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.StartJobUpdateResult_, error) { func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.StartJobUpdateResult_, error) {
if err := updateJob.BuildThermosPayload(); err != nil {
return nil, errors.New("unable to generate the proper Thermos executor payload")
}
c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) {
@ -689,7 +702,12 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
} }
return resp.GetResult_().GetStartJobUpdateResult_(), nil
if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil {
return resp.GetResult_().GetStartJobUpdateResult_(), nil
}
return nil, errors.New("Thrift error: Field in response is nil unexpectedly.")
} }
// Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. // Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.

View file

@ -29,7 +29,7 @@ import (
) )
var r *realis.Client var r *realis.Client
var thermosPayload []byte var thermosExec realis.ThermosExecutor
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
var err error var err error
@ -46,11 +46,8 @@ func TestMain(m *testing.M) {
defer r.Close() defer r.Close()
thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json") thermosExec.AddProcess(realis.NewThermosProcess("boostrap", "echo bootsrapping")).
if err != nil { AddProcess(realis.NewThermosProcess("hello_gorealis", "while true; do echo hello world from gorealis; sleep 10; done"))
fmt.Println("Error reading thermos payload file: ", err)
os.Exit(1)
}
os.Exit(m.Run()) os.Exit(m.Run())
} }
@ -97,8 +94,7 @@ func TestBadCredentials(t *testing.T) {
Environment("prod"). Environment("prod").
Role("vagrant"). Role("vagrant").
Name("create_thermos_job_test"). Name("create_thermos_job_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ThermosExecutor(thermosExec).
ExecutorData(string(thermosPayload)).
CPU(.5). CPU(.5).
RAM(64). RAM(64).
Disk(100). Disk(100).
@ -230,8 +226,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
Environment("prod"). Environment("prod").
Role(role). Role(role).
Name("create_thermos_job_test"). Name("create_thermos_job_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ThermosExecutor(thermosExec).
ExecutorData(string(thermosPayload)).
CPU(.5). CPU(.5).
RAM(64). RAM(64).
Disk(100). Disk(100).
@ -270,7 +265,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
err := r.KillJob(job.JobKey()) err := r.KillJob(job.JobKey())
assert.NoError(t, err) assert.NoError(t, err)
success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 50*time.Second) success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 60*time.Second)
assert.True(t, success) assert.True(t, success)
assert.NoError(t, err) assert.NoError(t, err)
}) })
@ -307,8 +302,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
Environment(env). Environment(env).
Role(role). Role(role).
Name(name). Name(name).
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ThermosExecutor(thermosExec).
ExecutorData(string(thermosPayload)).
CPU(1000). CPU(1000).
RAM(64). RAM(64).
Disk(100). Disk(100).
@ -338,12 +332,11 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
job := realis.NewJobUpdate(). job := realis.NewJobUpdate().
Environment("prod"). Environment("prod").
Role(role). Role(role).
Name("create_thermos_job_test"). Name("create_thermos_job_pulse_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ExecutorData(string(thermosPayload)).
CPU(.5). CPU(.5).
RAM(64). RAM(64).
Disk(100). Disk(100).
ThermosExecutor(thermosExec).
IsService(true). IsService(true).
InstanceCount(2). InstanceCount(2).
AddPorts(1). AddPorts(1).
@ -351,8 +344,6 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
PulseIntervalTimeout(30 * time.Millisecond). PulseIntervalTimeout(30 * time.Millisecond).
BatchSize(1).WaitForBatchCompletion(true) BatchSize(1).WaitForBatchCompletion(true)
pulse := int32(30)
timeout := 300
result, err := r.CreateService(job) result, err := r.CreateService(job)
fmt.Println(result.String()) fmt.Println(result.String())
@ -365,43 +356,52 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
var updateDetails []*aurora.JobUpdateDetails var updateDetails []*aurora.JobUpdateDetails
for i := 0; i*int(pulse) <= timeout; i++ { ticker := time.NewTicker(time.Second * 3)
timer := time.NewTimer(time.Minute * 6)
defer ticker.Stop()
defer timer.Stop()
pulseStatus, err := r.PulseJobUpdate(result.GetKey()) pulseLoop:
for {
select {
case <-ticker.C:
pulseStatus, err := r.PulseJobUpdate(result.GetKey())
assert.Nil(t, err) assert.Nil(t, err)
if pulseStatus != aurora.JobUpdatePulseStatus_OK && pulseStatus != aurora.JobUpdatePulseStatus_FINISHED { if pulseStatus != aurora.JobUpdatePulseStatus_OK && pulseStatus != aurora.JobUpdatePulseStatus_FINISHED {
assert.Fail(t, "Pulse update status received doesn't exist") assert.FailNow(t, "pulse update status received doesn't exist")
}
updateDetails, err = r.JobUpdateDetails(updateQ)
assert.Nil(t, err)
assert.Equal(t, len(updateDetails), 1, "No update matching query found")
status := updateDetails[0].Update.Summary.State.Status
if _, ok := realis.ActiveJobUpdateStates[status]; !ok {
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeeded")
break
} else {
fmt.Println("Update failed")
break
} }
updateDetails, err = r.JobUpdateDetails(updateQ)
assert.Nil(t, err)
assert.Equal(t, len(updateDetails), 1, "no update matching query found")
status := updateDetails[0].Update.Summary.State.Status
if _, ok := realis.ActiveJobUpdateStates[status]; !ok {
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeeded")
break pulseLoop
} else {
fmt.Println("Update failed")
break pulseLoop
}
}
case <-timer.C:
err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
assert.NoError(t, err)
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.FailNow(t, "timed out during pulse update test")
} }
fmt.Println("Polling, update still active...")
} }
t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { err = r.KillJob(job.JobKey())
err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "") assert.NoError(t, err)
assert.NoError(t, err)
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
})
} }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@ -412,8 +412,7 @@ func TestRealisClient_CreateService(t *testing.T) {
Environment("prod"). Environment("prod").
Role("vagrant"). Role("vagrant").
Name("create_service_test"). Name("create_service_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ThermosExecutor(thermosExec).
ExecutorData(string(thermosPayload)).
CPU(.25). CPU(.25).
RAM(4). RAM(4).
Disk(10). Disk(10).
@ -609,8 +608,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
Environment("prod"). Environment("prod").
Role("vagrant"). Role("vagrant").
Name("create_thermos_job_test_multi"). Name("create_thermos_job_test_multi").
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ThermosExecutor(thermosExec).
ExecutorData(string(thermosPayload)).
CPU(.25). CPU(.25).
RAM(4). RAM(4).
Disk(10). Disk(10).

58
task.go
View file

@ -15,6 +15,7 @@
package realis package realis
import ( import (
"encoding/json"
"strconv" "strconv"
"git.apache.org/thrift.git/lib/go/thrift" "git.apache.org/thrift.git/lib/go/thrift"
@ -27,6 +28,7 @@ const (
CPU ResourceType = iota CPU ResourceType = iota
RAM RAM
DISK DISK
GPU
) )
const ( const (
@ -38,12 +40,14 @@ type AuroraTask struct {
task *aurora.TaskConfig task *aurora.TaskConfig
resources map[ResourceType]*aurora.Resource resources map[ResourceType]*aurora.Resource
portCount int portCount int
thermos *ThermosExecutor
} }
func NewTask() *AuroraTask { func NewTask() *AuroraTask {
numCpus := &aurora.Resource{} numCpus := &aurora.Resource{}
ramMb := &aurora.Resource{} ramMb := &aurora.Resource{}
diskMb := &aurora.Resource{} diskMb := &aurora.Resource{}
numGpus := &aurora.Resource{}
numCpus.NumCpus = new(float64) numCpus.NumCpus = new(float64)
ramMb.RamMb = new(int64) ramMb.RamMb = new(int64)
@ -53,6 +57,7 @@ func NewTask() *AuroraTask {
resources[CPU] = numCpus resources[CPU] = numCpus
resources[RAM] = ramMb resources[RAM] = ramMb
resources[DISK] = diskMb resources[DISK] = diskMb
resources[GPU] = numGpus
return &AuroraTask{task: &aurora.TaskConfig{ return &AuroraTask{task: &aurora.TaskConfig{
Job: &aurora.JobKey{}, Job: &aurora.JobKey{},
@ -122,7 +127,7 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
// Copy all ports // Copy all ports
for _, resource := range config.Resources { for _, resource := range config.Resources {
// Copy only ports, skip CPU, RAM, and DISK // Copy only ports. Set CPU, RAM, DISK, and GPU
if resource != nil { if resource != nil {
if resource.NamedPort != nil { if resource.NamedPort != nil {
newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)}) newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)})
@ -140,6 +145,10 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask {
if resource.DiskMb != nil { if resource.DiskMb != nil {
newTask.Disk(*resource.DiskMb) newTask.Disk(*resource.DiskMb)
} }
if resource.NumGpus != nil {
newTask.GPU(*resource.NumGpus)
}
} }
} }
@ -237,6 +246,11 @@ func (t *AuroraTask) Disk(disk int64) *AuroraTask {
return t return t
} }
// GPU sets the number of GPUs requested by the task and returns the task so
// calls can be chained.
//
// The value is stored by pointing NumGpus at the method's own copy of gpu
// rather than by writing through the existing pointer. The visible part of
// NewTask creates the GPU resource as an empty aurora.Resource, so NumGpus may
// still be nil here and writing through it would panic.
func (t *AuroraTask) GPU(gpu int64) *AuroraTask {
	t.resources[GPU].NumGpus = &gpu
	return t
}
func (t *AuroraTask) Tier(tier string) *AuroraTask { func (t *AuroraTask) Tier(tier string) *AuroraTask {
t.task.Tier = &tier t.task.Tier = &tier
return t return t
@ -363,5 +377,47 @@ func (t *AuroraTask) JobKey() aurora.JobKey {
func (t *AuroraTask) Clone() *AuroraTask { func (t *AuroraTask) Clone() *AuroraTask {
newTask := TaskFromThrift(t.task) newTask := TaskFromThrift(t.task)
if t.thermos != nil {
newTask.ThermosExecutor(*t.thermos.Clone())
}
return newTask return newTask
} }
// ThermosExecutor attaches a Thermos executor configuration to the task and
// returns the task so calls can be chained. The argument is received by value
// and the task keeps a pointer to that copy.
func (t *AuroraTask) ThermosExecutor(thermos ThermosExecutor) *AuroraTask {
t.thermos = &thermos
return t
}
// BuildThermosPayload serializes the task's Thermos executor configuration and
// installs it as the task's executor data.
//
// When no Thermos executor has been attached the call does nothing and returns
// nil. Otherwise every resource value currently set on the task (CPU, RAM,
// disk, GPU) is handed to the Thermos configuration, the configuration is
// marshaled to JSON, and the task's executor name and executor data are
// overwritten. A marshaling error is returned as-is, in which case the
// executor name and data are not touched.
func (t *AuroraTask) BuildThermosPayload() error {
	executor := t.thermos
	if executor == nil {
		return nil
	}

	// Mirror the task-level resources into the Thermos configuration.
	if cpus := t.resources[CPU].NumCpus; cpus != nil {
		executor.cpu(*cpus)
	}
	if ramMb := t.resources[RAM].RamMb; ramMb != nil {
		executor.ram(*ramMb)
	}
	if diskMb := t.resources[DISK].DiskMb; diskMb != nil {
		executor.disk(*diskMb)
	}
	if gpus := t.resources[GPU].NumGpus; gpus != nil {
		executor.gpu(*gpus)
	}

	payload, err := json.Marshal(executor)
	if err != nil {
		return err
	}

	t.ExecutorName(aurora.AURORA_EXECUTOR_NAME)
	t.ExecutorData(string(payload))
	return nil
}

View file

@ -14,36 +14,182 @@
package realis package realis
import (
	"encoding/json"
	"sort"
)
type ThermosExecutor struct { type ThermosExecutor struct {
Task ThermosTask `json:"task""` Task ThermosTask `json:"task""`
order *ThermosConstraint `json:"-"`
} }
type ThermosTask struct { type ThermosTask struct {
Processes []ThermosProcess `json:"processes"` Processes map[string]*ThermosProcess `json:"processes"`
Constraints []ThermosConstraint `json:"constraints"` Constraints []*ThermosConstraint `json:"constraints"`
Resources ThermosResources `json:"resources"` Resources thermosResources `json:"resources"`
} }
type ThermosConstraint struct { type ThermosConstraint struct {
Order []string `json:"order"` Order []string `json:"order,omitempty"`
} }
type ThermosResources struct { // This struct should always be controlled by the Aurora job struct.
CPU float64 `json:"cpu"` // Therefore it is private.
Disk int `json:"disk"` type thermosResources struct {
RAM int `json:"ram"` CPU *float64 `json:"cpu,omitempty"`
GPU int `json:"gpu"` Disk *int64 `json:"disk,omitempty"`
RAM *int64 `json:"ram,omitempty"`
GPU *int64 `json:"gpu,omitempty"`
} }
type ThermosProcess struct { type ThermosProcess struct {
Daemon bool `json:"daemon"` Name string `json:"name"`
Name string `json:"name"` Cmdline string `json:"cmdline"`
Ephemeral bool `json:"ephemeral"` Daemon bool `json:"daemon"`
MaxFailures int `json:"max_failures"` Ephemeral bool `json:"ephemeral"`
MinDuration int `json:"min_duration"` MaxFailures int `json:"max_failures"`
Cmdline string `json:"cmdline"` MinDuration int `json:"min_duration"`
Final bool `json:"final` Final bool `json:"final"`
} }
// NewThermosProcess returns a process called name that runs command.
//
// The process allows a single failure (MaxFailures 1) and has a MinDuration of
// 5. Daemon, Ephemeral and Final are all false.
func NewThermosProcess(name, command string) ThermosProcess {
	// Daemon, Ephemeral and Final keep their zero value, false.
	var process ThermosProcess
	process.Name = name
	process.Cmdline = command
	process.MaxFailures = 1
	process.MinDuration = 5
	return process
}
// AddProcess registers process with the executor and returns the executor so
// calls can be chained.
//
// Processes are keyed by name, so adding a process whose name is already
// registered replaces the earlier definition. Only a name that was not
// registered before is appended to the execution order, so replacing a process
// does not list its name in the order a second time.
func (t *ThermosExecutor) AddProcess(process ThermosProcess) *ThermosExecutor {
	if t.Task.Processes == nil {
		t.Task.Processes = make(map[string]*ThermosProcess)
	}

	_, replacing := t.Task.Processes[process.Name]
	t.Task.Processes[process.Name] = &process

	if !replacing {
		t.addToOrder(process.Name)
	}

	return t
}
// addConstraint appends constraint to the task's constraint list and returns
// the executor. It is unexported because the order of execution is the only
// constraint that should be added for now.
func (t *ThermosExecutor) addConstraint(constraint *ThermosConstraint) *ThermosExecutor {
	existing := t.Task.Constraints
	if len(existing) == 0 {
		// Start from fresh, empty storage when nothing is stored yet.
		existing = []*ThermosConstraint{}
	}

	t.Task.Constraints = append(existing, constraint)
	return t
}
// ProcessOrder replaces the execution order of the processes: the name at
// index 0 is executed first and the name at the last index is executed last.
// The order constraint is created and registered on first use. The executor
// is returned so calls can be chained.
func (t *ThermosExecutor) ProcessOrder(order ...string) *ThermosExecutor {
	constraint := t.order
	if constraint == nil {
		constraint = new(ThermosConstraint)
		t.order = constraint
		t.addConstraint(constraint)
	}

	constraint.Order = order
	return t
}
// addToOrder appends name to the end of the execution order, which makes the
// default order first-in, first-out. The order constraint is created and
// registered the first time a name is added. A custom order can be supplied by
// overriding with ProcessOrder.
func (t *ThermosExecutor) addToOrder(name string) {
	if t.order == nil {
		first := &ThermosConstraint{Order: []string{name}}
		t.order = first
		t.addConstraint(first)
		return
	}

	t.order.Order = append(t.order.Order, name)
}
// ram records the memory reservation in the Thermos resources. It is
// unexported because the value is determined by the job object.
//
// The argument is in MiB (BuildThermosPayload passes the task's RamMb value)
// and is stored in bytes.
func (t *ThermosExecutor) ram(ram int64) {
	// Convert from MiB to bytes. Go's ^ operator is bitwise XOR, not
	// exponentiation, so the factor is written as a product.
	ram *= 1024 * 1024
	t.Task.Resources.RAM = &ram
}
// disk records the disk reservation in the Thermos resources. It is
// unexported because the value is determined by the job object.
//
// The argument is in MiB (BuildThermosPayload passes the task's DiskMb value)
// and is stored in bytes.
func (t *ThermosExecutor) disk(disk int64) {
	// Convert from MiB to bytes. Go's ^ operator is bitwise XOR, not
	// exponentiation, so the factor is written as a product.
	disk *= 1024 * 1024
	t.Task.Resources.Disk = &disk
}
// cpu records the CPU value in the Thermos resources by storing a pointer to
// the method's own copy of the argument. It is unexported because the value
// is determined by the job object.
func (t *ThermosExecutor) cpu(cpu float64) {
t.Task.Resources.CPU = &cpu
}
// gpu records the GPU value in the Thermos resources by storing a pointer to
// the method's own copy of the argument. It is unexported because the value
// is determined by the job object.
func (t *ThermosExecutor) gpu(gpu int64) {
t.Task.Resources.GPU = &gpu
}
// Clone returns a deep copy of the Thermos executor.
//
// The process-order constraint, every process, and every resource value are
// duplicated, so a later change made through the copy is not visible through
// the original and vice versa. The order constraint is the only constraint
// carried over into the copy's constraint list.
func (t *ThermosExecutor) Clone() *ThermosExecutor {
	tNew := ThermosExecutor{}

	if t.order != nil {
		// Copy the names into fresh storage: sharing the backing array would
		// let an append on one executor overwrite entries seen by the other.
		order := append([]string(nil), t.order.Order...)
		tNew.order = &ThermosConstraint{Order: order}
		tNew.addConstraint(tNew.order)
	}

	tNew.Task.Processes = make(map[string]*ThermosProcess, len(t.Task.Processes))
	for name, process := range t.Task.Processes {
		if process == nil {
			tNew.Task.Processes[name] = nil
			continue
		}
		newProcess := *process
		tNew.Task.Processes[name] = &newProcess
	}

	// The resource fields are pointers; duplicate the values they point at.
	src := t.Task.Resources
	if src.CPU != nil {
		cpu := *src.CPU
		tNew.Task.Resources.CPU = &cpu
	}
	if src.Disk != nil {
		disk := *src.Disk
		tNew.Task.Resources.Disk = &disk
	}
	if src.RAM != nil {
		ram := *src.RAM
		tNew.Task.Resources.RAM = &ram
	}
	if src.GPU != nil {
		gpu := *src.GPU
		tNew.Task.Resources.GPU = &gpu
	}

	return &tNew
}
// thermosTaskJSON is the JSON shape used when encoding and decoding a
// ThermosTask: processes are carried as an array rather than as a map keyed
// by process name.
type thermosTaskJSON struct {
Processes []*ThermosProcess `json:"processes"`
Constraints []*ThermosConstraint `json:"constraints"`
Resources thermosResources `json:"resources"`
}
// MarshalJSON encodes the task in the layout Thermos expects, where processes
// are an array instead of a map keyed by name.
//
// Processes are emitted sorted by their map key. Go does not define map
// iteration order, so without the sort the same task could serialize to
// different bytes on every call.
//
// The receiver is a value so that the custom encoding is also applied when a
// ThermosTask is marshaled without being addressable (for example as a field
// of a struct passed to json.Marshal by value).
func (t ThermosTask) MarshalJSON() ([]byte, error) {
	names := make([]string, 0, len(t.Processes))
	for name := range t.Processes {
		names = append(names, name)
	}
	sort.Strings(names)

	// Convert map to array to match what Thermos expects.
	processes := make([]*ThermosProcess, 0, len(names))
	for _, name := range names {
		processes = append(processes, t.Processes[name])
	}

	return json.Marshal(&thermosTaskJSON{
		Processes:   processes,
		Constraints: t.Constraints,
		Resources:   t.Resources,
	})
}
// UnmarshalJSON decodes a task from the layout Thermos uses, where processes
// are an array, and stores the result in t.
//
// Processes are re-keyed by name: a later entry with the same name replaces an
// earlier one and null entries are skipped. Constraints and resources are
// taken over as decoded. When decoding fails the error is returned and t is
// left unchanged.
func (t *ThermosTask) UnmarshalJSON(data []byte) error {
	// Thermos format.
	var aux thermosTaskJSON
	if err := json.Unmarshal(data, &aux); err != nil {
		return err
	}

	processes := make(map[string]*ThermosProcess, len(aux.Processes))
	for _, process := range aux.Processes {
		if process == nil {
			continue
		}
		processes[process.Name] = process
	}

	t.Processes = processes
	t.Constraints = aux.Constraints
	t.Resources = aux.Resources

	return nil
}

View file

@ -1,10 +1,10 @@
package realis_test package realis
import ( import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/paypal/gorealis/v2" "git.apache.org/thrift.git/lib/go/thrift"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -40,27 +40,32 @@ func TestThermosTask(t *testing.T) {
] ]
} }
}`) }`)
thermos := realis.ThermosExecutor{} thermos := ThermosExecutor{}
err := json.Unmarshal(thermosJSON, &thermos) err := json.Unmarshal(thermosJSON, &thermos)
assert.NoError(t, err) assert.NoError(t, err)
thermosExpected := realis.ThermosExecutor{ process := &ThermosProcess{
Task: realis.ThermosTask{ Daemon: false,
Processes: []realis.ThermosProcess{ Name: "hello",
{ Ephemeral: false,
Daemon: false, MaxFailures: 1,
Name: "hello", MinDuration: 5,
Ephemeral: false, Cmdline: "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ",
MaxFailures: 1, Final: false,
MinDuration: 5, }
Cmdline: "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ",
Final: false,
},
},
Constraints: []realis.ThermosConstraint{{Order: []string{"hello"}}},
Resources: realis.ThermosResources{CPU: 1.1, Disk: 134217728, RAM: 134217728, GPU: 0}}}
assert.ObjectsAreEqual(thermosExpected, thermos) constraint := &ThermosConstraint{Order: []string{process.Name}}
thermosExpected := ThermosExecutor{
Task: ThermosTask{
Processes: map[string]*ThermosProcess{process.Name: process},
Constraints: []*ThermosConstraint{constraint},
Resources: thermosResources{CPU: thrift.Float64Ptr(1.1),
Disk: thrift.Int64Ptr(134217728),
RAM: thrift.Int64Ptr(134217728),
GPU: thrift.Int64Ptr(0)}}}
assert.ObjectsAreEqualValues(thermosExpected, thermos)
} }