From 15c2472ffd5ee6997d9ae66b58eccdf1bfe3d822 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Fri, 30 Sep 2016 01:24:49 -0400 Subject: [PATCH] API for scheduling, descheduling, and starting cron jobs has been created. New response helper added for ScheduleStatusResult. --- README.md | 3 +++ examples/client.go | 33 +++++++++++++++++++++++++++++++++ job.go | 12 ++++++++++++ realis.go | 42 ++++++++++++++++++++++++++++++++++++++---- response/response.go | 4 ++++ 5 files changed, 90 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fff84ac..2d2a4a6 100644 --- a/README.md +++ b/README.md @@ -17,5 +17,8 @@ library has been tested. Vendoring a working version of this library is highly r * Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency * End to end testing with Vagrant setup +## Importing +We suggest using http://labix.org/gopkg.in for any imports of packages in this library + ## Contributions Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made. \ No newline at end of file diff --git a/examples/client.go b/examples/client.go index 91b660d..3bff6e8 100644 --- a/examples/client.go +++ b/examples/client.go @@ -132,6 +132,39 @@ func main() { } } } + break + case "scheduleCron": + fmt.Println("Scheduling a Cron job") + // Cron config + job.CronSchedule("* * * * *") + job.IsService(false) + resp, err := r.ScheduleCronJob(job) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Println(resp.String()) + + break + case "startCron": + fmt.Println("Starting a Cron job") + resp, err := r.StartCronJob(job.JobKey()) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Println(resp.String()) + + break + case "descheduleCron": + fmt.Println("Descheduling a Cron job") + resp, err := r.DescheduleCronJob(job.JobKey()) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Println(resp.String()) + break case "kill": fmt.Println("Killing job") diff --git a/job.go b/job.go index 2e77a3d..dbc43f6 100644 --- a/job.go +++ b/job.go @@ -25,6 +25,8 @@ type Job interface { Role(role string) Job Name(name string) Job CPU(cpus float64) Job + CronSchedule(cron string) Job + CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job Disk(disk int64) Job RAM(ram int64) Job ExecutorName(name string) Job @@ -156,6 +158,16 @@ func (j AuroraJob) InstanceCount(instCount int32) Job { return j } +func (j AuroraJob) CronSchedule(cron string) Job { + j.jobConfig.CronSchedule = &cron + return j +} + +func (j AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job { + j.jobConfig.CronCollisionPolicy = policy + return j +} + // How many instances of the job to run func (j AuroraJob) GetInstanceCount() int32 { return j.jobConfig.InstanceCount diff --git a/realis.go b/realis.go index 2a689f9..e9c2b7a 100644 --- a/realis.go +++ b/realis.go @@ -25,12 +25,14 @@ import ( "net/http/cookiejar" "os" "time" + "github.com/rdelval/gorealis/response" ) type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error) + DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) @@ -39,7 +41,9 @@ type Realis interface { RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) + ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) + StartCronJob(key *aurora.JobKey) (*aurora.Response, error) Close() } @@ -112,12 +116,12 @@ func (r realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sched JobName: key.Name, Statuses: states} - response, err := r.client.GetTasksWithoutConfigs(taskQ) + resp, err := r.client.GetTasksWithoutConfigs(taskQ) if err != nil { return nil, errors.Wrap(err, "Error querying Aurora Scheduler for active IDs") } - tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks() + tasks := response.ScheduleStatusResult(resp).GetTasks() jobInstanceIds := make(map[int32]bool) for _, task := range tasks { @@ -176,6 +180,36 @@ func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { return response, nil } +func (r realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { + response, err := r.client.ScheduleCronJob(auroraJob.JobConfig()) + + if err != nil { + return nil, errors.Wrap(err, "Error sending Cron Job Schedule message to Aurora Scheduler") + } + + return response, nil +} + +func (r realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { + response, err := r.client.DescheduleCronJob(key) + + if err != nil { + return nil, errors.Wrap(err, "Error sending Cron Job De-schedule message to Aurora Scheduler") + } + + return response, nil +} + +func (r realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { + response, err := r.client.StartCronJob(key) + + if err != nil { + return nil, errors.Wrap(err, "Error sending Start Cron Job message to Aurora Scheduler") + } + + return response, nil +} + // Restarts specific instances specified func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { instanceIds := make(map[int32]bool) @@ -263,12 +297,12 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC InstanceIds: ids, Statuses: aurora.ACTIVE_STATES} - response, err := r.client.GetTasksStatus(taskQ) + resp, err := r.client.GetTasksStatus(taskQ) if err != nil { return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration") } - tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks() + tasks := response.ScheduleStatusResult(resp).GetTasks() if len(tasks) == 0 { return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", diff --git a/response/response.go b/response/response.go index 35c6337..b64e665 100644 --- a/response/response.go +++ b/response/response.go @@ -27,3 +27,7 @@ func JobUpdateKey(resp *aurora.Response) *aurora.JobUpdateKey { func JobUpdateDetails(resp *aurora.Response) []*aurora.JobUpdateDetails { return resp.Result_.GetJobUpdateDetailsResult_.DetailsList } + +func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { + return resp.GetResult_().GetScheduleStatusResult_() +}