Compare commits
7 commits
Author | SHA1 | Date | |
---|---|---|---|
|
73e7ab2671 | ||
|
22b1d82d88 | ||
|
2f7015571c | ||
|
296af622d1 | ||
|
9a835631b2 | ||
|
b100158080 | ||
|
45a4416830 |
11 changed files with 146 additions and 71 deletions
3
.gitattributes
vendored
Normal file
3
.gitattributes
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
gen-go/ linguist-generated=true
|
||||||
|
vendor/ linguist-generated=true
|
||||||
|
Gopkg.lock linguist-generated=true
|
|
@ -2,6 +2,11 @@ sudo: required
|
||||||
|
|
||||||
language: go
|
language: go
|
||||||
|
|
||||||
|
branches:
|
||||||
|
only:
|
||||||
|
- master
|
||||||
|
- master-v2.0
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- "1.10.x"
|
- "1.10.x"
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/paypal/gorealis"
|
realis "github.com/paypal/gorealis"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/paypal/gorealis"
|
realis "github.com/paypal/gorealis"
|
||||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||||
"github.com/paypal/gorealis/response"
|
"github.com/paypal/gorealis/response"
|
||||||
)
|
)
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/paypal/gorealis"
|
realis "github.com/paypal/gorealis"
|
||||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
48
job.go
48
job.go
|
@ -55,12 +55,22 @@ type Job interface {
|
||||||
GetInstanceCount() int32
|
GetInstanceCount() int32
|
||||||
MaxFailure(maxFail int32) Job
|
MaxFailure(maxFail int32) Job
|
||||||
Container(container Container) Job
|
Container(container Container) Job
|
||||||
|
PartitionPolicy(policy *aurora.PartitionPolicy) Job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ResourceType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CPU ResourceType = iota
|
||||||
|
RAM
|
||||||
|
DISK
|
||||||
|
GPU
|
||||||
|
)
|
||||||
|
|
||||||
// Structure to collect all information pertaining to an Aurora job.
|
// Structure to collect all information pertaining to an Aurora job.
|
||||||
type AuroraJob struct {
|
type AuroraJob struct {
|
||||||
jobConfig *aurora.JobConfiguration
|
jobConfig *aurora.JobConfiguration
|
||||||
resources map[string]*aurora.Resource
|
resources map[ResourceType]*aurora.Resource
|
||||||
portCount int
|
portCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,15 +97,8 @@ func NewJob() Job {
|
||||||
ramMb := aurora.NewResource()
|
ramMb := aurora.NewResource()
|
||||||
diskMb := aurora.NewResource()
|
diskMb := aurora.NewResource()
|
||||||
|
|
||||||
resources := make(map[string]*aurora.Resource)
|
resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
|
||||||
resources["cpu"] = numCpus
|
taskConfig.Resources = map[*aurora.Resource]bool{numCpus: true, ramMb: true, diskMb: true}
|
||||||
resources["ram"] = ramMb
|
|
||||||
resources["disk"] = diskMb
|
|
||||||
|
|
||||||
taskConfig.Resources = make(map[*aurora.Resource]bool)
|
|
||||||
taskConfig.Resources[numCpus] = true
|
|
||||||
taskConfig.Resources[ramMb] = true
|
|
||||||
taskConfig.Resources[diskMb] = true
|
|
||||||
|
|
||||||
numCpus.NumCpus = new(float64)
|
numCpus.NumCpus = new(float64)
|
||||||
ramMb.RamMb = new(int64)
|
ramMb.RamMb = new(int64)
|
||||||
|
@ -154,20 +157,28 @@ func (j *AuroraJob) ExecutorData(data string) Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *AuroraJob) CPU(cpus float64) Job {
|
func (j *AuroraJob) CPU(cpus float64) Job {
|
||||||
*j.resources["cpu"].NumCpus = cpus
|
*j.resources[CPU].NumCpus = cpus
|
||||||
|
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *AuroraJob) RAM(ram int64) Job {
|
func (j *AuroraJob) RAM(ram int64) Job {
|
||||||
*j.resources["ram"].RamMb = ram
|
*j.resources[RAM].RamMb = ram
|
||||||
|
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *AuroraJob) Disk(disk int64) Job {
|
func (j *AuroraJob) Disk(disk int64) Job {
|
||||||
*j.resources["disk"].DiskMb = disk
|
*j.resources[DISK].DiskMb = disk
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *AuroraJob) GPU(gpus int64) Job {
|
||||||
|
if _, ok := j.resources[GPU]; !ok {
|
||||||
|
numGPUs := &aurora.Resource{NumGpus: new(int64)}
|
||||||
|
j.resources[GPU] = numGPUs
|
||||||
|
j.TaskConfig().Resources[numGPUs] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
*j.resources[GPU].NumGpus = gpus
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,3 +326,10 @@ func (j *AuroraJob) Container(container Container) Job {
|
||||||
|
|
||||||
return j
|
return j
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set a partition policy for the job configuration to implement.
|
||||||
|
func (j *AuroraJob) PartitionPolicy(policy *aurora.PartitionPolicy) Job {
|
||||||
|
j.jobConfig.TaskConfig.PartitionPolicy = policy
|
||||||
|
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
|
@ -40,20 +40,20 @@ func (l *LevelLogger) EnableDebug(enable bool) {
|
||||||
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
|
func (l LevelLogger) DebugPrintf(format string, a ...interface{}) {
|
||||||
if l.debug {
|
if l.debug {
|
||||||
l.Print("[DEBUG] ")
|
l.Print("[DEBUG] ")
|
||||||
l.Printf(format, a)
|
l.Printf(format, a...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l LevelLogger) DebugPrint(a ...interface{}) {
|
func (l LevelLogger) DebugPrint(a ...interface{}) {
|
||||||
if l.debug {
|
if l.debug {
|
||||||
l.Print("[DEBUG] ")
|
l.Print("[DEBUG] ")
|
||||||
l.Print(a)
|
l.Print(a...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l LevelLogger) DebugPrintln(a ...interface{}) {
|
func (l LevelLogger) DebugPrintln(a ...interface{}) {
|
||||||
if l.debug {
|
if l.debug {
|
||||||
l.Print("[DEBUG] ")
|
l.Print("[DEBUG] ")
|
||||||
l.Println(a)
|
l.Println(a...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const VERSION = "1.3.0"
|
const VERSION = "1.3.1"
|
||||||
|
|
||||||
// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
|
// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
|
||||||
type Realis interface {
|
type Realis interface {
|
||||||
|
|
|
@ -22,11 +22,13 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/paypal/gorealis"
|
"git.apache.org/thrift.git/lib/go/thrift"
|
||||||
|
realis "github.com/paypal/gorealis"
|
||||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||||
"github.com/paypal/gorealis/response"
|
"github.com/paypal/gorealis/response"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var r realis.Realis
|
var r realis.Realis
|
||||||
|
@ -309,10 +311,8 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
|
||||||
AddPorts(1).
|
AddPorts(1).
|
||||||
AddLabel("currentTime", time.Now().String())
|
AddLabel("currentTime", time.Now().String())
|
||||||
|
|
||||||
pulse := int32(30)
|
|
||||||
timeout := 300
|
|
||||||
settings := realis.NewUpdateSettings()
|
settings := realis.NewUpdateSettings()
|
||||||
settings.BlockIfNoPulsesAfterMs = &pulse
|
settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(30)
|
||||||
settings.UpdateGroupSize = 1
|
settings.UpdateGroupSize = 1
|
||||||
settings.WaitForBatchCompletion = true
|
settings.WaitForBatchCompletion = true
|
||||||
job.InstanceCount(2)
|
job.InstanceCount(2)
|
||||||
|
@ -327,52 +327,58 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
|
||||||
Limit: 1,
|
Limit: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
var updateDetails []*aurora.JobUpdateDetails
|
||||||
for i := 0; i*int(pulse) <= timeout; i++ {
|
|
||||||
|
|
||||||
fmt.Println("sending PulseJobUpdate....")
|
ticker := time.NewTicker(time.Second * 3)
|
||||||
resp, err = r.PulseJobUpdate(result.GetKey())
|
timer := time.NewTimer(time.Minute * 6)
|
||||||
assert.NotNil(t, resp)
|
defer ticker.Stop()
|
||||||
assert.Nil(t, err)
|
defer timer.Stop()
|
||||||
|
|
||||||
respDetail, err := r.JobUpdateDetails(updateQ)
|
pulseLoop:
|
||||||
assert.Nil(t, err)
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
|
||||||
updateDetail := response.JobUpdateDetails(respDetail)
|
fmt.Println("sending PulseJobUpdate....")
|
||||||
if len(updateDetail) == 0 {
|
resp, err = r.PulseJobUpdate(result.GetKey())
|
||||||
fmt.Println("No update found")
|
require.NotNil(t, resp, "received a nil response from Aurora")
|
||||||
assert.NotEqual(t, len(updateDetail), 0)
|
assert.Nil(t, err)
|
||||||
}
|
|
||||||
status := updateDetail[0].Update.Summary.State.Status
|
|
||||||
|
|
||||||
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
|
respDetail, err := r.JobUpdateDetails(updateQ)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
// Rolled forward is the only state in which an update has been successfully updated
|
updateDetails = response.JobUpdateDetails(respDetail)
|
||||||
// if we encounter an inactive state and it is not at rolled forward, update failed
|
if len(updateDetails) == 0 {
|
||||||
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
|
fmt.Println("No update found")
|
||||||
fmt.Println("Update succeded")
|
assert.NotEqual(t, len(updateDetails), 0)
|
||||||
break
|
|
||||||
} else {
|
|
||||||
fmt.Println("Update failed")
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
status := updateDetails[0].Update.Summary.State.Status
|
||||||
|
|
||||||
|
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
|
||||||
|
|
||||||
|
// Rolled forward is the only state in which an update has been successfully updated
|
||||||
|
// if we encounter an inactive state and it is not at rolled forward, update failed
|
||||||
|
if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
|
||||||
|
fmt.Println("Update succeded")
|
||||||
|
break pulseLoop
|
||||||
|
} else {
|
||||||
|
fmt.Println("Update failed")
|
||||||
|
break pulseLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Polling, update still active...")
|
||||||
|
case <-timer.C:
|
||||||
|
_, err := r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
_, err = r.KillJob(job.JobKey())
|
||||||
|
require.NoError(t, err, "timed out during pulse update test")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Polling, update still active...")
|
|
||||||
time.Sleep(time.Duration(pulse) * time.Second)
|
|
||||||
}
|
}
|
||||||
end := time.Now()
|
|
||||||
fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano()))
|
|
||||||
|
|
||||||
t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
|
|
||||||
start := time.Now()
|
|
||||||
resp, err := r.KillJob(job.JobKey())
|
|
||||||
end := time.Now()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
|
|
||||||
fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
|
|
||||||
})
|
|
||||||
|
|
||||||
|
resp, err = r.KillJob(job.JobKey())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test configuring an executor that doesn't exist for CreateJob API
|
// Test configuring an executor that doesn't exist for CreateJob API
|
||||||
|
@ -403,7 +409,7 @@ func TestRealisClient_CreateService(t *testing.T) {
|
||||||
var ok bool
|
var ok bool
|
||||||
var mErr error
|
var mErr error
|
||||||
|
|
||||||
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
|
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 240); !ok || mErr != nil {
|
||||||
// Update may already be in a terminal state so don't check for error
|
// Update may already be in a terminal state so don't check for error
|
||||||
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
|
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
|
||||||
|
|
||||||
|
@ -692,3 +698,41 @@ func TestRealisClient_ForceExplicitTaskReconciliation(t *testing.T) {
|
||||||
err = r.ForceExplicitTaskReconciliation(&batchSize)
|
err = r.ForceExplicitTaskReconciliation(&batchSize)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRealisClient_PartitionPolicy(t *testing.T) {
|
||||||
|
|
||||||
|
role := "vagrant"
|
||||||
|
var partitionDelay int64 = 30
|
||||||
|
job := realis.NewJob().
|
||||||
|
Environment("prod").
|
||||||
|
Role(role).
|
||||||
|
Name("create_thermos_job_partition_policy_test").
|
||||||
|
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
|
||||||
|
ExecutorData(string(thermosPayload)).
|
||||||
|
CPU(.5).
|
||||||
|
RAM(64).
|
||||||
|
Disk(100).
|
||||||
|
IsService(true).
|
||||||
|
InstanceCount(2).
|
||||||
|
PartitionPolicy(&aurora.PartitionPolicy{Reschedule: true, DelaySecs: &partitionDelay})
|
||||||
|
|
||||||
|
settings := realis.NewUpdateSettings()
|
||||||
|
settings.UpdateGroupSize = 2
|
||||||
|
resp, result, err := r.CreateService(job, settings)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
var mErr error
|
||||||
|
|
||||||
|
if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
|
||||||
|
// Update may already be in a terminal state so don't check for error
|
||||||
|
_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
|
||||||
|
|
||||||
|
_, err = r.KillJob(job.JobKey())
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
17
updatejob.go
17
updatejob.go
|
@ -37,17 +37,17 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
|
||||||
// Rebuild resource map from TaskConfig
|
// Rebuild resource map from TaskConfig
|
||||||
for ptr := range config.Resources {
|
for ptr := range config.Resources {
|
||||||
if ptr.NumCpus != nil {
|
if ptr.NumCpus != nil {
|
||||||
job.resources["cpu"].NumCpus = ptr.NumCpus
|
job.resources[CPU].NumCpus = ptr.NumCpus
|
||||||
continue // Guard against Union violations that Go won't enforce
|
continue // Guard against Union violations that Go won't enforce
|
||||||
}
|
}
|
||||||
|
|
||||||
if ptr.RamMb != nil {
|
if ptr.RamMb != nil {
|
||||||
job.resources["ram"].RamMb = ptr.RamMb
|
job.resources[RAM].RamMb = ptr.RamMb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if ptr.DiskMb != nil {
|
if ptr.DiskMb != nil {
|
||||||
job.resources["disk"].DiskMb = ptr.DiskMb
|
job.resources[DISK].DiskMb = ptr.DiskMb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,19 +77,24 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
|
||||||
// Rebuild resource map from TaskConfig
|
// Rebuild resource map from TaskConfig
|
||||||
for ptr := range config.Resources {
|
for ptr := range config.Resources {
|
||||||
if ptr.NumCpus != nil {
|
if ptr.NumCpus != nil {
|
||||||
job.resources["cpu"].NumCpus = ptr.NumCpus
|
job.resources[CPU].NumCpus = ptr.NumCpus
|
||||||
continue // Guard against Union violations that Go won't enforce
|
continue // Guard against Union violations that Go won't enforce
|
||||||
}
|
}
|
||||||
|
|
||||||
if ptr.RamMb != nil {
|
if ptr.RamMb != nil {
|
||||||
job.resources["ram"].RamMb = ptr.RamMb
|
job.resources[RAM].RamMb = ptr.RamMb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if ptr.DiskMb != nil {
|
if ptr.DiskMb != nil {
|
||||||
job.resources["disk"].DiskMb = ptr.DiskMb
|
job.resources[DISK].DiskMb = ptr.DiskMb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ptr.NumGpus != nil {
|
||||||
|
job.resources[GPU].NumGpus = ptr.NumGpus
|
||||||
|
continue // Guard against Union violations that Go won't enforce
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior
|
//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior
|
||||||
|
|
|
@ -20,7 +20,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/paypal/gorealis"
|
realis "github.com/paypal/gorealis"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue