Adding support for setting GPU as a resource.

This commit is contained in:
Renan DelValle 2019-01-07 17:12:17 -08:00
parent 9a835631b2
commit 25f5ff9f07
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
3 changed files with 86 additions and 57 deletions

37
job.go
View file

@ -57,10 +57,19 @@ type Job interface {
Container(container Container) Job
}
type ResourceType int
const (
CPU ResourceType = iota
RAM
DISK
GPU
)
// Structure to collect all information pertaining to an Aurora job.
type AuroraJob struct {
jobConfig *aurora.JobConfiguration
resources map[string]*aurora.Resource
resources map[ResourceType]*aurora.Resource
portCount int
}
@ -87,10 +96,10 @@ func NewJob() Job {
ramMb := aurora.NewResource()
diskMb := aurora.NewResource()
resources := make(map[string]*aurora.Resource)
resources["cpu"] = numCpus
resources["ram"] = ramMb
resources["disk"] = diskMb
resources := make(map[ResourceType]*aurora.Resource)
resources[CPU] = numCpus
resources[RAM] = ramMb
resources[DISK] = diskMb
taskConfig.Resources = make(map[*aurora.Resource]bool)
taskConfig.Resources[numCpus] = true
@ -154,20 +163,28 @@ func (j *AuroraJob) ExecutorData(data string) Job {
}
func (j *AuroraJob) CPU(cpus float64) Job {
*j.resources["cpu"].NumCpus = cpus
*j.resources[CPU].NumCpus = cpus
return j
}
func (j *AuroraJob) RAM(ram int64) Job {
*j.resources["ram"].RamMb = ram
*j.resources[RAM].RamMb = ram
return j
}
func (j *AuroraJob) Disk(disk int64) Job {
*j.resources["disk"].DiskMb = disk
*j.resources[DISK].DiskMb = disk
return j
}
func (j *AuroraJob) GPU(gpus int64) Job {
if _,ok := j.resources[GPU]; !ok {
numGPUs := &aurora.Resource{NumGpus: new(int64)}
j.resources[GPU] = numGPUs
j.TaskConfig().Resources[numGPUs] = true
}
*j.resources[GPU].NumGpus = gpus
return j
}

View file

@ -22,6 +22,7 @@ import (
"testing"
"time"
"git.apache.org/thrift.git/lib/go/thrift"
realis "github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
@ -309,10 +310,8 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
AddPorts(1).
AddLabel("currentTime", time.Now().String())
pulse := int32(30)
timeout := 300
settings := realis.NewUpdateSettings()
settings.BlockIfNoPulsesAfterMs = &pulse
settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(30)
settings.UpdateGroupSize = 1
settings.WaitForBatchCompletion = true
job.InstanceCount(2)
@ -327,52 +326,60 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
Limit: 1,
}
start := time.Now()
for i := 0; i*int(pulse) <= timeout; i++ {
var updateDetails []*aurora.JobUpdateDetails
fmt.Println("sending PulseJobUpdate....")
resp, err = r.PulseJobUpdate(result.GetKey())
assert.NotNil(t, resp)
assert.Nil(t, err)
ticker := time.NewTicker(time.Second * 3)
timer := time.NewTimer(time.Minute * 6)
defer ticker.Stop()
defer timer.Stop()
respDetail, err := r.JobUpdateDetails(updateQ)
assert.Nil(t, err)
pulseLoop:
for {
select {
case <-ticker.C:
updateDetail := response.JobUpdateDetails(respDetail)
if len(updateDetail) == 0 {
fmt.Println("No update found")
assert.NotEqual(t, len(updateDetail), 0)
}
status := updateDetail[0].Update.Summary.State.Status
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
fmt.Println("sending PulseJobUpdate....")
resp, err = r.PulseJobUpdate(result.GetKey())
assert.NotNil(t, resp)
assert.Nil(t, err)
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeded")
break
} else {
fmt.Println("Update failed")
break
respDetail, err := r.JobUpdateDetails(updateQ)
assert.Nil(t, err)
updateDetails = response.JobUpdateDetails(respDetail)
if len(updateDetails) == 0 {
fmt.Println("No update found")
assert.NotEqual(t, len(updateDetails), 0)
}
status := updateDetails[0].Update.Summary.State.Status
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeded")
break pulseLoop
} else {
fmt.Println("Update failed")
break pulseLoop
}
}
fmt.Println("Polling, update still active...")
case <-timer.C:
_, err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.FailNow(t, "timed out during pulse update test")
}
fmt.Println("Polling, update still active...")
time.Sleep(time.Duration(pulse) * time.Second)
}
end := time.Now()
fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano()))
t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
start := time.Now()
resp, err := r.KillJob(job.JobKey())
end := time.Now()
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
})
resp, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
}
// Test configuring an executor that doesn't exist for CreateJob API
@ -403,7 +410,7 @@ func TestRealisClient_CreateService(t *testing.T) {
var ok bool
var mErr error
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 240); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")

View file

@ -37,17 +37,17 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
// Rebuild resource map from TaskConfig
for ptr := range config.Resources {
if ptr.NumCpus != nil {
job.resources["cpu"].NumCpus = ptr.NumCpus
job.resources[CPU].NumCpus = ptr.NumCpus
continue // Guard against Union violations that Go won't enforce
}
if ptr.RamMb != nil {
job.resources["ram"].RamMb = ptr.RamMb
job.resources[RAM].RamMb = ptr.RamMb
continue
}
if ptr.DiskMb != nil {
job.resources["disk"].DiskMb = ptr.DiskMb
job.resources[DISK].DiskMb = ptr.DiskMb
continue
}
}
@ -77,19 +77,24 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
// Rebuild resource map from TaskConfig
for ptr := range config.Resources {
if ptr.NumCpus != nil {
job.resources["cpu"].NumCpus = ptr.NumCpus
job.resources[CPU].NumCpus = ptr.NumCpus
continue // Guard against Union violations that Go won't enforce
}
if ptr.RamMb != nil {
job.resources["ram"].RamMb = ptr.RamMb
job.resources[RAM].RamMb = ptr.RamMb
continue
}
if ptr.DiskMb != nil {
job.resources["disk"].DiskMb = ptr.DiskMb
job.resources[DISK].DiskMb = ptr.DiskMb
continue
}
if ptr.NumGpus != nil {
job.resources[GPU].NumGpus = ptr.NumGpus
continue // Guard against Union violations that Go won't enforce
}
}
//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior