Compare commits

...

7 commits

Author SHA1 Message Date
Renan DelValle
73e7ab2671
Releasing version 1.3.1 2019-01-08 15:57:19 -08:00
Renan DelValle
22b1d82d88
Bug fix for logger interface. Varidic arguments need to be unrolled when passed to print functions. 2019-01-08 15:37:25 -08:00
Renan DelValle
2f7015571c
Adding support for setting GPU as a resource. (#93)
* Adding support for setting GPU as a resource.
* Refactoring pulse update test.
2019-01-08 15:11:52 -08:00
Robert Allen
296af622d1 This adds the following function to the PartitionPolicy configuration to the Job interface (#91)
* Adding Partition Policy API
2018-12-20 14:38:06 -08:00
Renan DelValle
9a835631b2
Running goimports on all repository to conform to newest goimports. 2018-12-19 15:33:35 -08:00
Renan DelValle
b100158080
Updating Travis CI config file to include running CI on master-v2.0 branch 2018-12-19 15:30:22 -08:00
Renan DelValle
45a4416830
Adding .gitattributes to ignore generated files. 2018-12-03 16:09:46 -08:00
11 changed files with 146 additions and 71 deletions

3
.gitattributes vendored Normal file
View file

@ -0,0 +1,3 @@
gen-go/ linguist-generated=true
vendor/ linguist-generated=true
Gopkg.lock linguist-generated=true

View file

@ -2,6 +2,11 @@ sudo: required
language: go language: go
branches:
only:
- master
- master-v2.0
go: go:
- "1.10.x" - "1.10.x"

View file

@ -18,7 +18,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/paypal/gorealis" realis "github.com/paypal/gorealis"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )

View file

@ -22,7 +22,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/paypal/gorealis" realis "github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response" "github.com/paypal/gorealis/response"
) )

View file

@ -23,7 +23,7 @@ import (
"os" "os"
"time" "time"
"github.com/paypal/gorealis" realis "github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/pkg/errors" "github.com/pkg/errors"
) )

48
job.go
View file

@ -55,12 +55,22 @@ type Job interface {
GetInstanceCount() int32 GetInstanceCount() int32
MaxFailure(maxFail int32) Job MaxFailure(maxFail int32) Job
Container(container Container) Job Container(container Container) Job
PartitionPolicy(policy *aurora.PartitionPolicy) Job
} }
type ResourceType int
const (
CPU ResourceType = iota
RAM
DISK
GPU
)
// Structure to collect all information pertaining to an Aurora job. // Structure to collect all information pertaining to an Aurora job.
type AuroraJob struct { type AuroraJob struct {
jobConfig *aurora.JobConfiguration jobConfig *aurora.JobConfiguration
resources map[string]*aurora.Resource resources map[ResourceType]*aurora.Resource
portCount int portCount int
} }
@ -87,15 +97,8 @@ func NewJob() Job {
ramMb := aurora.NewResource() ramMb := aurora.NewResource()
diskMb := aurora.NewResource() diskMb := aurora.NewResource()
resources := make(map[string]*aurora.Resource) resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
resources["cpu"] = numCpus taskConfig.Resources = map[*aurora.Resource]bool{numCpus: true, ramMb: true, diskMb: true}
resources["ram"] = ramMb
resources["disk"] = diskMb
taskConfig.Resources = make(map[*aurora.Resource]bool)
taskConfig.Resources[numCpus] = true
taskConfig.Resources[ramMb] = true
taskConfig.Resources[diskMb] = true
numCpus.NumCpus = new(float64) numCpus.NumCpus = new(float64)
ramMb.RamMb = new(int64) ramMb.RamMb = new(int64)
@ -154,20 +157,28 @@ func (j *AuroraJob) ExecutorData(data string) Job {
} }
func (j *AuroraJob) CPU(cpus float64) Job { func (j *AuroraJob) CPU(cpus float64) Job {
*j.resources["cpu"].NumCpus = cpus *j.resources[CPU].NumCpus = cpus
return j return j
} }
func (j *AuroraJob) RAM(ram int64) Job { func (j *AuroraJob) RAM(ram int64) Job {
*j.resources["ram"].RamMb = ram *j.resources[RAM].RamMb = ram
return j return j
} }
func (j *AuroraJob) Disk(disk int64) Job { func (j *AuroraJob) Disk(disk int64) Job {
*j.resources["disk"].DiskMb = disk *j.resources[DISK].DiskMb = disk
return j
}
func (j *AuroraJob) GPU(gpus int64) Job {
if _, ok := j.resources[GPU]; !ok {
numGPUs := &aurora.Resource{NumGpus: new(int64)}
j.resources[GPU] = numGPUs
j.TaskConfig().Resources[numGPUs] = true
}
*j.resources[GPU].NumGpus = gpus
return j return j
} }
@ -315,3 +326,10 @@ func (j *AuroraJob) Container(container Container) Job {
return j return j
} }
// Set a partition policy for the job configuration to implement.
func (j *AuroraJob) PartitionPolicy(policy *aurora.PartitionPolicy) Job {
j.jobConfig.TaskConfig.PartitionPolicy = policy
return j
}

View file

@ -40,20 +40,20 @@ func (l *LevelLogger) EnableDebug(enable bool) {
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) { func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
if l.debug { if l.debug {
l.Print("[DEBUG] ") l.Print("[DEBUG] ")
l.Printf(format, a) l.Printf(format, a...)
} }
} }
func (l LevelLogger) DebugPrint(a ...interface{}) { func (l LevelLogger) DebugPrint(a ...interface{}) {
if l.debug { if l.debug {
l.Print("[DEBUG] ") l.Print("[DEBUG] ")
l.Print(a) l.Print(a...)
} }
} }
func (l LevelLogger) DebugPrintln(a ...interface{}) { func (l LevelLogger) DebugPrintln(a ...interface{}) {
if l.debug { if l.debug {
l.Print("[DEBUG] ") l.Print("[DEBUG] ")
l.Println(a) l.Println(a...)
} }
} }

View file

@ -36,7 +36,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
const VERSION = "1.3.0" const VERSION = "1.3.1"
// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible // TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
type Realis interface { type Realis interface {

View file

@ -22,11 +22,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/paypal/gorealis" "git.apache.org/thrift.git/lib/go/thrift"
realis "github.com/paypal/gorealis"
"github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response" "github.com/paypal/gorealis/response"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
var r realis.Realis var r realis.Realis
@ -309,10 +311,8 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
AddPorts(1). AddPorts(1).
AddLabel("currentTime", time.Now().String()) AddLabel("currentTime", time.Now().String())
pulse := int32(30)
timeout := 300
settings := realis.NewUpdateSettings() settings := realis.NewUpdateSettings()
settings.BlockIfNoPulsesAfterMs = &pulse settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(30)
settings.UpdateGroupSize = 1 settings.UpdateGroupSize = 1
settings.WaitForBatchCompletion = true settings.WaitForBatchCompletion = true
job.InstanceCount(2) job.InstanceCount(2)
@ -327,52 +327,58 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
Limit: 1, Limit: 1,
} }
start := time.Now() var updateDetails []*aurora.JobUpdateDetails
for i := 0; i*int(pulse) <= timeout; i++ {
fmt.Println("sending PulseJobUpdate....") ticker := time.NewTicker(time.Second * 3)
resp, err = r.PulseJobUpdate(result.GetKey()) timer := time.NewTimer(time.Minute * 6)
assert.NotNil(t, resp) defer ticker.Stop()
assert.Nil(t, err) defer timer.Stop()
respDetail, err := r.JobUpdateDetails(updateQ) pulseLoop:
assert.Nil(t, err) for {
select {
case <-ticker.C:
updateDetail := response.JobUpdateDetails(respDetail) fmt.Println("sending PulseJobUpdate....")
if len(updateDetail) == 0 { resp, err = r.PulseJobUpdate(result.GetKey())
fmt.Println("No update found") require.NotNil(t, resp, "received a nil response from Aurora")
assert.NotEqual(t, len(updateDetail), 0) assert.Nil(t, err)
}
status := updateDetail[0].Update.Summary.State.Status
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok { respDetail, err := r.JobUpdateDetails(updateQ)
assert.Nil(t, err)
// Rolled forward is the only state in which an update has been successfully updated updateDetails = response.JobUpdateDetails(respDetail)
// if we encounter an inactive state and it is not at rolled forward, update failed if len(updateDetails) == 0 {
if status == aurora.JobUpdateStatus_ROLLED_FORWARD { fmt.Println("No update found")
fmt.Println("Update succeded") assert.NotEqual(t, len(updateDetails), 0)
break
} else {
fmt.Println("Update failed")
break
} }
status := updateDetails[0].Update.Summary.State.Status
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeded")
break pulseLoop
} else {
fmt.Println("Update failed")
break pulseLoop
}
}
fmt.Println("Polling, update still active...")
case <-timer.C:
_, err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
assert.NoError(t, err)
_, err = r.KillJob(job.JobKey())
require.NoError(t, err, "timed out during pulse update test")
} }
fmt.Println("Polling, update still active...")
time.Sleep(time.Duration(pulse) * time.Second)
} }
end := time.Now()
fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano()))
t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
start := time.Now()
resp, err := r.KillJob(job.JobKey())
end := time.Now()
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
})
resp, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
} }
// Test configuring an executor that doesn't exist for CreateJob API // Test configuring an executor that doesn't exist for CreateJob API
@ -403,7 +409,7 @@ func TestRealisClient_CreateService(t *testing.T) {
var ok bool var ok bool
var mErr error var mErr error
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil { if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 240); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error // Update may already be in a terminal state so don't check for error
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.") _, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
@ -692,3 +698,41 @@ func TestRealisClient_ForceExplicitTaskReconciliation(t *testing.T) {
err = r.ForceExplicitTaskReconciliation(&batchSize) err = r.ForceExplicitTaskReconciliation(&batchSize)
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestRealisClient_PartitionPolicy(t *testing.T) {
role := "vagrant"
var partitionDelay int64 = 30
job := realis.NewJob().
Environment("prod").
Role(role).
Name("create_thermos_job_partition_policy_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ExecutorData(string(thermosPayload)).
CPU(.5).
RAM(64).
Disk(100).
IsService(true).
InstanceCount(2).
PartitionPolicy(&aurora.PartitionPolicy{Reschedule: true, DelaySecs: &partitionDelay})
settings := realis.NewUpdateSettings()
settings.UpdateGroupSize = 2
resp, result, err := r.CreateService(job, settings)
assert.NoError(t, err)
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
var ok bool
var mErr error
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
_, err = r.KillJob(job.JobKey())
assert.NoError(t, err)
}
}

View file

@ -37,17 +37,17 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
// Rebuild resource map from TaskConfig // Rebuild resource map from TaskConfig
for ptr := range config.Resources { for ptr := range config.Resources {
if ptr.NumCpus != nil { if ptr.NumCpus != nil {
job.resources["cpu"].NumCpus = ptr.NumCpus job.resources[CPU].NumCpus = ptr.NumCpus
continue // Guard against Union violations that Go won't enforce continue // Guard against Union violations that Go won't enforce
} }
if ptr.RamMb != nil { if ptr.RamMb != nil {
job.resources["ram"].RamMb = ptr.RamMb job.resources[RAM].RamMb = ptr.RamMb
continue continue
} }
if ptr.DiskMb != nil { if ptr.DiskMb != nil {
job.resources["disk"].DiskMb = ptr.DiskMb job.resources[DISK].DiskMb = ptr.DiskMb
continue continue
} }
} }
@ -77,19 +77,24 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
// Rebuild resource map from TaskConfig // Rebuild resource map from TaskConfig
for ptr := range config.Resources { for ptr := range config.Resources {
if ptr.NumCpus != nil { if ptr.NumCpus != nil {
job.resources["cpu"].NumCpus = ptr.NumCpus job.resources[CPU].NumCpus = ptr.NumCpus
continue // Guard against Union violations that Go won't enforce continue // Guard against Union violations that Go won't enforce
} }
if ptr.RamMb != nil { if ptr.RamMb != nil {
job.resources["ram"].RamMb = ptr.RamMb job.resources[RAM].RamMb = ptr.RamMb
continue continue
} }
if ptr.DiskMb != nil { if ptr.DiskMb != nil {
job.resources["disk"].DiskMb = ptr.DiskMb job.resources[DISK].DiskMb = ptr.DiskMb
continue continue
} }
if ptr.NumGpus != nil {
job.resources[GPU].NumGpus = ptr.NumGpus
continue // Guard against Union violations that Go won't enforce
}
} }
//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior

View file

@ -20,7 +20,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/paypal/gorealis" realis "github.com/paypal/gorealis"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )