Changing return type of all realis job related tasks to be *aurora.Response in order to account for future changes and provide the programmer flexibility

This commit is contained in:
Renan DelValle 2016-08-09 13:30:26 -07:00
parent f52e5105b8
commit 37be87d609
2 changed files with 43 additions and 43 deletions

View file

@ -47,11 +47,11 @@ func main() {
switch *executor { switch *executor {
case "thermos": case "thermos":
payload, err := ioutil.ReadFile("examples/thermos_payload.json") payload, err := ioutil.ReadFile("examples/thermos_payload.json")
if err != nil { if err != nil {
fmt.Print("Error reading json config file: ", err) fmt.Print("Error reading json config file: ", err)
os.Exit(1) os.Exit(1)
} }
job = realis.NewJob(). job = realis.NewJob().
Environment("prod"). Environment("prod").
Role("vagrant"). Role("vagrant").
@ -59,7 +59,7 @@ func main() {
ExecutorName(aurora.AURORA_EXECUTOR_NAME). ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ExecutorData(string(payload)). ExecutorData(string(payload)).
CPU(1). CPU(1).
Ram(64). RAM(64).
Disk(100). Disk(100).
IsService(true). IsService(true).
InstanceCount(1). InstanceCount(1).
@ -73,7 +73,7 @@ func main() {
ExecutorName("docker-compose-executor"). ExecutorName("docker-compose-executor").
ExecutorData("{}"). ExecutorData("{}").
CPU(1). CPU(1).
Ram(64). RAM(64).
Disk(100). Disk(100).
IsService(false). IsService(false).
InstanceCount(1). InstanceCount(1).
@ -89,62 +89,62 @@ func main() {
switch *cmd { switch *cmd {
case "create": case "create":
fmt.Println("Creating job") fmt.Println("Creating job")
msg, err := r.CreateJob(job) response, err := r.CreateJob(job)
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(response.String())
break break
case "kill": case "kill":
fmt.Println("Killing job") fmt.Println("Killing job")
msg, err := r.KillJob(job.JobKey()) response, err := r.KillJob(job.JobKey())
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(response.String())
break break
case "restart": case "restart":
fmt.Println("Restarting job") fmt.Println("Restarting job")
msg, err := r.RestartJob(job.JobKey()) response, err := r.RestartJob(job.JobKey())
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(response.String())
break break
case "flexUp": case "flexUp":
fmt.Println("Flexing up job") fmt.Println("Flexing up job")
msg, err := r.AddInstances(job.JobKey(), 5) response, err := r.AddInstances(job.JobKey(), 5)
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(response.String())
break break
case "update": case "update":
fmt.Println("Updating a job with a new name") fmt.Println("Updating a job with a new name")
updateJob := realis.NewUpdateJob(job) updateJob := realis.NewUpdateJob(job)
updateJob.InstanceCount(3).Ram(128) updateJob.InstanceCount(3).RAM(128)
msg, err := r.StartJobUpdate(updateJob, "") resposne, err := r.StartJobUpdate(updateJob, "")
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(resposne.String())
break break
case "abortUpdate": case "abortUpdate":
fmt.Println("Abort update") fmt.Println("Abort update")
msg, err := r.AbortJobUpdate(job.JobKey(), *updateId, "") response, err := r.AbortJobUpdate(job.JobKey(), *updateId, "")
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
} }
fmt.Print(msg) fmt.Print(response.String())
break break
default: default:
fmt.Println("Only Create, Kill, and Restart are supported now") fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now")
os.Exit(1) os.Exit(1)
} }
} }

View file

@ -52,7 +52,7 @@ func NewDefaultConfig(url string) (RealisConfig, error) {
jar, err := cookiejar.New(nil) jar, err := cookiejar.New(nil)
if err != nil { if err != nil {
return RealisConfig{}, errors.Wrap(err, "Error creating Cookie Jar") return RealisConfig{}, errors.Wrap(err, "Error creating Cookie Jar.")
} }
//Custom client to timeout after 10 seconds to avoid hanging //Custom client to timeout after 10 seconds to avoid hanging
@ -60,12 +60,12 @@ func NewDefaultConfig(url string) (RealisConfig, error) {
thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Second * 10, Jar: jar}}) thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Second * 10, Jar: jar}})
if err != nil { if err != nil {
return RealisConfig{}, errors.Wrap(err, "Error creating transport") return RealisConfig{}, errors.Wrap(err, "Error creating transport.")
} }
if err := trans.Open(); err != nil { if err := trans.Open(); err != nil {
fmt.Fprintln(os.Stderr) fmt.Fprintln(os.Stderr)
return RealisConfig{}, errors.Wrapf(err, "Error opening connection to %s", url) return RealisConfig{}, errors.Wrapf(err, "Error opening connection to %s.", url)
} }
return RealisConfig{transport: trans}, nil return RealisConfig{transport: trans}, nil
@ -111,85 +111,86 @@ func (r *Realis) getActiveTaskIds(key *aurora.JobKey) (map[int32]bool, error) {
} }
// Sends a kill message to the scheduler for all active tasks under a job // Sends a kill message to the scheduler for all active tasks under a job
func (r *Realis) KillJob(key *aurora.JobKey) (string, error) { func (r *Realis) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
taskIds, err := r.getActiveTaskIds(key) taskIds, err := r.getActiveTaskIds(key)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Could not retrieve relevant task IDs") return nil, errors.Wrap(err, "Could not retrieve relevant task IDs.")
} }
if len(taskIds) > 0 { if len(taskIds) > 0 {
response, err := r.client.KillTasks(key, taskIds) response, err := r.client.KillTasks(key, taskIds)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler.")
} }
return response.String(), nil
return response, nil
} else { } else {
return "No tasks in the Active state.", nil return nil, errors.New("No tasks in the Active state.")
} }
} }
// Sends a create job message to the scheduler with a specific job configuration // Sends a create job message to the scheduler with a specific job configuration
func (r *Realis) CreateJob(auroraJob *Job) (string, error) { func (r *Realis) CreateJob(auroraJob *Job) (*aurora.Response, error) {
response, err := r.client.CreateJob(auroraJob.jobConfig) response, err := r.client.CreateJob(auroraJob.jobConfig)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending Create command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler.")
} }
return response.String(), nil return response, nil
} }
// Restarts all active tasks under a job configuration // Restarts all active tasks under a job configuration
func (r *Realis) RestartJob(key *aurora.JobKey) (string, error) { func (r *Realis) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
taskIds, err := r.getActiveTaskIds(key) taskIds, err := r.getActiveTaskIds(key)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Could not retrieve relevant task IDs") return nil, errors.Wrap(err, "Could not retrieve relevant task IDs.")
} }
if len(taskIds) > 0 { if len(taskIds) > 0 {
response, err := r.client.RestartShards(key, taskIds) response, err := r.client.RestartShards(key, taskIds)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler.")
} }
return response.String(), nil return response, nil
} else { } else {
return "No tasks in the Active state.", nil return nil, errors.New("No tasks in the Active state.")
} }
} }
// Update all tasks under a job configuration // Update all tasks under a job configuration
func (r *Realis) StartJobUpdate(updateJob *UpdateJob, message string) (string, error) { func (r *Realis) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
response, err := r.client.StartJobUpdate(updateJob.req, message) response, err := r.client.StartJobUpdate(updateJob.req, message)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending StartJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending StartJobUpdate command to Aurora Scheduler.")
} }
return response.String(), nil return response, nil
} }
func (r *Realis) AbortJobUpdate( func (r *Realis) AbortJobUpdate(
key *aurora.JobKey, key *aurora.JobKey,
updateId string, updateId string,
message string) (string, error) { message string) (*aurora.Response, error) {
response, err := r.client.AbortJobUpdate(&aurora.JobUpdateKey{key, updateId}, message) response, err := r.client.AbortJobUpdate(&aurora.JobUpdateKey{key, updateId}, message)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler.")
} }
return response.String(), nil return response, nil
} }
// Scale up the number of instances under a job configuration // Scale up the number of instances under a job configuration
func (r *Realis) AddInstances(key *aurora.JobKey, count int32) (string, error) { func (r *Realis) AddInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
//Scale up using the config from task 0. All tasks should be homogeneous. //Scale up using the config from task 0. All tasks should be homogeneous.
instKey := &aurora.InstanceKey{key, 0} instKey := &aurora.InstanceKey{key, 0}
@ -197,9 +198,8 @@ func (r *Realis) AddInstances(key *aurora.JobKey, count int32) (string, error) {
response, err := r.client.AddInstances(instKey, count) response, err := r.client.AddInstances(instKey, count)
if err != nil { if err != nil {
return "", errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler") return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler.")
} }
return response.String(), nil return response, nil
} }