diff --git a/realis.go b/realis.go index 7f08943..d8a477e 100644 --- a/realis.go +++ b/realis.go @@ -60,7 +60,7 @@ type RealisConfig struct { func NewClient(config RealisConfig) Realis { httpTrans := (config.transport).(*thrift.THttpClient) - httpTrans.SetHeader("User-Agent", "GoRealis v0.1") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") protocolFactory := thrift.NewTJSONProtocolFactory() @@ -140,12 +140,12 @@ func (r realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*au instanceIds[instId] = true } - response, err := r.client.KillTasks(key, instanceIds) + resp, err := r.client.KillTasks(key, instanceIds) if err != nil { return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } // Sends a kill message to the scheduler for all active tasks under a job. @@ -157,13 +157,13 @@ func (r realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - response, err := r.client.KillTasks(key, instanceIds) + resp, err := r.client.KillTasks(key, instanceIds) if err != nil { return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } else { return nil, errors.New("No tasks in the Active state") } @@ -171,43 +171,43 @@ func (r realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { // Sends a create job message to the scheduler with a specific job configuration. func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { - response, err := r.client.CreateJob(auroraJob.JobConfig()) + resp, err := r.client.CreateJob(auroraJob.JobConfig()) if err != nil { return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } func (r realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { - response, err := r.client.ScheduleCronJob(auroraJob.JobConfig()) + resp, err := r.client.ScheduleCronJob(auroraJob.JobConfig()) if err != nil { return nil, errors.Wrap(err, "Error sending Cron Job Schedule message to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } func (r realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) { - response, err := r.client.DescheduleCronJob(key) + resp, err := r.client.DescheduleCronJob(key) if err != nil { return nil, errors.Wrap(err, "Error sending Cron Job De-schedule message to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } func (r realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) { - response, err := r.client.StartCronJob(key) + resp, err := r.client.StartCronJob(key) if err != nil { return nil, errors.Wrap(err, "Error sending Start Cron Job message to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } // Restarts specific instances specified @@ -218,12 +218,12 @@ func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) ( instanceIds[instId] = true } - response, err := r.client.RestartShards(key, instanceIds) + resp, err := r.client.RestartShards(key, instanceIds) if err != nil { return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } // Restarts all active tasks under a job configuration. @@ -235,13 +235,13 @@ func (r realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - response, err := r.client.RestartShards(key, instanceIds) + resp, err := r.client.RestartShards(key, instanceIds) if err != nil { return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } else { return nil, errors.New("No tasks in the Active state") } @@ -250,13 +250,13 @@ func (r realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments. func (r realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) { - response, err := r.client.StartJobUpdate(updateJob.req, message) + resp, err := r.client.StartJobUpdate(updateJob.req, message) if err != nil { return nil, errors.Wrap(err, "Error sending StartJobUpdate command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. @@ -264,26 +264,26 @@ func (r realisClient) AbortJobUpdate( updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { - response, err := r.client.AbortJobUpdate(&updateKey, message) + resp, err := r.client.AbortJobUpdate(&updateKey, message) if err != nil { return nil, errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } // Scale up the number of instances under a job configuration using the configuration for specific // instance to scale up. func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { - response, err := r.client.AddInstances(&instKey, count) + resp, err := r.client.AddInstances(&instKey, count) if err != nil { return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler") } - return response, nil + return response.ResponseCodeCheck(resp) } func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { @@ -312,6 +312,7 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC instKey.JobKey.Name) } + // Currently, instance 0 is always picked return tasks[0].AssignedTask.Task, nil } @@ -322,7 +323,7 @@ func (r realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*auro return nil, errors.Wrap(err, "Unable to get job update details") } - return resp, nil + return response.ResponseCodeCheck(resp) } func (r realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) { @@ -331,5 +332,5 @@ func (r realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) return nil, errors.Wrap(err, "Unable to roll back job update") } - return resp, nil + return response.ResponseCodeCheck(resp) } diff --git a/response/response.go b/response/response.go index b64e665..c9da05e 100644 --- a/response/response.go +++ b/response/response.go @@ -16,7 +16,9 @@ package response import ( + "bytes" "github.com/rdelval/gorealis/gen-go/apache/aurora" + "github.com/pkg/errors" ) // Get key from a response created by a StartJobUpdate call @@ -31,3 +33,24 @@ func JobUpdateDetails(resp *aurora.Response) []*aurora.JobUpdateDetails { func ScheduleStatusResult(resp *aurora.Response) *aurora.ScheduleStatusResult_ { return resp.GetResult_().GetScheduleStatusResult_() } + +func ResponseCodeCheck(resp *aurora.Response) (*aurora.Response, error) { + if resp.GetResponseCode() != aurora.ResponseCode_OK { + return resp, errors.New(CombineMessage(resp)) + } + + return resp, nil +} + +// Based on aurora client: src/main/python/apache/aurora/client/base.py +func CombineMessage(resp *aurora.Response) string { + var buffer bytes.Buffer + for _, detail := range resp.GetDetails() { + buffer.WriteString(detail.GetMessage() + ", ") + } + + if buffer.Len() > 0 { + buffer.Truncate(buffer.Len()-2) // Get rid of trailing comma + space + } + return buffer.String() +}