Added the ability to monitor job updates.

Added the ability to kill and restart specific instances.
Fixed incorrect documentation on using-the-sample-client.
Added helper functions under the response package to extract fields from
aurora.Response.
This commit is contained in:
Renan DelValle 2016-08-25 18:56:55 -07:00
parent d8958966db
commit 01b700554a
6 changed files with 165 additions and 27 deletions

View file

@ -24,6 +24,7 @@ These commands are set to run on a vagrant box. To be able to runt he docker com
executor examples, the vagrant box must be configured properly to use the docker compose executor.
### Thermos
#### Creating a Thermos job
```
$ cd $GOPATH/src/github.com/rdelval/gorealis
@ -35,11 +36,12 @@ $ go run $GOPATH/src/github.com/rdelval/gorealis.git/examples/client.go -executo
```
### Docker Compose executor (custom executor)
#### Creating Docker Compose executor job
```
$ go run $GOPATH/src/github.com/rdelval/gorealis/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=create
```
#### Kill a Docker Compose executor job
```
$ go run $GOPATH/src/github.com/rdelval/gorealis.git/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=kill
$ go run $GOPATH/src/github.com/rdelval/gorealis/examples/client.go -executor=compose -url=http://192.168.33.7:8081 -cmd=kill
```

View file

@ -21,6 +21,7 @@ import (
"github.com/rdelval/gorealis"
"io/ioutil"
"os"
"github.com/rdelval/gorealis/response"
)
func main() {
@ -115,46 +116,46 @@ func main() {
switch *cmd {
case "create":
fmt.Println("Creating job")
response, err := r.CreateJob(job)
resp, err := r.CreateJob(job)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(response.String())
fmt.Println(resp.String())
break
case "kill":
fmt.Println("Killing job")
response, err := r.KillJob(job.JobKey())
resp, err := r.KillJob(job.JobKey())
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(response.String())
fmt.Println(resp.String())
break
case "restart":
fmt.Println("Restarting job")
response, err := r.RestartJob(job.JobKey())
resp, err := r.RestartJob(job.JobKey())
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(response.String())
fmt.Println(resp.String())
break
case "flexUp":
fmt.Println("Flexing up job")
response, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5)
resp, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(response.String())
fmt.Println(resp.String())
break
case "update":
fmt.Println("Updating a job with a new name")
fmt.Println("Updating a job with with more RAM and to 3 instances")
taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0})
if err != nil {
fmt.Println(err)
@ -163,21 +164,32 @@ func main() {
updateJob := realis.NewUpdateJob(taskConfig)
updateJob.InstanceCount(3).RAM(128)
resposne, err := r.StartJobUpdate(updateJob, "")
resp, err := r.StartJobUpdate(updateJob, "")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(resposne.String())
jobUpdateKey := response.JobUpdateKey(resp)
r.MonitorJobUpdate(*jobUpdateKey, 5, 100)
break
case "updateDetails":
resp, err := r.JobUpdateDetails(aurora.JobUpdateKey{job.JobKey(), *updateId})
if err != nil {
fmt.Println(err)
os.Exit(1)
}
response.JobUpdateDetails(resp)
break
case "abortUpdate":
fmt.Println("Abort update")
response, err := r.AbortJobUpdate(job.JobKey(), *updateId, "")
resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(response.String())
fmt.Println(resp.String())
break
case "taskConfig":
fmt.Println("Getting job info")
@ -189,7 +201,7 @@ func main() {
print(config.String())
break
default:
fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now")
fmt.Println("Command not supported")
os.Exit(1)
}
}

61
monitors.go Normal file
View file

@ -0,0 +1,61 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Collection of monitors to create synchronicity
package realis
import (
"gen-go/apache/aurora"
"fmt"
"os"
"github.com/rdelval/gorealis/response"
"time"
)
// Polls the scheduler every certain amount of time to see if the update has succeeded
func (r realisClient) MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) bool {
for i:= 0; i*interval <= timeout; i++ {
respDetail, err := r.JobUpdateDetails(updateKey)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
updateDetail := response.JobUpdateDetails(respDetail)
// Iterate through the history of the events available
for _, detail := range updateDetail.UpdateEvents {
if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[detail.Status]; !ok {
// Rolled forward is the only state in which an update has been successfully updated
// if we encounter an inactive state and it is not at rolled forward, update failed
if detail.Status == aurora.JobUpdateStatus_ROLLED_FORWARD {
fmt.Println("Update succeded")
return true
} else {
fmt.Println("Update failed")
return false
}
}
}
fmt.Println("Polling, update still active...")
time.Sleep(time.Duration(interval) * time.Second)
}
fmt.Println("Timed out")
return false
}

