diff --git a/docs/using-the-sample-client.md b/docs/using-the-sample-client.md index 02b4925..293aedd 100644 --- a/docs/using-the-sample-client.md +++ b/docs/using-the-sample-client.md @@ -24,6 +24,7 @@ These commands are set to run on a vagrant box. To be able to runt he docker com executor examples, the vagrant box must be configured properly to use the docker compose executor. ### Thermos + #### Creating a Thermos job ``` $ cd $GOPATH/src/github.com/rdelval/gorealis @@ -35,11 +36,12 @@ $ go run $GOPATH/src/github.com/rdelval/gorealis.git/examples/client.go -executo ``` ### Docker Compose executor (custom executor) + #### Creating Docker Compose executor job ``` $ go run $GOPATH/src/github.com/rdelval/gorealis/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=create ``` #### Kill a Docker Compose executor job ``` -$ go run $GOPATH/src/github.com/rdelval/gorealis.git/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=kill +$ go run $GOPATH/src/github.com/rdelval/gorealis/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=kill ``` diff --git a/examples/client.go b/examples/client.go index 954a54d..273ee7b 100644 --- a/examples/client.go +++ b/examples/client.go @@ -21,6 +21,7 @@ import ( "github.com/rdelval/gorealis" "io/ioutil" "os" + "github.com/rdelval/gorealis/response" ) func main() { @@ -115,46 +116,46 @@ func main() { switch *cmd { case "create": fmt.Println("Creating job") - response, err := r.CreateJob(job) + resp, err := r.CreateJob(job) if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(response.String()) + fmt.Println(resp.String()) break case "kill": fmt.Println("Killing job") - response, err := r.KillJob(job.JobKey()) + resp, err := r.KillJob(job.JobKey()) if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(response.String()) + fmt.Println(resp.String()) break case "restart": fmt.Println("Restarting job") - response, err := r.RestartJob(job.JobKey()) + resp, err := 
r.RestartJob(job.JobKey()) if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(response.String()) + fmt.Println(resp.String()) break case "flexUp": fmt.Println("Flexing up job") - response, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5) + resp, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5) if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(response.String()) + fmt.Println(resp.String()) break case "update": - fmt.Println("Updating a job with a new name") + fmt.Println("Updating a job with with more RAM and to 3 instances") taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0}) if err != nil { fmt.Println(err) @@ -163,21 +164,32 @@ func main() { updateJob := realis.NewUpdateJob(taskConfig) updateJob.InstanceCount(3).RAM(128) - resposne, err := r.StartJobUpdate(updateJob, "") + resp, err := r.StartJobUpdate(updateJob, "") if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(resposne.String()) + + jobUpdateKey := response.JobUpdateKey(resp) + r.MonitorJobUpdate(*jobUpdateKey, 5, 100) + + break + case "updateDetails": + resp, err := r.JobUpdateDetails(aurora.JobUpdateKey{job.JobKey(), *updateId}) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + response.JobUpdateDetails(resp) break case "abortUpdate": fmt.Println("Abort update") - response, err := r.AbortJobUpdate(job.JobKey(), *updateId, "") + resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "") if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println(response.String()) + fmt.Println(resp.String()) break case "taskConfig": fmt.Println("Getting job info") @@ -189,7 +201,7 @@ func main() { print(config.String()) break default: - fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now") + fmt.Println("Command not supported") os.Exit(1) } } diff --git a/monitors.go b/monitors.go new file mode 100644 index 0000000..7415de2 --- /dev/null +++ b/monitors.go @@ -0,0 +1,61 @@ +/** + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Collection of monitors that let callers wait synchronously for asynchronous scheduler operations +package realis + +import ( + "gen-go/apache/aurora" + "fmt" + "os" + "github.com/rdelval/gorealis/response" + "time" +) + +// Polls the scheduler every `interval` seconds, for at most `timeout` seconds, to see if the update has succeeded +func (r realisClient) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) bool { + + for i:= 0; i*interval <= timeout; i++ { + respDetail, err := r.JobUpdateDetails(updateKey) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + updateDetail := response.JobUpdateDetails(respDetail) + + // Iterate through the history of the events available + for _, detail := range updateDetail.UpdateEvents { + if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[detail.Status]; !ok { + + // Rolled forward is the only state in which an update has completed successfully; + // if we encounter an inactive state and it is not rolled forward, the update failed + if detail.Status == aurora.JobUpdateStatus_ROLLED_FORWARD { + fmt.Println("Update succeded") + return true + } else { + fmt.Println("Update failed") + return false + } + } + } + + fmt.Println("Polling, update still active...") + time.Sleep(time.Duration(interval) * time.Second) + } + + + fmt.Println("Timed out") + return false +} diff --git a/realis.go b/realis.go index 1cb1655..014ddef 100644 --- a/realis.go +++ b/realis.go @@ -28,12 +28,15 @@ import ( ) type Realis interface { - 
AbortJobUpdate(key *aurora.JobKey, updateId string, message string) (*aurora.Response, error) + AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) + JobUpdateDetails(updateKey aurora.JobUpdateKey) (*aurora.Response, error) KillJob(key *aurora.JobKey) (*aurora.Response, error) - KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error) + KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) + MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) bool + RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) Close() @@ -56,7 +59,8 @@ func NewClient(config RealisConfig) Realis { protocolFactory := thrift.NewTJSONProtocolFactory() - return realisClient{client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, protocolFactory)} + return realisClient{ + client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, protocolFactory)} } // Create a default configuration of the transport layer, requires a URL to test connection with. @@ -122,14 +126,16 @@ func (r realisClient) getActiveInstanceIds(key *aurora.JobKey) (map[int32]bool, return jobInstanceIds, nil } -// Kill a specific instance of a job. -func (r realisClient) KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error) { +// Kill specific instances of a job. 
+func (r realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { instanceIds := make(map[int32]bool) - instanceIds[instanceId] = true + + for _, instId := range instances { + instanceIds[instId] = true + } response, err := r.client.KillTasks(key, instanceIds) - if err != nil { return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } @@ -169,6 +175,22 @@ func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { return response, nil } +// Restarts the specified instances of a job. +func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { + instanceIds := make(map[int32]bool) + + for _, instId := range instances { + instanceIds[instId] = true + } + + response, err := r.client.RestartShards(key, instanceIds) + if err != nil { + return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler") + } + + return response, nil +} + // Restarts all active tasks under a job configuration. func (r realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -204,11 +226,10 @@ func (r realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aur // Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI. 
func (r realisClient) AbortJobUpdate( - key *aurora.JobKey, - updateId string, + updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) { - response, err := r.client.AbortJobUpdate(&aurora.JobUpdateKey{key, updateId}, message) + response, err := r.client.AbortJobUpdate(&updateKey, message) if err != nil { return nil, errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler") @@ -258,3 +279,13 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC return tasks[0].AssignedTask.Task, nil } + +func (r realisClient) JobUpdateDetails(updateKey aurora.JobUpdateKey) (*aurora.Response, error) { + + resp, err := r.client.GetJobUpdateDetails(&updateKey) + if err != nil { + return nil, errors.Wrap(err, "Unable to get job update details") + } + + return resp,nil +} diff --git a/response/response.go b/response/response.go new file mode 100644 index 0000000..d195bba --- /dev/null +++ b/response/response.go @@ -0,0 +1,34 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// Helper functions to process aurora.Response +package response + +import ( + "gen-go/apache/aurora" +) + +// Get key from a response created by a StartJobUpdate call +func JobUpdateKey(resp *aurora.Response) *aurora.JobUpdateKey { + return resp.Result_.StartJobUpdateResult_.GetKey() +} + +func JobUpdateDetails(resp *aurora.Response) *aurora.JobUpdateDetails { + return resp.Result_.GetJobUpdateDetailsResult_.Details +} + + + + + diff --git a/updatejob.go b/updatejob.go index e222485..ea3c68b 100644 --- a/updatejob.go +++ b/updatejob.go @@ -98,14 +98,12 @@ func (u *UpdateJob) MaxPerInstanceFailures(inst int32) *UpdateJob { // Max number of FAILED instances to tolerate before terminating the update. func (u *UpdateJob) MaxFailedInstances(inst int32) *UpdateJob { - u.req.Settings.MaxFailedInstances = inst return u } // When False, prevents auto rollback of a failed update. func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob { - u.req.Settings.RollbackOnFailure = rollback return u }