API for scheduling, descheduling, and starting cron jobs has been created. New response helper added for ScheduleStatusResult.

This commit is contained in:
Renan DelValle 2016-09-30 01:24:49 -04:00
parent ca8b9359cf
commit 15c2472ffd
5 changed files with 90 additions and 4 deletions

View file

@ -17,5 +17,8 @@ library has been tested. Vendoring a working version of this library is highly r
* Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency * Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency
* End to end testing with Vagrant setup * End to end testing with Vagrant setup
## Importing
We suggest using http://labix.org/gopkg.in for any imports of packages in this library
## Contributions ## Contributions
Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made. Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made.

View file

@ -132,6 +132,39 @@ func main() {
} }
} }
} }
break
case "scheduleCron":
fmt.Println("Scheduling a Cron job")
// Cron config
job.CronSchedule("* * * * *")
job.IsService(false)
resp, err := r.ScheduleCronJob(job)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(resp.String())
break
case "startCron":
fmt.Println("Starting a Cron job")
resp, err := r.StartCronJob(job.JobKey())
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(resp.String())
break
case "descheduleCron":
fmt.Println("Descheduling a Cron job")
resp, err := r.DescheduleCronJob(job.JobKey())
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(resp.String())
break break
case "kill": case "kill":
fmt.Println("Killing job") fmt.Println("Killing job")

12
job.go
View file

@ -25,6 +25,8 @@ type Job interface {
Role(role string) Job Role(role string) Job
Name(name string) Job Name(name string) Job
CPU(cpus float64) Job CPU(cpus float64) Job
CronSchedule(cron string) Job
CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job
Disk(disk int64) Job Disk(disk int64) Job
RAM(ram int64) Job RAM(ram int64) Job
ExecutorName(name string) Job ExecutorName(name string) Job
@ -156,6 +158,16 @@ func (j AuroraJob) InstanceCount(instCount int32) Job {
return j return j
} }
func (j AuroraJob) CronSchedule(cron string) Job {
j.jobConfig.CronSchedule = &cron
return j
}
func (j AuroraJob) CronCollisionPolicy(policy aurora.CronCollisionPolicy) Job {
j.jobConfig.CronCollisionPolicy = policy
return j
}
// How many instances of the job to run // How many instances of the job to run
func (j AuroraJob) GetInstanceCount() int32 { func (j AuroraJob) GetInstanceCount() int32 {
return j.jobConfig.InstanceCount return j.jobConfig.InstanceCount

View file

@ -25,12 +25,14 @@ import (
"net/http/cookiejar" "net/http/cookiejar"
"os" "os"
"time" "time"
"github.com/rdelval/gorealis/response"
) )
type Realis interface { type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error)
@ -39,7 +41,9 @@ type Realis interface {
RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RestartJob(key *aurora.JobKey) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
Close() Close()
} }
@ -112,12 +116,12 @@ func (r realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sched
JobName: key.Name, JobName: key.Name,
Statuses: states} Statuses: states}
response, err := r.client.GetTasksWithoutConfigs(taskQ) resp, err := r.client.GetTasksWithoutConfigs(taskQ)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error querying Aurora Scheduler for active IDs") return nil, errors.Wrap(err, "Error querying Aurora Scheduler for active IDs")
} }
tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks() tasks := response.ScheduleStatusResult(resp).GetTasks()
jobInstanceIds := make(map[int32]bool) jobInstanceIds := make(map[int32]bool)
for _, task := range tasks { for _, task := range tasks {
@ -176,6 +180,36 @@ func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
return response, nil return response, nil
} }
func (r realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
response, err := r.client.ScheduleCronJob(auroraJob.JobConfig())
if err != nil {
return nil, errors.Wrap(err, "Error sending Cron Job Schedule message to Aurora Scheduler")
}
return response, nil
}
func (r realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
response, err := r.client.DescheduleCronJob(key)
if err != nil {
return nil, errors.Wrap(err, "Error sending Cron Job De-schedule message to Aurora Scheduler")
}
return response, nil
}
func (r realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
response, err := r.client.StartCronJob(key)
if err != nil {
return nil, errors.Wrap(err, "Error sending Start Cron Job message to Aurora Scheduler")
}
return response, nil
}
// Restarts specific instances specified // Restarts specific instances specified
func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
instanceIds := make(map[int32]bool) instanceIds := make(map[int32]bool)
@ -263,12 +297,12 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC
InstanceIds: ids, InstanceIds: ids,
Statuses: aurora.ACTIVE_STATES} Statuses: aurora.ACTIVE_STATES}
response, err := r.client.GetTasksStatus(taskQ) resp, err := r.client.GetTasksStatus(taskQ)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration") return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration")
} }
tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks() tasks := response.ScheduleStatusResult(resp).GetTasks()
if len(tasks) == 0 { if len(tasks) == 0 {
return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist",

View file

@ -27,3 +27,7 @@ func JobUpdateKey(resp *aurora.Response) *aurora.JobUpdateKey {
func JobUpdateDetails(resp *aurora.Response) []*aurora.JobUpdateDetails { func JobUpdateDetails(resp *aurora.Response) []*aurora.JobUpdateDetails {
return resp.Result_.GetJobUpdateDetailsResult_.DetailsList return resp.Result_.GetJobUpdateDetailsResult_.DetailsList
} }
func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ {
return resp.GetResult_().GetScheduleStatusResult_()
}