diff --git a/examples/client.go b/examples/client.go index 35cc040..7151efb 100644 --- a/examples/client.go +++ b/examples/client.go @@ -180,9 +180,8 @@ func main() { case "createService": // Create a service with three instances using the update API instead of the createJob API fmt.Println("Creating service") - settings := realis.NewUpdateSettings() - job.InstanceCount(3) - result, err := r.CreateService(job, settings) + settings := realis.JobUpdateFromConfig(job.TaskConfig()).InstanceCount(3) + result, err := r.CreateService(settings) if err != nil { log.Fatal("error: ", err) } @@ -304,8 +303,9 @@ func main() { currInstances := len(live) fmt.Println("Current num of instances: ", currInstances) + key := job.JobKey() err = r.AddInstances(aurora.InstanceKey{ - JobKey: job.JobKey(), + JobKey: &key, InstanceId: live[0], }, int32(numOfInstances)) @@ -345,15 +345,16 @@ func main() { if err != nil { log.Fatal(err) } + + key := job.JobKey() taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{ - JobKey: job.JobKey(), + JobKey: &key, InstanceId: live[0], }) if err != nil { log.Fatal(err) } - updateJob := realis.NewDefaultUpdateJob(taskConfig) - updateJob.InstanceCount(5).RAM(128) + updateJob := realis.JobUpdateFromConfig(taskConfig).InstanceCount(5).RAM(128) result, err := r.StartJobUpdate(updateJob, "") if err != nil { @@ -367,8 +368,9 @@ func main() { } case "pauseJobUpdate": + key := job.JobKey() err := r.PauseJobUpdate(&aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }, "") @@ -377,8 +379,9 @@ func main() { } case "resumeJobUpdate": + key := job.JobKey() err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }, "") @@ -387,8 +390,9 @@ func main() { } case "pulseJobUpdate": + key := job.JobKey() resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }) if err != nil { @@ -398,9 +402,10 @@ func main() { fmt.Println("PulseJobUpdate response: ", resp.String()) case "updateDetails": + key := job.JobKey() result, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ Key: &aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }, Limit: 1, @@ -414,8 +419,9 @@ func main() { case "abortUpdate": fmt.Println("Abort update") + key := job.JobKey() err := r.AbortJobUpdate(aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }, "") @@ -426,8 +432,9 @@ func main() { case "rollbackUpdate": fmt.Println("Abort update") + key := job.JobKey() err := r.RollbackJobUpdate(aurora.JobUpdateKey{ - Job: job.JobKey(), + Job: &key, ID: updateId, }, "") @@ -443,8 +450,9 @@ func main() { log.Fatal(err) } + key := job.JobKey() config, err := r.FetchTaskConfig(aurora.InstanceKey{ - JobKey: job.JobKey(), + JobKey: &key, InstanceId: live[0], }) @@ -456,9 +464,10 @@ func main() { case "updatesummary": fmt.Println("Getting job update summary") + key := job.JobKey() jobquery := &aurora.JobUpdateQuery{ - Role: &job.JobKey().Role, - JobKey: job.JobKey(), + Role: &key.Role, + JobKey: &key, } updatesummary, err := r.GetJobUpdateSummaries(jobquery) if err != nil { @@ -469,10 +478,11 @@ func main() { case "taskStatus": fmt.Println("Getting task status") + key := job.JobKey() taskQ := &aurora.TaskQuery{ - Role: &job.JobKey().Role, - Environment: &job.JobKey().Environment, - JobName: &job.JobKey().Name, + Role: &key.Role, + Environment: &key.Environment, + JobName: &key.Name, } tasks, err := r.GetTaskStatus(taskQ) if err != nil { @@ -484,10 +494,11 @@ func main() { case "tasksWithoutConfig": fmt.Println("Getting task status") + key := job.JobKey() taskQ := &aurora.TaskQuery{ - Role: &job.JobKey().Role, - Environment: &job.JobKey().Environment, - JobName: &job.JobKey().Name, + Role: &key.Role, + Environment: &key.Environment, + JobName: &key.Name, } tasks, err := r.GetTasksWithoutConfigs(taskQ) if err != nil { @@ -580,10 +591,11 @@ func main() { case "getPendingReasons": fmt.Println("Getting pending reasons") + key := job.JobKey() taskQ := &aurora.TaskQuery{ - Role: &job.JobKey().Role, - Environment: &job.JobKey().Environment, - JobName: &job.JobKey().Name, + Role: &key.Role, + Environment: &key.Environment, + JobName: &key.Name, } reasons, err := r.GetPendingReason(taskQ) if err != nil { diff --git a/jobUpdate.go b/jobUpdate.go index 257e921..bd25e79 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -15,6 +15,9 @@ package realis import ( + "time" + + "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" ) @@ -25,23 +28,33 @@ type JobUpdate struct { } // Create a default JobUpdate object with an empty task and no fields filled in. -func NewJobUpdate(task *AuroraTask) *JobUpdate { +func NewJobUpdate() *JobUpdate { newTask := NewTask() req := aurora.JobUpdateRequest{} req.TaskConfig = newTask.TaskConfig() - req.Settings = NewUpdateSettings() + req.Settings = newUpdateSettings() return &JobUpdate{task: newTask, request: &req} } -func JobUpdateFormTask(task *AuroraTask) *JobUpdate { - // Perform a deep copy to avoid unexpected behavior +func JobUpdateFromAuroraTask(task *AuroraTask) *JobUpdate { newTask := task.Clone() req := aurora.JobUpdateRequest{} req.TaskConfig = newTask.TaskConfig() - req.Settings = NewUpdateSettings() + req.Settings = newUpdateSettings() + + return &JobUpdate{task: newTask, request: &req} +} + +func JobUpdateFromConfig(task *aurora.TaskConfig) *JobUpdate { + // Perform a deep copy to avoid unexpected behavior + newTask := TaskFromThrift(task) + + req := aurora.JobUpdateRequest{} + req.TaskConfig = newTask.TaskConfig() + req.Settings = newUpdateSettings() return &JobUpdate{task: newTask, request: &req} } @@ -59,8 +72,8 @@ func (j *JobUpdate) BatchSize(size int32) *JobUpdate { } // Minimum number of seconds a shard must remain in RUNNING state before considered a success. -func (j *JobUpdate) WatchTime(ms int32) *JobUpdate { - j.request.Settings.MinWaitInInstanceRunningMs = ms +func (j *JobUpdate) WatchTime(timeout time.Duration) *JobUpdate { + j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Seconds() * 1000) return j } @@ -88,7 +101,13 @@ func (j *JobUpdate) RollbackOnFail(rollback bool) *JobUpdate { return j } -func NewUpdateSettings() *aurora.JobUpdateSettings { +// Sets the interval at which pulses should be received by the job update before timing out. +func (j *JobUpdate) PulseIntervalTimeout(timeout time.Duration) *JobUpdate { + j.request.Settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(int32(timeout.Seconds() * 1000)) + return j +} + +func newUpdateSettings() *aurora.JobUpdateSettings { us := aurora.JobUpdateSettings{} // Mirrors defaults set by Pystachio @@ -104,10 +123,27 @@ func NewUpdateSettings() *aurora.JobUpdateSettings { } /* - AuroraTask specific API, see task.go for further documentation. - These functions are provided for the convenience of chaining API calls. + These methods are provided for user convenience in order to chain + calls for configuration. + API below here are wrappers around modifying an AuroraTask instance. + See task.go for further documentation. */ +func (t *JobUpdate) Environment(env string) *JobUpdate { + t.task.Environment(env) + return t +} + +func (t *JobUpdate) Role(role string) *JobUpdate { + t.task.Role(role) + return t +} + +func (t *JobUpdate) Name(name string) *JobUpdate { + t.task.Name(name) + return t +} + func (j *JobUpdate) ExecutorName(name string) *JobUpdate { j.task.ExecutorName(name) return j @@ -190,3 +226,7 @@ func (j *JobUpdate) Container(container Container) *JobUpdate { j.task.Container(container) return j } + +func (j *JobUpdate) JobKey() aurora.JobKey { + return j.task.JobKey() +}