From e13349db26d328cab0d1cd94b482f6259bf62e8f Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 7 Jan 2019 14:38:08 -0800 Subject: [PATCH] Initial support for Thermos and GPU resources. --- examples/client.go | 16 ++- examples/thermos_payload.json | 62 ------------ job.go | 15 +++ jobUpdate.go | 27 +++-- realis.go | 30 ++++-- realis_e2e_test.go | 104 ++++++++++---------- task.go | 58 ++++++++++- thermos.go | 180 ++++++++++++++++++++++++++++++---- thermos_test.go | 43 ++++---- 9 files changed, 359 insertions(+), 176 deletions(-) delete mode 100644 examples/thermos_payload.json diff --git a/examples/client.go b/examples/client.go index 59991f7..7c92ac5 100644 --- a/examples/client.go +++ b/examples/client.go @@ -17,7 +17,6 @@ package main import ( "flag" "fmt" - "io/ioutil" "log" "strings" "time" @@ -114,23 +113,21 @@ func main() { switch executor { case "thermos": - payload, err := ioutil.ReadFile("examples/thermos_payload.json") - if err != nil { - log.Fatalln("Error reading json config file: ", err) - } + thermosExec := realis.ThermosExecutor{} + thermosExec.AddProcess(realis.NewThermosProcess("boostrap", "echo bootsrapping")). + AddProcess(realis.NewThermosProcess("hello_gorealis", "while true; do echo hello world from gorealis; sleep 10; done")) job = realis.NewJob(). Environment("prod"). Role("vagrant"). Name("hello_world_from_gorealis"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(payload)). CPU(1). RAM(64). Disk(100). IsService(true). InstanceCount(1). - AddPorts(1) + AddPorts(1). + ThermosExecutor(thermosExec) case "compose": job = realis.NewJob(). Environment("prod"). @@ -180,7 +177,8 @@ func main() { case "createService": // Create a service with three instances using the update API instead of the createJob API fmt.Println("Creating service") - settings := realis.JobUpdateFromConfig(job.TaskConfig()).InstanceCount(3) + settings := realis.JobUpdateFromAuroraTask(job.AuroraTask()).InstanceCount(3) + result, err := r.CreateService(settings) if err != nil { log.Fatal("error: ", err) diff --git a/examples/thermos_payload.json b/examples/thermos_payload.json deleted file mode 100644 index db630fb..0000000 --- a/examples/thermos_payload.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "environment": "prod", - "health_check_config": { - "initial_interval_secs": 15.0, - "health_checker": { - "http": { - "expected_response_code": 0, - "endpoint": "/health", - "expected_response": "ok" - } - }, - "interval_secs": 10.0, - "timeout_secs": 1.0, - "max_consecutive_failures": 0 - }, - "name": "hello_world_from_gorealis", - "service": false, - "max_task_failures": 1, - "cron_collision_policy": "KILL_EXISTING", - "enable_hooks": false, - "cluster": "devcluster", - "task": { - "processes": [ - { - "daemon": false, - "name": "hello", - "ephemeral": false, - "max_failures": 1, - "min_duration": 5, - "cmdline": "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ", - "final": false - } - ], - "name": "hello", - "finalization_wait": 30, - "max_failures": 1, - "max_concurrency": 0, - "resources": { - "gpu": 0, - "disk": 134217728, - "ram": 134217728, - "cpu": 1.0 - }, - "constraints": [ - { - "order": [ - "hello" - ] - } - ] - }, - "production": false, - "role": "vagrant", - "lifecycle": { - "http": { - "graceful_shutdown_endpoint": "/quitquitquit", - "port": "health", - "shutdown_endpoint": "/abortabortabort" - } - }, - "priority": 0 -} \ No newline at end of file diff --git a/job.go b/job.go index a9881b5..b7a3c52 100644 --- a/job.go +++ b/job.go @@ -60,6 +60,7 @@ func (j *AuroraJob) Role(role string) *AuroraJob { identity := &aurora.Identity{User: role} j.jobConfig.Owner = identity j.jobConfig.TaskConfig.Owner = identity + return j } @@ -100,6 +101,11 @@ func (j *AuroraJob) JobConfig() *aurora.JobConfiguration { return j.jobConfig } +// Get the current job configurations key to use for some realis calls. +func (j *AuroraJob) AuroraTask() *AuroraTask { + return j.task +} + /* AuroraTask specific API, see task.go for further documentation. These functions are provided for the convenience of chaining API calls. @@ -187,3 +193,12 @@ func (j *AuroraJob) Container(container Container) *AuroraJob { j.task.Container(container) return j } + +func (j *AuroraJob) ThermosExecutor(thermos ThermosExecutor) *AuroraJob { + j.task.ThermosExecutor(thermos) + return j +} + +func (j *AuroraJob) BuildThermosPayload() error { + return j.task.BuildThermosPayload() +} diff --git a/jobUpdate.go b/jobUpdate.go index bfc3820..c941cd5 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -129,19 +129,19 @@ func newUpdateSettings() *aurora.JobUpdateSettings { See task.go for further documentation. */ -func (t *JobUpdate) Environment(env string) *JobUpdate { - t.task.Environment(env) - return t +func (j *JobUpdate) Environment(env string) *JobUpdate { + j.task.Environment(env) + return j } -func (t *JobUpdate) Role(role string) *JobUpdate { - t.task.Role(role) - return t +func (j *JobUpdate) Role(role string) *JobUpdate { + j.task.Role(role) + return j } -func (t *JobUpdate) Name(name string) *JobUpdate { - t.task.Name(name) - return t +func (j *JobUpdate) Name(name string) *JobUpdate { + j.task.Name(name) + return j } func (j *JobUpdate) ExecutorName(name string) *JobUpdate { @@ -230,3 +230,12 @@ func (j *JobUpdate) Container(container Container) *JobUpdate { func (j *JobUpdate) JobKey() aurora.JobKey { return j.task.JobKey() } + +func (j *JobUpdate) ThermosExecutor(thermos ThermosExecutor) *JobUpdate { + j.task.ThermosExecutor(thermos) + return j +} + +func (j *JobUpdate) BuildThermosPayload() error { + return j.task.BuildThermosPayload() +} diff --git a/realis.go b/realis.go index 8840402..2754257 100644 --- a/realis.go +++ b/realis.go @@ -566,9 +566,15 @@ func (c *Client) KillJob(key aurora.JobKey) error { // as that API uses the update thrift call which has a few extra features available. // Use this API to create ad-hoc jobs. func (c *Client) CreateJob(auroraJob *AuroraJob) error { + // If no thermos configuration has been set this will result in a NOOP + err := auroraJob.BuildThermosPayload() c.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + if err != nil { + return errors.Wrap(err, "Unable to create Thermos payload") + } + _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { return c.client.CreateJob(nil, auroraJob.JobConfig()) }) @@ -587,16 +593,19 @@ func (c *Client) CreateService(update *JobUpdate) (*aurora.StartJobUpdateResult_ return nil, errors.Wrap(err, "unable to create service") } - if updateResult != nil { - return updateResult, nil - } - - return nil, errors.New("results object is nil") + return updateResult, err } func (c *Client) ScheduleCronJob(auroraJob *AuroraJob) error { + // If no thermos configuration has been set this will result in a NOOP + err := auroraJob.BuildThermosPayload() + c.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) + if err != nil { + return errors.Wrap(err, "Unable to create Thermos payload") + } + _, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { return c.client.ScheduleCronJob(nil, auroraJob.JobConfig()) }) @@ -680,6 +689,10 @@ func (c *Client) RestartJob(key aurora.JobKey) error { // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.StartJobUpdateResult_, error) { + if err := updateJob.BuildThermosPayload(); err != nil { + return nil, errors.New("unable to generate the proper Thermos executor payload") + } + c.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := c.thriftCallWithRetries(func() (*aurora.Response, error) { @@ -689,7 +702,12 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S if retryErr != nil { return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") } - return resp.GetResult_().GetStartJobUpdateResult_(), nil + + if resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { + return resp.GetResult_().GetStartJobUpdateResult_(), nil + } + + return nil, errors.New("Thrift error: Field in response is nil unexpectedly.") } // Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 3826fc8..f58d426 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -29,7 +29,7 @@ import ( ) var r *realis.Client -var thermosPayload []byte +var thermosExec realis.ThermosExecutor func TestMain(m *testing.M) { var err error @@ -46,11 +46,8 @@ func TestMain(m *testing.M) { defer r.Close() - thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json") - if err != nil { - fmt.Println("Error reading thermos payload file: ", err) - os.Exit(1) - } + thermosExec.AddProcess(realis.NewThermosProcess("boostrap", "echo bootsrapping")). + AddProcess(realis.NewThermosProcess("hello_gorealis", "while true; do echo hello world from gorealis; sleep 10; done")) os.Exit(m.Run()) } @@ -97,8 +94,7 @@ func TestBadCredentials(t *testing.T) { Environment("prod"). Role("vagrant"). Name("create_thermos_job_test"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + ThermosExecutor(thermosExec). CPU(.5). RAM(64). Disk(100). @@ -230,8 +226,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { Environment("prod"). Role(role). Name("create_thermos_job_test"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + ThermosExecutor(thermosExec). CPU(.5). RAM(64). Disk(100). @@ -270,7 +265,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { err := r.KillJob(job.JobKey()) assert.NoError(t, err) - success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 50*time.Second) + success, err := r.InstancesMonitor(job.JobKey(), 0, 1*time.Second, 60*time.Second) assert.True(t, success) assert.NoError(t, err) }) @@ -307,8 +302,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) { Environment(env). Role(role). Name(name). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + ThermosExecutor(thermosExec). CPU(1000). RAM(64). Disk(100). @@ -338,12 +332,11 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { job := realis.NewJobUpdate(). Environment("prod"). Role(role). - Name("create_thermos_job_test"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + Name("create_thermos_job_pulse_test"). CPU(.5). RAM(64). Disk(100). + ThermosExecutor(thermosExec). IsService(true). InstanceCount(2). AddPorts(1). @@ -351,8 +344,6 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { PulseIntervalTimeout(30 * time.Millisecond). BatchSize(1).WaitForBatchCompletion(true) - pulse := int32(30) - timeout := 300 result, err := r.CreateService(job) fmt.Println(result.String()) @@ -365,43 +356,52 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { var updateDetails []*aurora.JobUpdateDetails - for i := 0; i*int(pulse) <= timeout; i++ { + ticker := time.NewTicker(time.Second * 3) + timer := time.NewTimer(time.Minute * 6) + defer ticker.Stop() + defer timer.Stop() - pulseStatus, err := r.PulseJobUpdate(result.GetKey()) +pulseLoop: + for { + select { + case <-ticker.C: + pulseStatus, err := r.PulseJobUpdate(result.GetKey()) - assert.Nil(t, err) - if pulseStatus != aurora.JobUpdatePulseStatus_OK && pulseStatus != aurora.JobUpdatePulseStatus_FINISHED { - assert.Fail(t, "Pulse update status received doesn't exist") - } - - updateDetails, err = r.JobUpdateDetails(updateQ) - assert.Nil(t, err) - - assert.Equal(t, len(updateDetails), 1, "No update matching query found") - status := updateDetails[0].Update.Summary.State.Status - - if _, ok := realis.ActiveJobUpdateStates[status]; !ok { - - // Rolled forward is the only state in which an update has been successfully updated - // if we encounter an inactive state and it is not at rolled forward, update failed - if status == aurora.JobUpdateStatus_ROLLED_FORWARD { - fmt.Println("Update succeeded") - break - } else { - fmt.Println("Update failed") - break + assert.Nil(t, err) + if pulseStatus != aurora.JobUpdatePulseStatus_OK && pulseStatus != aurora.JobUpdatePulseStatus_FINISHED { + assert.FailNow(t, "pulse update status received doesn't exist") } + + updateDetails, err = r.JobUpdateDetails(updateQ) + assert.Nil(t, err) + + assert.Equal(t, len(updateDetails), 1, "no update matching query found") + status := updateDetails[0].Update.Summary.State.Status + + if _, ok := realis.ActiveJobUpdateStates[status]; !ok { + + // Rolled forward is the only state in which an update has been successfully updated + // if we encounter an inactive state and it is not at rolled forward, update failed + if status == aurora.JobUpdateStatus_ROLLED_FORWARD { + fmt.Println("Update succeeded") + break pulseLoop + } else { + fmt.Println("Update failed") + break pulseLoop + } + } + + case <-timer.C: + err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "") + assert.NoError(t, err) + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + assert.FailNow(t, "timed out during pulse update test") } - fmt.Println("Polling, update still active...") } - t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { - err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "") - assert.NoError(t, err) - err = r.KillJob(job.JobKey()) - assert.NoError(t, err) - }) - + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) } // Test configuring an executor that doesn't exist for CreateJob API @@ -412,8 +412,7 @@ func TestRealisClient_CreateService(t *testing.T) { Environment("prod"). Role("vagrant"). Name("create_service_test"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + ThermosExecutor(thermosExec). CPU(.25). RAM(4). Disk(10). @@ -609,8 +608,7 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) { Environment("prod"). Role("vagrant"). Name("create_thermos_job_test_multi"). - ExecutorName(aurora.AURORA_EXECUTOR_NAME). - ExecutorData(string(thermosPayload)). + ThermosExecutor(thermosExec). CPU(.25). RAM(4). Disk(10). diff --git a/task.go b/task.go index f05cef6..d13d876 100644 --- a/task.go +++ b/task.go @@ -15,6 +15,7 @@ package realis import ( + "encoding/json" "strconv" "git.apache.org/thrift.git/lib/go/thrift" @@ -27,6 +28,7 @@ const ( CPU ResourceType = iota RAM DISK + GPU ) const ( @@ -38,12 +40,14 @@ type AuroraTask struct { task *aurora.TaskConfig resources map[ResourceType]*aurora.Resource portCount int + thermos *ThermosExecutor } func NewTask() *AuroraTask { numCpus := &aurora.Resource{} ramMb := &aurora.Resource{} diskMb := &aurora.Resource{} + numGpus := &aurora.Resource{} numCpus.NumCpus = new(float64) ramMb.RamMb = new(int64) @@ -53,6 +57,7 @@ func NewTask() *AuroraTask { resources[CPU] = numCpus resources[RAM] = ramMb resources[DISK] = diskMb + resources[GPU] = numGpus return &AuroraTask{task: &aurora.TaskConfig{ Job: &aurora.JobKey{}, @@ -122,7 +127,7 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { // Copy all ports for _, resource := range config.Resources { - // Copy only ports, skip CPU, RAM, and DISK + // Copy only ports. Set CPU, RAM, DISK, and GPU if resource != nil { if resource.NamedPort != nil { newTask.task.Resources = append(newTask.task.Resources, &aurora.Resource{NamedPort: thrift.StringPtr(*resource.NamedPort)}) @@ -140,6 +145,10 @@ func TaskFromThrift(config *aurora.TaskConfig) *AuroraTask { if resource.DiskMb != nil { newTask.Disk(*resource.DiskMb) } + + if resource.NumGpus != nil { + newTask.GPU(*resource.NumGpus) + } } } @@ -237,6 +246,11 @@ func (t *AuroraTask) Disk(disk int64) *AuroraTask { return t } +func (t *AuroraTask) GPU(gpu int64) *AuroraTask { + *t.resources[GPU].NumGpus = gpu + return t +} + func (t *AuroraTask) Tier(tier string) *AuroraTask { t.task.Tier = &tier return t @@ -363,5 +377,47 @@ func (t *AuroraTask) JobKey() aurora.JobKey { func (t *AuroraTask) Clone() *AuroraTask { newTask := TaskFromThrift(t.task) + + if t.thermos != nil { + newTask.ThermosExecutor(*t.thermos.Clone()) + } + return newTask } + +func (t *AuroraTask) ThermosExecutor(thermos ThermosExecutor) *AuroraTask { + t.thermos = &thermos + + return t +} + +func (t *AuroraTask) BuildThermosPayload() error { + if t.thermos != nil { + + // Set the correct resources + if t.resources[CPU].NumCpus != nil { + t.thermos.cpu(*t.resources[CPU].NumCpus) + } + + if t.resources[RAM].RamMb != nil { + t.thermos.ram(*t.resources[RAM].RamMb) + } + + if t.resources[DISK].DiskMb != nil { + t.thermos.disk(*t.resources[DISK].DiskMb) + } + + if t.resources[GPU].NumGpus != nil { + t.thermos.gpu(*t.resources[GPU].NumGpus) + } + + payload, err := json.Marshal(t.thermos) + if err != nil { + return err + } + + t.ExecutorName(aurora.AURORA_EXECUTOR_NAME) + t.ExecutorData(string(payload)) + } + return nil +} diff --git a/thermos.go b/thermos.go index c5ce098..e579f85 100644 --- a/thermos.go +++ b/thermos.go @@ -14,36 +14,182 @@ package realis +import "encoding/json" type ThermosExecutor struct { - Task ThermosTask `json:"task""` + Task ThermosTask `json:"task""` + order *ThermosConstraint `json:"-"` } type ThermosTask struct { - Processes []ThermosProcess `json:"processes"` - Constraints []ThermosConstraint `json:"constraints"` - Resources ThermosResources `json:"resources"` + Processes map[string]*ThermosProcess `json:"processes"` + Constraints []*ThermosConstraint `json:"constraints"` + Resources thermosResources `json:"resources"` } type ThermosConstraint struct { - Order []string `json:"order"` + Order []string `json:"order,omitempty"` } -type ThermosResources struct { - CPU float64 `json:"cpu"` - Disk int `json:"disk"` - RAM int `json:"ram"` - GPU int `json:"gpu"` +// This struct should always be controlled by the Aurora job struct. +// Therefore it is private. +type thermosResources struct { + CPU *float64 `json:"cpu,omitempty"` + Disk *int64 `json:"disk,omitempty"` + RAM *int64 `json:"ram,omitempty"` + GPU *int64 `json:"gpu,omitempty"` } type ThermosProcess struct { - Daemon bool `json:"daemon"` - Name string `json:"name"` - Ephemeral bool `json:"ephemeral"` - MaxFailures int `json:"max_failures"` - MinDuration int `json:"min_duration"` - Cmdline string `json:"cmdline"` - Final bool `json:"final` + Name string `json:"name"` + Cmdline string `json:"cmdline"` + Daemon bool `json:"daemon"` + Ephemeral bool `json:"ephemeral"` + MaxFailures int `json:"max_failures"` + MinDuration int `json:"min_duration"` + Final bool `json:"final"` } +func NewThermosProcess(name, command string) ThermosProcess { + return ThermosProcess{ + Name: name, + Cmdline: command, + MaxFailures: 1, + Daemon: false, + Ephemeral: false, + MinDuration: 5, + Final: false} +} +// Processes must have unique names. Adding a process whose name already exists will +// result in overwriting the previous version of the process. +func (t *ThermosExecutor) AddProcess(process ThermosProcess) *ThermosExecutor { + if len(t.Task.Processes) == 0 { + t.Task.Processes = make(map[string]*ThermosProcess, 0) + } + + t.Task.Processes[process.Name] = &process + + // Add Process to order + t.addToOrder(process.Name) + return t +} + +// Only constraint that should be added for now is the order of execution, therefore this +// receiver is private. +func (t *ThermosExecutor) addConstraint(constraint *ThermosConstraint) *ThermosExecutor { + if len(t.Task.Constraints) == 0 { + t.Task.Constraints = make([]*ThermosConstraint, 0) + } + + t.Task.Constraints = append(t.Task.Constraints, constraint) + return t +} + +// Order in which the Processes should be executed. Index 0 will be executed first, index N will be executed last. +func (t *ThermosExecutor) ProcessOrder(order ...string) *ThermosExecutor { + if t.order == nil { + t.order = &ThermosConstraint{} + t.addConstraint(t.order) + } + + t.order.Order = order + return t +} + +// Add Process to execution order. By default this is a FIFO setup. Custom order can be given by overriding +// with ProcessOrder +func (t *ThermosExecutor) addToOrder(name string) { + if t.order == nil { + t.order = &ThermosConstraint{Order: make([]string, 0)} + t.addConstraint(t.order) + } + + t.order.Order = append(t.order.Order, name) +} + +// Ram is determined by the job object. +func (t *ThermosExecutor) ram(ram int64) { + // Convert from bytes to MiB + ram *= 1024 ^ 2 + t.Task.Resources.RAM = &ram +} + +// Disk is determined by the job object. +func (t *ThermosExecutor) disk(disk int64) { + // Convert from bytes to MiB + disk *= 1024 ^ 2 + t.Task.Resources.Disk = &disk +} + +// CPU is determined by the job object. +func (t *ThermosExecutor) cpu(cpu float64) { + t.Task.Resources.CPU = &cpu +} + +// GPU is determined by the job object. +func (t *ThermosExecutor) gpu(gpu int64) { + t.Task.Resources.GPU = &gpu +} + +// Deep copy of Thermos executor +func (t *ThermosExecutor) Clone() *ThermosExecutor { + tNew := ThermosExecutor{} + + if t.order != nil { + tNew.order = &ThermosConstraint{Order: t.order.Order} + + tNew.addConstraint(tNew.order) + } + + tNew.Task.Processes = make(map[string]*ThermosProcess) + + for name, process := range t.Task.Processes { + newProcess := *process + tNew.Task.Processes[name] = &newProcess + } + + tNew.Task.Resources = t.Task.Resources + + return &tNew +} + +type thermosTaskJSON struct { + Processes []*ThermosProcess `json:"processes"` + Constraints []*ThermosConstraint `json:"constraints"` + Resources thermosResources `json:"resources"` +} + +// Custom Marshaling for Thermos Task to match what Thermos expects +func (t *ThermosTask) MarshalJSON() ([]byte, error) { + + // Convert map to array to match what Thermos expects + processes := make([]*ThermosProcess, 0) + for _, process := range t.Processes { + processes = append(processes, process) + } + + return json.Marshal(&thermosTaskJSON{ + Processes: processes, + Constraints: t.Constraints, + Resources: t.Resources, + }) +} + +// Custom Unmarshaling to match what Thermos would contain +func (t *ThermosTask) UnmarshalJSON(data []byte) error { + + // Thermos format + aux := &thermosTaskJSON{} + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + processes := make(map[string]*ThermosProcess) + for _, process := range aux.Processes { + processes[process.Name] = process + } + + return nil +} diff --git a/thermos_test.go b/thermos_test.go index a3baa3b..142bcda 100644 --- a/thermos_test.go +++ b/thermos_test.go @@ -1,10 +1,10 @@ -package realis_test +package realis import ( "encoding/json" "testing" - "github.com/paypal/gorealis/v2" + "git.apache.org/thrift.git/lib/go/thrift" "github.com/stretchr/testify/assert" ) @@ -40,27 +40,32 @@ func TestThermosTask(t *testing.T) { ] } }`) - thermos := realis.ThermosExecutor{} + thermos := ThermosExecutor{} err := json.Unmarshal(thermosJSON, &thermos) assert.NoError(t, err) - thermosExpected := realis.ThermosExecutor{ - Task: realis.ThermosTask{ - Processes: []realis.ThermosProcess{ - { - Daemon: false, - Name: "hello", - Ephemeral: false, - MaxFailures: 1, - MinDuration: 5, - Cmdline: "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ", - Final: false, - }, - }, - Constraints: []realis.ThermosConstraint{{Order: []string{"hello"}}}, - Resources: realis.ThermosResources{CPU: 1.1, Disk: 134217728, RAM: 134217728, GPU: 0}}} + process := &ThermosProcess{ + Daemon: false, + Name: "hello", + Ephemeral: false, + MaxFailures: 1, + MinDuration: 5, + Cmdline: "\n while true; do\n echo hello world from gorealis\n sleep 10\n done\n ", + Final: false, + } - assert.ObjectsAreEqual(thermosExpected, thermos) + constraint := &ThermosConstraint{Order: []string{process.Name}} + + thermosExpected := ThermosExecutor{ + Task: ThermosTask{ + Processes: map[string]*ThermosProcess{process.Name: process}, + Constraints: []*ThermosConstraint{constraint}, + Resources: thermosResources{CPU: thrift.Float64Ptr(1.1), + Disk: thrift.Int64Ptr(134217728), + RAM: thrift.Int64Ptr(134217728), + GPU: thrift.Int64Ptr(0)}}} + + assert.ObjectsAreEqualValues(thermosExpected, thermos) }