View file

@ -28,12 +28,15 @@ import (
)
type Realis interface {
AbortJobUpdate(key *aurora.JobKey, updateId string, message string) (*aurora.Response, error)
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
JobUpdateDetails(updateKey aurora.JobUpdateKey) (*aurora.Response, error)
KillJob(key *aurora.JobKey) (*aurora.Response, error)
KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error)
KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
MonitorJobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) bool
RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
Close()
@ -56,7 +59,8 @@ func NewClient(config RealisConfig) Realis {
protocolFactory := thrift.NewTJSONProtocolFactory()
return realisClient{client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, protocolFactory)}
return realisClient{
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, protocolFactory)}
}
// Create a default configuration of the transport layer, requires a URL to test connection with.
@ -122,14 +126,16 @@ func (r realisClient) getActiveInstanceIds(key *aurora.JobKey) (map[int32]bool,
return jobInstanceIds, nil
}
// Kill a specific instance of a job.
func (r realisClient) KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error) {
// Kill specific instances of a job.
func (r realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
instanceIds := make(map[int32]bool)
instanceIds[instanceId] = true
for _, instId := range instances {
instanceIds[instId] = true
}
response, err := r.client.KillTasks(key, instanceIds)
if err != nil {
return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler")
}
@ -169,6 +175,22 @@ func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
return response, nil
}
// Restarts specific instances specified
func (r realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
instanceIds := make(map[int32]bool)
for _, instId := range instances {
instanceIds[instId] = true
}
response, err := r.client.RestartShards(key, instanceIds)
if err != nil {
return nil, errors.Wrap(err, "Error sending Restart command to Aurora Scheduler")
}
return response, nil
}
// Restarts all active tasks under a job configuration.
func (r realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
@ -204,11 +226,10 @@ func (r realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aur
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
func (r realisClient) AbortJobUpdate(
key *aurora.JobKey,
updateId string,
updateKey aurora.JobUpdateKey,
message string) (*aurora.Response, error) {
response, err := r.client.AbortJobUpdate(&aurora.JobUpdateKey{key, updateId}, message)
response, err := r.client.AbortJobUpdate(&updateKey, message)
if err != nil {
return nil, errors.Wrap(err, "Error sending AbortJobUpdate command to Aurora Scheduler")
@ -258,3 +279,13 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC
return tasks[0].AssignedTask.Task, nil
}
func (r realisClient) JobUpdateDetails(updateKey aurora.JobUpdateKey) (*aurora.Response, error) {
resp, err := r.client.GetJobUpdateDetails(&updateKey)
if err != nil {
return nil, errors.Wrap(err, "Unable to get job update details")
}
return resp,nil
}

34
response/response.go Normal file
View file

@ -0,0 +1,34 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Helper functions to process aurora.Response
package response
import (
"gen-go/apache/aurora"
)
// Get key from a response created by a StartJobUpdate call
func JobUpdateKey(resp *aurora.Response) *aurora.JobUpdateKey {
return resp.Result_.StartJobUpdateResult_.GetKey()
}
func JobUpdateDetails(resp *aurora.Response) *aurora.JobUpdateDetails {
return resp.Result_.GetJobUpdateDetailsResult_.Details
}

View file

@ -98,14 +98,12 @@ func (u *UpdateJob) MaxPerInstanceFailures(inst int32) *UpdateJob {
// Max number of FAILED instances to tolerate before terminating the update.
func (u *UpdateJob) MaxFailedInstances(inst int32) *UpdateJob {
u.req.Settings.MaxFailedInstances = inst
return u
}
// When False, prevents auto rollback of a failed update.
func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
u.req.Settings.RollbackOnFailure = rollback
return u
}