Adding ability to start an update (#12)

* Adding ability to start an update.

* Refactoring Job parsing code to be re-usable.
This commit is contained in:
Renán I. Del Valle 2020-05-07 15:40:22 -07:00 committed by GitHub
parent d7db155d88
commit 4806936c71
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 78 deletions

View file

@ -15,10 +15,7 @@
package cmd
import (
"strings"
"github.com/aurora-scheduler/australis/internal"
realis "github.com/aurora-scheduler/gorealis/v2"
"github.com/spf13/cobra"
)
@ -34,60 +31,15 @@ var createCmd = &cobra.Command{
}
func createJob(cmd *cobra.Command, args []string) {
job, err := internal.UnmarshalJob(args[0])
if err != nil {
log.Fatalln(err)
}
auroraJob := realis.NewJob().
Environment(job.Environment).
Role(job.Role).
Name(job.Name).
CPU(job.CPU).
RAM(job.RAM).
Disk(job.Disk).
IsService(job.Service).
InstanceCount(job.Instances)
// Adding URIs.
for _, uri := range job.URIs {
auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI)
}
// Adding Metadata.
for key, value := range job.Metadata {
auroraJob.AddLabel(key, value)
}
// If thermos jobs processes are provided, use them
if len(job.Thermos) > 0 {
thermosExec := realis.ThermosExecutor{}
for _, process := range job.Thermos {
thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd))
}
auroraJob.ThermosExecutor(thermosExec)
} else if job.Executor.Name != "" {
// Non-Thermos executor
if job.Executor.Name == "" {
log.Fatal("no executor name provided")
}
auroraJob.ExecutorName(job.Executor.Name)
auroraJob.ExecutorData(job.Executor.Data)
} else if job.Container != nil {
if job.Container.Docker == nil {
log.Fatal("no container specified")
}
if job.Container.Docker.Tag != "" && !strings.ContainsRune(job.Container.Docker.Name, ':') {
job.Container.Docker.Name += ":" + job.Container.Docker.Tag
}
auroraJob.Container(realis.NewDockerContainer().Image(job.Container.Docker.Name))
} else {
log.Fatal("task does not contain a thermos definition, a custom executor name, or a container to launch")
auroraJob, err := job.ToRealis()
if err != nil {
log.Fatalln(err)
}
if err := client.CreateJob(auroraJob); err != nil {

View file

@ -230,7 +230,7 @@ func slaDrain(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed(percentageFlag) {
log.Infoln("Setting hosts to DRAINING with the Percentage SLA policy.")
policy.PercentageSlaPolicy = &aurora.PercentageSlaPolicy{
Percentage: percent,
Percentage: percent,
DurationSecs: int64(duration.Seconds()),
}
}
@ -273,5 +273,25 @@ func maintenance(cmd *cobra.Command, args []string) {
}
// update starts an Aurora job update from the update configuration file
// named by args[0] and then monitors the update until it rolls forward.
//
// The file is parsed with internal.UnmarshalUpdate, converted with ToRealis
// and submitted through client.StartJobUpdate. Any failure along the way is
// reported through log.Fatal.
func update(cmd *cobra.Command, args []string) {
	updateJob, err := internal.UnmarshalUpdate(args[0])
	if err != nil {
		log.Fatal(err)
	}

	jobUpdate, err := updateJob.ToRealis()
	if err != nil {
		log.Fatal(err)
	}

	result, err := client.StartJobUpdate(jobUpdate, "")
	if err != nil {
		log.Fatalf("Update failed to start %v", err)
	}

	// The key is dereferenced below; fail with a readable message instead of
	// a nil-pointer panic if the scheduler response carries no key.
	if result == nil || result.GetKey() == nil {
		log.Fatal("update was started but no update key was returned to monitor")
	}

	ok, err := client.MonitorJobUpdate(*result.GetKey(),
		startUpdateCmd.MonitorInterval,
		startUpdateCmd.MonitorTimeout)
	if err != nil {
		// Surface the monitor's own error rather than assuming a timeout.
		log.Fatalf("update did not ROLL FORWARD: %v", err)
	}
	if !ok {
		log.Fatal("update did not ROLL FORWARD before monitor timed out")
	}
}

2
go.mod
View file

@ -1,7 +1,7 @@
module github.com/aurora-scheduler/australis
require (
github.com/aurora-scheduler/gorealis/v2 v2.22.0
github.com/aurora-scheduler/gorealis/v2 v2.22.1
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0

114
internal/converter.go Normal file
View file

@ -0,0 +1,114 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package internal
import (
"errors"
"fmt"
"strings"
realis "github.com/aurora-scheduler/gorealis/v2"
)
// ToRealis converts the parsed job configuration into a gorealis AuroraJob.
//
// Resources, instance count, service flag, max failures, URIs and labels are
// copied over first. The executor is then chosen in this order of precedence:
// thermos processes, a custom executor name, a Docker container. If none of
// the three is present the job is returned without an executor; Validate is
// the place where that configuration is rejected.
//
// An error is returned only when a container section is present but has no
// Docker definition. The receiver is not modified.
func (j *Job) ToRealis() (*realis.AuroraJob, error) {
	auroraJob := realis.NewJob().
		Environment(j.Environment).
		Role(j.Role).
		Name(j.Name).
		CPU(j.CPU).
		RAM(j.RAM).
		Disk(j.Disk).
		IsService(j.Service).
		InstanceCount(j.Instances).
		MaxFailure(j.MaxFailures)

	// Adding URIs.
	for _, uri := range j.URIs {
		auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI)
	}

	// Adding Metadata.
	for key, value := range j.Metadata {
		auroraJob.AddLabel(key, value)
	}

	switch {
	case len(j.Thermos) > 0:
		// If thermos job processes are provided, use them.
		thermosExec := realis.ThermosExecutor{}
		for _, process := range j.Thermos {
			thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd))
		}
		auroraJob.ThermosExecutor(thermosExec)

	case j.Executor.Name != "":
		// Non-Thermos executor.
		auroraJob.ExecutorName(j.Executor.Name)
		auroraJob.ExecutorData(j.Executor.Data)

	case j.Container != nil:
		docker := j.Container.Docker
		if docker == nil {
			return nil, errors.New("no container specified")
		}

		// Build the image reference in a local so the job definition the
		// caller holds is left untouched.
		image := docker.Name

		// Only the last path component can carry a tag; a ':' earlier in the
		// name belongs to a registry host:port and must not suppress the tag.
		lastComponent := image[strings.LastIndex(image, "/")+1:]
		if docker.Tag != "" && !strings.ContainsRune(lastComponent, ':') {
			image += ":" + docker.Tag
		}
		auroraJob.Container(realis.NewDockerContainer().Image(image))
	}

	return auroraJob, nil
}
// ToRealis converts the update configuration into a gorealis JobUpdate.
//
// The embedded job configuration is converted first and its task becomes the
// basis of the update. The update settings are then applied, followed by the
// update strategy and any instance ranges. When several strategies are set,
// variable batch takes precedence over batch, which takes precedence over
// queue; with no strategy set, a queue strategy with a group size of 1 is
// used.
//
// An error is returned when the job configuration cannot be converted.
func (u *UpdateJob) ToRealis() (*realis.JobUpdate, error) {
	jobConfig, err := u.JobConfig.ToRealis()
	if err != nil {
		return nil, fmt.Errorf("invalid job configuration: %w", err)
	}

	update := realis.JobUpdateFromAuroraTask(jobConfig.AuroraTask())

	update.MaxPerInstanceFailures(u.UpdateSettings.MaxPerInstanceFailures).
		MaxFailedInstances(u.UpdateSettings.MaxFailedInstances).
		WatchTime(u.UpdateSettings.MinTimeInRunning).
		RollbackOnFail(u.UpdateSettings.RollbackOnFailure).
		PulseIntervalTimeout(u.UpdateSettings.PulseTimeout).
		SlaAware(u.UpdateSettings.SLAAware).
		InstanceCount(u.UpdateSettings.InstanceCount)

	strategy := u.UpdateSettings.Strategy
	switch {
	case strategy.VariableBatch != nil:
		update.VariableBatchStrategy(strategy.VariableBatch.AutoPause, strategy.VariableBatch.GroupSizes...)
	case strategy.Batch != nil:
		update.BatchUpdateStrategy(strategy.Batch.AutoPause, strategy.Batch.GroupSize)
	case strategy.Queue != nil:
		update.QueueUpdateStrategy(strategy.Queue.GroupSize)
	default:
		update.QueueUpdateStrategy(1)
	}

	for _, r := range u.UpdateSettings.InstanceRanges {
		update.AddInstanceRange(r.First, r.Last)
	}

	return update, nil
}

View file

@ -52,6 +52,7 @@ type Job struct {
Disk int64 `yaml:"disk"`
Executor Executor `yaml:"executor"`
Instances int32 `yaml:"instances"`
MaxFailures int32 `yaml:"maxFailures"`
URIs []URI `yaml:"uris"`
Metadata map[string]string `yaml:"labels"`
Service bool `yaml:"service"`
@ -59,22 +60,22 @@ type Job struct {
Container *Container `yaml:"container,omitempty"`
}
// InstanceRange selects a span of job instances for an update, delimited by
// its first and last instance. UpdateJob.ToRealis passes First and Last to
// the update's AddInstanceRange.
type InstanceRange struct {
Start int `yaml:"start"`
End int `yaml:"end"`
First int32 `yaml:"first"`
Last int32 `yaml:"last"`
}
// VariableBatchStrategy holds the settings for the variable batch update
// strategy: the list of group sizes and the auto-pause flag. Both are passed
// to the update's VariableBatchStrategy by UpdateJob.ToRealis.
type VariableBatchStrategy struct {
GroupSizes []int `yaml:"groupSizes"`
AutoPause bool `yaml:"autoPause"`
GroupSizes []int32 `yaml:"groupSizes"`
AutoPause bool `yaml:"autoPause"`
}
// BatchStrategy holds the settings for the batch update strategy: a single
// group size and the auto-pause flag. Both are passed to the update's
// BatchUpdateStrategy by UpdateJob.ToRealis.
type BatchStrategy struct {
GroupSize int `yaml:"groupSize"`
AutoPause bool `yaml:"autoPause"`
GroupSize int32 `yaml:"groupSize"`
AutoPause bool `yaml:"autoPause"`
}
// QueueStrategy holds the settings for the queue update strategy. Its group
// size is passed to the update's QueueUpdateStrategy by UpdateJob.ToRealis.
type QueueStrategy struct {
GroupSize int `yaml:"groupSize"`
GroupSize int32 `yaml:"groupSize"`
}
type UpdateStrategy struct {
@ -83,12 +84,13 @@ type UpdateStrategy struct {
Queue *QueueStrategy `yaml:"queue"`
}
// UpdateSettings holds the update-specific part of an update configuration
// file. UpdateJob.ToRealis forwards these values to the gorealis JobUpdate:
// MinTimeInRunning becomes the watch time, PulseTimeout the pulse interval
// timeout, and Strategy selects the update strategy (a queue strategy with a
// group size of 1 when none is set). Validate requires InstanceCount to be
// greater than 0.
type UpdateSettings struct {
MaxPerInstanceFailures int `yaml:"maxPerInstanceFailures"`
MaxFailedInstances int `yaml:"maxFailedInstances"`
MaxPerInstanceFailures int32 `yaml:"maxPerInstanceFailures"`
MaxFailedInstances int32 `yaml:"maxFailedInstances"`
MinTimeInRunning time.Duration `yaml:"minTimeRunning"`
RollbackOnFailure bool `yaml:"rollbackOnFailure"`
InstanceRanges []InstanceRange `yaml:"instanceRanges"`
BlockIfNoPulseAfter time.Duration `yaml:"blockIfNoPulseAfter"`
InstanceCount int32 `yaml:"instanceCount"`
PulseTimeout time.Duration `yaml:"pulseTimeout"`
SLAAware bool `yaml:"slaAware"`
Strategy UpdateStrategy `yaml:"strategy"`
}

View file

@ -27,6 +27,7 @@ import (
"github.com/spf13/cobra"
yaml "gopkg.in/yaml.v2"
)
type MonitorCmdConfig struct {
Cmd *cobra.Command
MonitorInterval, MonitorTimeout time.Duration
@ -109,44 +110,47 @@ func UnmarshalJob(filename string) (Job, error) {
return job, errors.Wrap(err, "unable to parse job config file")
}
if !job.Validate() {
return job, errors.New("invalid job config")
if err := job.Validate(); err != nil {
return job, fmt.Errorf("invalid job config %w", err)
}
}
return job, nil
}
// Validate checks that the job carries everything needed to launch it: a
// name, role and environment; an instance count, CPU, RAM and disk that are
// all greater than zero; and at least one of a thermos definition, a custom
// executor name or a container. The checks run in that order and the first
// one that fails determines the result.
func (j *Job) Validate() bool {
func (j *Job) Validate() error {
if j.Name == "" {
return false
return errors.New("job name not specified")
}
if j.Role == "" {
return false
return errors.New("job role not specified")
}
if j.Environment == "" {
return false
return errors.New("job environment not specified")
}
if j.Instances <= 0 {
return false
return errors.New("number of instances in job cannot be less than or equal to 0")
}
if j.CPU <= 0.0 {
return false
return errors.New("CPU must be greater than 0")
}
if j.RAM <= 0 {
return false
return errors.New("RAM must be greater than 0")
}
if j.Disk <= 0 {
return false
return errors.New("Disk must be greater than 0")
}
return true
// A job with no way to run its task cannot be launched.
if len(j.Thermos) == 0 && j.Executor.Name == "" && j.Container == nil {
return errors.New("task does not contain a thermos definition, a custom executor name, or a container to launch")
}
return nil
}
func UnmarshalUpdate(filename string) (UpdateJob, error) {
@ -160,11 +164,11 @@ func UnmarshalUpdate(filename string) (UpdateJob, error) {
return updateJob, errors.Wrap(err, "unable to parse job config file")
}
if !updateJob.JobConfig.Validate() {
return updateJob, errors.New("invalid job config")
if err := updateJob.JobConfig.Validate(); err != nil {
return updateJob, fmt.Errorf("invalid job config %w", err)
}
if err := updateJob.UpdateSettings.Validate(); err != nil {
return updateJob, errors.Wrap(err, "invalid update configuration")
return updateJob, fmt.Errorf("invalid update configuration %w", err)
}
}
@ -172,6 +176,10 @@ func UnmarshalUpdate(filename string) (UpdateJob, error) {
}
func (u *UpdateSettings) Validate() error {
if u.InstanceCount <= 0 {
return errors.New("instance count must be larger than 0")
}
if u.Strategy.VariableBatch != nil {
if len(u.Strategy.VariableBatch.GroupSizes) == 0 {
return errors.New("variable batch strategy must specify at least one batch size")
@ -191,7 +199,6 @@ func (u *UpdateSettings) Validate() error {
}
} else {
log.Info("No strategy set, falling back on queue strategy with a group size 1")
u.Strategy.Queue = &QueueStrategy{GroupSize: 1}
}
return nil
}