Adding CreateService to realis interface. Uses the StartJobUpdate API to create services instead of the createJobs API
This commit is contained in:
parent
f6a21e0f59
commit
878a11f896
3 changed files with 109 additions and 10 deletions
|
@ -181,6 +181,29 @@ func main() {
|
|||
}
|
||||
|
||||
}
|
||||
break
|
||||
case "createService":
|
||||
// Create a service with three instances using the update API instead of the createJob API
|
||||
fmt.Println("Creating service")
|
||||
settings := realis.NewUpdateSettings()
|
||||
job.InstanceCount(3)
|
||||
_, resp, err := r.CreateService(job, *settings)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Println(resp.String())
|
||||
|
||||
if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50); !ok || err != nil {
|
||||
_, err := r.KillJob(job.JobKey())
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Println("ok: ", ok)
|
||||
fmt.Println("err: ", err)
|
||||
}
|
||||
|
||||
break
|
||||
case "createDocker":
|
||||
fmt.Println("Creating a docker based job")
|
||||
|
@ -336,7 +359,6 @@ func main() {
|
|||
}
|
||||
fmt.Println(resp.String())
|
||||
break
|
||||
|
||||
case "flexDown":
|
||||
fmt.Println("Flexing down job")
|
||||
|
||||
|
@ -391,7 +413,6 @@ func main() {
|
|||
|
||||
jobUpdateKey := response.JobUpdateKey(resp)
|
||||
monitor.JobUpdate(*jobUpdateKey, 5, 500)
|
||||
|
||||
break
|
||||
case "updateDetails":
|
||||
resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
|
||||
|
@ -441,9 +462,7 @@ func main() {
|
|||
}
|
||||
print(config.String())
|
||||
break
|
||||
|
||||
case "updatesummary":
|
||||
|
||||
fmt.Println("Getting job update summary")
|
||||
jobquery := &aurora.JobUpdateQuery{
|
||||
Role: &job.JobKey().Role,
|
||||
|
@ -455,7 +474,6 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
fmt.Println(updatesummary)
|
||||
|
||||
case "taskStatus":
|
||||
fmt.Println("Getting task status")
|
||||
taskQ := &aurora.TaskQuery{Role: job.JobKey().Role,
|
||||
|
@ -469,7 +487,6 @@ func main() {
|
|||
}
|
||||
fmt.Printf("length: %d\n ", len(tasks))
|
||||
fmt.Printf("tasks: %+v\n", tasks)
|
||||
|
||||
case "tasksWithoutConfig":
|
||||
fmt.Println("Getting task status")
|
||||
taskQ := &aurora.TaskQuery{Role: job.JobKey().Role,
|
||||
|
@ -483,7 +500,6 @@ func main() {
|
|||
}
|
||||
fmt.Printf("length: %d\n ", len(tasks))
|
||||
fmt.Printf("tasks: %+v\n", tasks)
|
||||
|
||||
case "drainHosts":
|
||||
fmt.Println("Setting hosts to DRAINING")
|
||||
if hostList == "" {
|
||||
|
@ -515,7 +531,6 @@ func main() {
|
|||
}
|
||||
|
||||
fmt.Print(result.String())
|
||||
|
||||
case "endMaintenance":
|
||||
fmt.Println("Setting hosts to ACTIVE")
|
||||
if hostList == "" {
|
||||
|
@ -547,7 +562,6 @@ func main() {
|
|||
}
|
||||
|
||||
fmt.Print(result.String())
|
||||
|
||||
default:
|
||||
fmt.Println("Command not supported")
|
||||
os.Exit(1)
|
||||
|
|
19
realis.go
19
realis.go
|
@ -42,6 +42,7 @@ type Realis interface {
|
|||
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
|
||||
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
|
||||
CreateJob(auroraJob Job) (*aurora.Response, error)
|
||||
CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
|
||||
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
|
||||
|
@ -595,6 +596,24 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
|||
|
||||
}
|
||||
|
||||
// This API uses an update thrift call to create the services giving a few more robust features.
|
||||
func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
|
||||
// Create a new job update object and ship it to the StartJobUpdate api
|
||||
update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings)
|
||||
update.InstanceCount(auroraJob.GetInstanceCount())
|
||||
|
||||
resp, err := r.StartJobUpdate(update, "")
|
||||
if err != nil {
|
||||
return resp, nil, errors.Wrap(err, "unable to create service")
|
||||
}
|
||||
|
||||
if resp != nil && resp.GetResult_() != nil {
|
||||
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
|
||||
}
|
||||
|
||||
return resp, nil, errors.New("results object is nil")
|
||||
}
|
||||
|
||||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||
var resp *aurora.Response
|
||||
var clientErr error
|
||||
|
|
68
updatejob.go
68
updatejob.go
|
@ -24,12 +24,15 @@ type UpdateJob struct {
|
|||
req *aurora.JobUpdateRequest
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Create a default UpdateJob object.
|
||||
func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
|
||||
|
||||
req := aurora.NewJobUpdateRequest()
|
||||
req.TaskConfig = config
|
||||
req.Settings = aurora.NewJobUpdateSettings()
|
||||
s := NewUpdateSettings().Settings()
|
||||
req.Settings = &s
|
||||
|
||||
job := NewJob().(*AuroraJob)
|
||||
job.jobConfig.TaskConfig = config
|
||||
|
@ -137,3 +140,66 @@ func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
|
|||
u.req.Settings.RollbackOnFailure = rollback
|
||||
return u
|
||||
}
|
||||
|
||||
|
||||
// TODO(rdelvalle): Integrate this struct with the JobUpdate struct so that we don't repeat code
|
||||
type UpdateSettings struct {
|
||||
settings aurora.JobUpdateSettings
|
||||
}
|
||||
|
||||
func NewUpdateSettings() *UpdateSettings {
|
||||
|
||||
us := new(UpdateSettings)
|
||||
|
||||
// Mirrors defaults set by Pystachio
|
||||
us.settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
|
||||
us.settings.UpdateGroupSize = 1
|
||||
us.settings.WaitForBatchCompletion = false
|
||||
us.settings.MinWaitInInstanceRunningMs = 45000
|
||||
us.settings.MaxPerInstanceFailures = 0
|
||||
us.settings.MaxFailedInstances = 0
|
||||
us.settings.RollbackOnFailure = true
|
||||
|
||||
return us
|
||||
}
|
||||
|
||||
// Max number of instances being updated at any given moment.
|
||||
func (u *UpdateSettings) BatchSize(size int32) *UpdateSettings {
|
||||
u.settings.UpdateGroupSize = size
|
||||
return u
|
||||
}
|
||||
|
||||
// Minimum number of seconds a shard must remain in RUNNING state before considered a success.
|
||||
func (u *UpdateSettings) WatchTime(ms int32) *UpdateSettings {
|
||||
u.settings.MinWaitInInstanceRunningMs = ms
|
||||
return u
|
||||
}
|
||||
|
||||
// Wait for all instances in a group to be done before moving on.
|
||||
func (u *UpdateSettings) WaitForBatchCompletion(batchWait bool) *UpdateSettings {
|
||||
u.settings.WaitForBatchCompletion = batchWait
|
||||
return u
|
||||
}
|
||||
|
||||
// Max number of instance failures to tolerate before marking instance as FAILED.
|
||||
func (u *UpdateSettings) MaxPerInstanceFailures(inst int32) *UpdateSettings {
|
||||
u.settings.MaxPerInstanceFailures = inst
|
||||
return u
|
||||
}
|
||||
|
||||
// Max number of FAILED instances to tolerate before terminating the update.
|
||||
func (u *UpdateSettings) MaxFailedInstances(inst int32) *UpdateSettings {
|
||||
u.settings.MaxFailedInstances = inst
|
||||
return u
|
||||
}
|
||||
|
||||
// When False, prevents auto rollback of a failed update.
|
||||
func (u *UpdateSettings) RollbackOnFail(rollback bool) *UpdateSettings {
|
||||
u.settings.RollbackOnFailure = rollback
|
||||
return u
|
||||
}
|
||||
|
||||
// Return internal Thrift API structure
|
||||
func (u UpdateSettings) Settings() aurora.JobUpdateSettings {
|
||||
return u.settings
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue