diff --git a/examples/client.go b/examples/client.go index 8160678..e1e3411 100644 --- a/examples/client.go +++ b/examples/client.go @@ -135,11 +135,11 @@ func main() { job = realis.NewJob(). Environment("prod"). Role("vagrant"). - Name("docker-compose"). + Name("docker-compose-test"). ExecutorName("docker-compose-executor"). ExecutorData("{}"). CPU(0.25). - RAM(64). + RAM(512). Disk(100). IsService(true). InstanceCount(1). @@ -192,14 +192,15 @@ func main() { fmt.Println("Creating service") settings := realis.NewUpdateSettings() job.InstanceCount(3) - _, resp, err := r.CreateService(job, *settings) + resp, result, err := r.CreateService(job, settings) if err != nil { - fmt.Println(err) + fmt.Println("error: ", err) + fmt.Println("response: ", resp.String()) os.Exit(1) } - fmt.Println(resp.String()) + fmt.Println(result.String()) - if ok, err := monitor.JobUpdate(*resp.GetKey(), 5, 50); !ok || err != nil { + if ok, err := monitor.JobUpdate(*result.GetKey(), 5, 50); !ok || err != nil { _, err := r.KillJob(job.JobKey()) if err != nil { fmt.Println(err) @@ -389,7 +390,7 @@ func main() { } if resp.ResponseCode == aurora.ResponseCode_OK { - if ok, err := monitor.Instances(job.JobKey(), currInstances-numOfInstances, 5, 50); !ok || err != nil { + if ok, err := monitor.Instances(job.JobKey(), currInstances-numOfInstances, 5, 100); !ok || err != nil { fmt.Println("flexDown failed") } } @@ -427,6 +428,39 @@ func main() { jobUpdateKey := response.JobUpdateKey(resp) monitor.JobUpdate(*jobUpdateKey, 5, 500) break + + case "pauseJobUpdate": + resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ + Job: job.JobKey(), + ID: updateId, + }, "") + + if err != nil { + fmt.Println(err) + } + fmt.Println("PauseJobUpdate response: ", resp.String()) + + case "resumeJobUpdate": + resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ + Job: job.JobKey(), + ID: updateId, + }, "") + + if err != nil { + fmt.Println(err) + } + fmt.Println("ResumeJobUpdate response: ", resp.String()) + + case "pulseJobUpdate": + resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ + Job: job.JobKey(), + ID: updateId, + }) + if err != nil { + fmt.Println(err) + } + fmt.Println("PulseJobUpdate response: ", resp.String()) + case "updateDetails": resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ Key: &aurora.JobUpdateKey{ diff --git a/realis.go b/realis.go index 03554aa..5a3b202 100644 --- a/realis.go +++ b/realis.go @@ -40,7 +40,7 @@ type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error) - CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) + CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) @@ -57,6 +57,10 @@ type Realis interface { RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) + + PauseJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) + ResumeJobUpdate(key *aurora.JobUpdateKey, message string) (*aurora.Response, error) + PulseJobUpdate(key *aurora.JobUpdateKey) (*aurora.Response, error) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) // TODO: Remove this method and make it private to avoid race conditions ReestablishConn() error @@ -549,6 +553,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue return resp, nil } + func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) { var resp *aurora.Response var result *aurora.GetJobsResult_ @@ -660,9 +665,9 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { } // This API uses an update thrift call to create the services giving a few more robust features. -func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { +func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) { // Create a new job update object and ship it to the StartJobUpdate api - update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings) + update := NewUpdateJob(auroraJob.TaskConfig(), settings) update.InstanceCount(auroraJob.GetInstanceCount()) update.BatchSize(auroraJob.GetInstanceCount()) @@ -695,7 +700,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job Schedule message to Aurora Scheduler") } return resp, nil } @@ -718,7 +723,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, }) if retryErr != nil { - return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(clientErr, retryErr.Error()+"Error sending Cron Job De-schedule message to Aurora Scheduler") } return resp, nil @@ -859,6 +864,73 @@ func (r *realisClient) AbortJobUpdate( return resp, nil } +//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + var resp *aurora.Response + var clientErr error + retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { + return r.client.PauseJobUpdate(updateKey, message) + }) + + if clientErr != nil { + return false, clientErr + } + + return true, nil + }) + if retryErr != nil { + return nil, errors.Wrap(clientErr, retryErr.Error()+": Error sending PauseJobUpdate command to Aurora Scheduler") + } + + return resp, nil +} + +//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { + var resp *aurora.Response + var clientErr error + retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { + return r.client.ResumeJobUpdate(updateKey, message) + }) + + if clientErr != nil { + return false, clientErr + } + + return true, nil + }) + if retryErr != nil { + return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler") + } + + return resp, nil +} + +//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. +func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { + + var resp *aurora.Response + var clientErr error + retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { + resp, clientErr = r.thriftCallHelper(func() (*aurora.Response, error) { + return r.client.PulseJobUpdate(updateKey) + }) + + if clientErr != nil { + return false, clientErr + } + + return true, nil + }) + if retryErr != nil { + return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler") + } + + return resp, nil +} + // Scale up the number of instances under a job configuration using the configuration for specific // instance to scale up. func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 1538886..03720a2 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -26,6 +26,7 @@ import ( "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/stretchr/testify/assert" + "github.com/paypal/gorealis/response" ) var r realis.Realis @@ -128,6 +129,91 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { }) } +func TestRealisClient_CreateJobWithPulse_Thermos(t *testing.T) { + + fmt.Println("Creating service") + role := "vagrant" + job := realis.NewJob(). + Environment("prod"). + Role(role). + Name("create_thermos_job_test"). + ExecutorName(aurora.AURORA_EXECUTOR_NAME). + ExecutorData(string(thermosPayload)). + CPU(1). + RAM(64). + Disk(100). + IsService(true). + InstanceCount(1). + AddPorts(1). + AddLabel("currentTime", time.Now().String()) + + pulse := int32(30) + timeout := 300 + settings := realis.NewUpdateSettings() + settings.BlockIfNoPulsesAfterMs = &pulse + settings.UpdateGroupSize = 1 + settings.WaitForBatchCompletion = true + job.InstanceCount(2) + resp, result, err := r.CreateService(job, settings) + fmt.Println(result.String()) + + assert.NoError(t, err) + assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) + + + updateQ := aurora.JobUpdateQuery{ + Key: result.GetKey(), + Limit: 1, + } + + start := time.Now() + for i := 0; i*int(pulse) <= timeout; i++ { + + fmt.Println("sending PulseJobUpdate....") + resp, err = r.PulseJobUpdate(result.GetKey()) + assert.NotNil(t, resp) + assert.Nil(t, err) + + respDetail, err := r.JobUpdateDetails(updateQ) + assert.Nil(t, err) + + updateDetail := response.JobUpdateDetails(respDetail) + if len(updateDetail) == 0 { + fmt.Println("No update found") + assert.NotEqual(t, len(updateDetail), 0) + } + status := updateDetail[0].Update.Summary.State.Status + + if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok { + + // Rolled forward is the only state in which an update has been successfully updated + // if we encounter an inactive state and it is not at rolled forward, update failed + if status == aurora.JobUpdateStatus_ROLLED_FORWARD { + fmt.Println("Update succeded") + break + } else { + fmt.Println("Update failed") + break + } + } + + fmt.Println("Polling, update still active...") + time.Sleep(time.Duration(pulse) * time.Second) + } + end := time.Now() + fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano())) + + t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { + start := time.Now() + resp, err := r.KillJob(job.JobKey()) + end := time.Now() + assert.NoError(t, err) + assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) + fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano())) + }) + +} + func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { thermosCronPayload, err := ioutil.ReadFile("examples/thermos_cron_payload.json") diff --git a/updatejob.go b/updatejob.go index bffe285..a1be06a 100644 --- a/updatejob.go +++ b/updatejob.go @@ -29,8 +29,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req := aurora.NewJobUpdateRequest() req.TaskConfig = config - s := NewUpdateSettings().Settings() - req.Settings = &s + req.Settings = NewUpdateSettings() job := NewJob().(*AuroraJob) job.jobConfig.TaskConfig = config @@ -139,64 +138,18 @@ func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob { return u } -// TODO(rdelvalle): Integrate this struct with the JobUpdate struct so that we don't repeat code -type UpdateSettings struct { - settings aurora.JobUpdateSettings -} -func NewUpdateSettings() *UpdateSettings { - - us := new(UpdateSettings) +func NewUpdateSettings() *aurora.JobUpdateSettings { + us := new(aurora.JobUpdateSettings) // Mirrors defaults set by Pystachio - us.settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool) - us.settings.UpdateGroupSize = 1 - us.settings.WaitForBatchCompletion = false - us.settings.MinWaitInInstanceRunningMs = 45000 - us.settings.MaxPerInstanceFailures = 0 - us.settings.MaxFailedInstances = 0 - us.settings.RollbackOnFailure = true + us.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool) + us.UpdateGroupSize = 1 + us.WaitForBatchCompletion = false + us.MinWaitInInstanceRunningMs = 45000 + us.MaxPerInstanceFailures = 0 + us.MaxFailedInstances = 0 + us.RollbackOnFailure = true return us } - -// Max number of instances being updated at any given moment. -func (u *UpdateSettings) BatchSize(size int32) *UpdateSettings { - u.settings.UpdateGroupSize = size - return u -} - -// Minimum number of seconds a shard must remain in RUNNING state before considered a success. -func (u *UpdateSettings) WatchTime(ms int32) *UpdateSettings { - u.settings.MinWaitInInstanceRunningMs = ms - return u -} - -// Wait for all instances in a group to be done before moving on. -func (u *UpdateSettings) WaitForBatchCompletion(batchWait bool) *UpdateSettings { - u.settings.WaitForBatchCompletion = batchWait - return u -} - -// Max number of instance failures to tolerate before marking instance as FAILED. -func (u *UpdateSettings) MaxPerInstanceFailures(inst int32) *UpdateSettings { - u.settings.MaxPerInstanceFailures = inst - return u -} - -// Max number of FAILED instances to tolerate before terminating the update. -func (u *UpdateSettings) MaxFailedInstances(inst int32) *UpdateSettings { - u.settings.MaxFailedInstances = inst - return u -} - -// When False, prevents auto rollback of a failed update. -func (u *UpdateSettings) RollbackOnFail(rollback bool) *UpdateSettings { - u.settings.RollbackOnFailure = rollback - return u -} - -// Return internal Thrift API structure -func (u UpdateSettings) Settings() aurora.JobUpdateSettings { - return u.settings -}