From 2f7015571cec92d09d9a55f6ac87a9c293c8807d Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Tue, 8 Jan 2019 15:11:52 -0800 Subject: [PATCH] Adding support for setting GPU as a resource. (#93) * Adding support for setting GPU as a resource. * Refactoring pulse update test. --- job.go | 40 +++++++++++++-------- realis_e2e_test.go | 88 +++++++++++++++++++++++++--------------------- updatejob.go | 17 +++++---- 3 files changed, 83 insertions(+), 62 deletions(-) diff --git a/job.go b/job.go index eb7eaa0..86849a2 100644 --- a/job.go +++ b/job.go @@ -58,10 +58,19 @@ type Job interface { PartitionPolicy(policy *aurora.PartitionPolicy) Job } +type ResourceType int + +const ( + CPU ResourceType = iota + RAM + DISK + GPU +) + // Structure to collect all information pertaining to an Aurora job. type AuroraJob struct { jobConfig *aurora.JobConfiguration - resources map[string]*aurora.Resource + resources map[ResourceType]*aurora.Resource portCount int } @@ -88,15 +97,8 @@ func NewJob() Job { ramMb := aurora.NewResource() diskMb := aurora.NewResource() - resources := make(map[string]*aurora.Resource) - resources["cpu"] = numCpus - resources["ram"] = ramMb - resources["disk"] = diskMb - - taskConfig.Resources = make(map[*aurora.Resource]bool) - taskConfig.Resources[numCpus] = true - taskConfig.Resources[ramMb] = true - taskConfig.Resources[diskMb] = true + resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb} + taskConfig.Resources = map[*aurora.Resource]bool{numCpus: true, ramMb: true, diskMb: true} numCpus.NumCpus = new(float64) ramMb.RamMb = new(int64) @@ -155,20 +157,28 @@ func (j *AuroraJob) ExecutorData(data string) Job { } func (j *AuroraJob) CPU(cpus float64) Job { - *j.resources["cpu"].NumCpus = cpus - + *j.resources[CPU].NumCpus = cpus return j } func (j *AuroraJob) RAM(ram int64) Job { - *j.resources["ram"].RamMb = ram - + *j.resources[RAM].RamMb = ram return j } func (j *AuroraJob) Disk(disk int64) Job { - *j.resources["disk"].DiskMb = disk + *j.resources[DISK].DiskMb = disk + return j +} +func (j *AuroraJob) GPU(gpus int64) Job { + if _, ok := j.resources[GPU]; !ok { + numGPUs := &aurora.Resource{NumGpus: new(int64)} + j.resources[GPU] = numGPUs + j.TaskConfig().Resources[numGPUs] = true + } + + *j.resources[GPU].NumGpus = gpus return j } diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 7fb3504..163e67c 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -22,11 +22,13 @@ import ( "testing" "time" + "git.apache.org/thrift.git/lib/go/thrift" realis "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var r realis.Realis @@ -309,10 +311,8 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { AddPorts(1). AddLabel("currentTime", time.Now().String()) - pulse := int32(30) - timeout := 300 settings := realis.NewUpdateSettings() - settings.BlockIfNoPulsesAfterMs = &pulse + settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(30) settings.UpdateGroupSize = 1 settings.WaitForBatchCompletion = true job.InstanceCount(2) @@ -327,52 +327,58 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { Limit: 1, } - start := time.Now() - for i := 0; i*int(pulse) <= timeout; i++ { + var updateDetails []*aurora.JobUpdateDetails - fmt.Println("sending PulseJobUpdate....") - resp, err = r.PulseJobUpdate(result.GetKey()) - assert.NotNil(t, resp) - assert.Nil(t, err) + ticker := time.NewTicker(time.Second * 3) + timer := time.NewTimer(time.Minute * 6) + defer ticker.Stop() + defer timer.Stop() - respDetail, err := r.JobUpdateDetails(updateQ) - assert.Nil(t, err) +pulseLoop: + for { + select { + case <-ticker.C: - updateDetail := response.JobUpdateDetails(respDetail) - if len(updateDetail) == 0 { - fmt.Println("No update found") - assert.NotEqual(t, len(updateDetail), 0) - } - status := updateDetail[0].Update.Summary.State.Status + fmt.Println("sending PulseJobUpdate....") + resp, err = r.PulseJobUpdate(result.GetKey()) + require.NotNil(t, resp, "received a nil response from Aurora") + assert.Nil(t, err) - if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok { + respDetail, err := r.JobUpdateDetails(updateQ) + assert.Nil(t, err) - // Rolled forward is the only state in which an update has been successfully updated - // if we encounter an inactive state and it is not at rolled forward, update failed - if status == aurora.JobUpdateStatus_ROLLED_FORWARD { - fmt.Println("Update succeded") - break - } else { - fmt.Println("Update failed") - break + updateDetails = response.JobUpdateDetails(respDetail) + if len(updateDetails) == 0 { + fmt.Println("No update found") + assert.NotEqual(t, len(updateDetails), 0) } + status := updateDetails[0].Update.Summary.State.Status + + if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok { + + // Rolled forward is the only state in which an update has been successfully updated + // if we encounter an inactive state and it is not at rolled forward, update failed + if status == aurora.JobUpdateStatus_ROLLED_FORWARD { + fmt.Println("Update succeded") + break pulseLoop + } else { + fmt.Println("Update failed") + break pulseLoop + } + } + + fmt.Println("Polling, update still active...") + case <-timer.C: + _, err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "") + assert.NoError(t, err) + _, err = r.KillJob(job.JobKey()) + require.NoError(t, err, "timed out during pulse update test") } - - fmt.Println("Polling, update still active...") - time.Sleep(time.Duration(pulse) * time.Second) } - end := time.Now() - fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano())) - - t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { - start := time.Now() - resp, err := r.KillJob(job.JobKey()) - end := time.Now() - assert.NoError(t, err) - assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano())) - }) + resp, err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) } // Test configuring an executor that doesn't exist for CreateJob API @@ -403,7 +409,7 @@ func TestRealisClient_CreateService(t *testing.T) { var ok bool var mErr error - if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil { + if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 240); !ok || mErr != nil { // Update may already be in a terminal state so don't check for error _, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") diff --git a/updatejob.go b/updatejob.go index 4ee4f14..f7ef2fb 100644 --- a/updatejob.go +++ b/updatejob.go @@ -37,17 +37,17 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { // Rebuild resource map from TaskConfig for ptr := range config.Resources { if ptr.NumCpus != nil { - job.resources["cpu"].NumCpus = ptr.NumCpus + job.resources[CPU].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce } if ptr.RamMb != nil { - job.resources["ram"].RamMb = ptr.RamMb + job.resources[RAM].RamMb = ptr.RamMb continue } if ptr.DiskMb != nil { - job.resources["disk"].DiskMb = ptr.DiskMb + job.resources[DISK].DiskMb = ptr.DiskMb continue } } @@ -77,19 +77,24 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) // Rebuild resource map from TaskConfig for ptr := range config.Resources { if ptr.NumCpus != nil { - job.resources["cpu"].NumCpus = ptr.NumCpus + job.resources[CPU].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce } if ptr.RamMb != nil { - job.resources["ram"].RamMb = ptr.RamMb + job.resources[RAM].RamMb = ptr.RamMb continue } if ptr.DiskMb != nil { - job.resources["disk"].DiskMb = ptr.DiskMb + job.resources[DISK].DiskMb = ptr.DiskMb continue } + + if ptr.NumGpus != nil { + job.resources[GPU].NumGpus = ptr.NumGpus + continue // Guard against Union violations that Go won't enforce + } } //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior