diff --git a/cmd/create.go b/cmd/create.go
index 75f33f5..c062c4c 100644
--- a/cmd/create.go
+++ b/cmd/create.go
@@ -15,10 +15,7 @@
 package cmd
 
 import (
-	"strings"
-
 	"github.com/aurora-scheduler/australis/internal"
-	realis "github.com/aurora-scheduler/gorealis/v2"
 	"github.com/spf13/cobra"
 )
 
@@ -34,60 +31,15 @@
 var createCmd = &cobra.Command{
 }
 
 func createJob(cmd *cobra.Command, args []string) {
-
 	job, err := internal.UnmarshalJob(args[0])
 	if err != nil {
 		log.Fatalln(err)
 	}
 
-	auroraJob := realis.NewJob().
-		Environment(job.Environment).
-		Role(job.Role).
-		Name(job.Name).
-		CPU(job.CPU).
-		RAM(job.RAM).
-		Disk(job.Disk).
-		IsService(job.Service).
-		InstanceCount(job.Instances)
-
-	// Adding URIs.
-	for _, uri := range job.URIs {
-		auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI)
-	}
-
-	// Adding Metadata.
-	for key, value := range job.Metadata {
-		auroraJob.AddLabel(key, value)
-	}
-
-	// If thermos jobs processes are provided, use them
-	if len(job.Thermos) > 0 {
-		thermosExec := realis.ThermosExecutor{}
-		for _, process := range job.Thermos {
-			thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd))
-		}
-		auroraJob.ThermosExecutor(thermosExec)
-	} else if job.Executor.Name != "" {
-		// Non-Thermos executor
-		if job.Executor.Name == "" {
-			log.Fatal("no executor name provided")
-		}
-
-		auroraJob.ExecutorName(job.Executor.Name)
-		auroraJob.ExecutorData(job.Executor.Data)
-	} else if job.Container != nil {
-		if job.Container.Docker == nil {
-			log.Fatal("no container specified")
-		}
-
-		if job.Container.Docker.Tag != "" && !strings.ContainsRune(job.Container.Docker.Name, ':') {
-			job.Container.Docker.Name += ":" + job.Container.Docker.Tag
-		}
-		auroraJob.Container(realis.NewDockerContainer().Image(job.Container.Docker.Name))
-
-	} else {
-		log.Fatal("task does not contain a thermos definition, a custom executor name, or a container to launch")
+	auroraJob, err := job.ToRealis()
+	if err != nil {
+		log.Fatalln(err)
 	}
 
 	if err := client.CreateJob(auroraJob); err != nil {
diff --git a/cmd/start.go b/cmd/start.go
index e6ef8c7..39cc7dd 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -230,7 +230,7 @@ func slaDrain(cmd *cobra.Command, args []string) {
 	if cmd.Flags().Changed(percentageFlag) {
 		log.Infoln("Setting hosts to DRAINING with the Percentage SLA policy.")
 		policy.PercentageSlaPolicy = &aurora.PercentageSlaPolicy{
-			Percentage: percent,
+			Percentage:   percent,
 			DurationSecs: int64(duration.Seconds()),
 		}
 	}
@@ -273,5 +273,25 @@ func maintenance(cmd *cobra.Command, args []string) {
 }
 
 func update(cmd *cobra.Command, args []string) {
+	updateJob, err := internal.UnmarshalUpdate(args[0])
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	update, err := updateJob.ToRealis()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	result, err := client.StartJobUpdate(update, "")
+	if err != nil {
+		log.Fatalf("Update failed to start %v", err)
+	}
+
+	if ok, monitorErr := client.MonitorJobUpdate(*result.GetKey(),
+		startUpdateCmd.MonitorInterval,
+		startUpdateCmd.MonitorTimeout); !ok || monitorErr != nil {
+		log.Fatal("update did not ROLL FORWARD before monitor timed out")
+	}
 }
 
diff --git a/go.mod b/go.mod
index ffd3e03..a4ef81b 100644
--- a/go.mod
+++ b/go.mod
@@ -1,7 +1,7 @@
 module github.com/aurora-scheduler/australis
 
 require (
-	github.com/aurora-scheduler/gorealis/v2 v2.22.0
+	github.com/aurora-scheduler/gorealis/v2 v2.22.1
 	github.com/pkg/errors v0.9.1
 	github.com/sirupsen/logrus v1.6.0
 	github.com/spf13/cobra v1.0.0
diff --git a/internal/converter.go b/internal/converter.go
new file mode 100644
index 0000000..a9204f7
--- /dev/null
+++ b/internal/converter.go
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package internal
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+
+	realis "github.com/aurora-scheduler/gorealis/v2"
+)
+
+func (j *Job) ToRealis() (*realis.AuroraJob, error) {
+
+	auroraJob := realis.NewJob().
+		Environment(j.Environment).
+		Role(j.Role).
+		Name(j.Name).
+		CPU(j.CPU).
+		RAM(j.RAM).
+		Disk(j.Disk).
+		IsService(j.Service).
+		InstanceCount(j.Instances).
+		MaxFailure(j.MaxFailures)
+
+	// Adding URIs.
+	for _, uri := range j.URIs {
+		auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI)
+	}
+
+	// Adding Metadata.
+	for key, value := range j.Metadata {
+		auroraJob.AddLabel(key, value)
+	}
+
+	// If thermos jobs processes are provided, use them
+	if len(j.Thermos) > 0 {
+		thermosExec := realis.ThermosExecutor{}
+		for _, process := range j.Thermos {
+			thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd))
+		}
+		auroraJob.ThermosExecutor(thermosExec)
+	} else if j.Executor.Name != "" {
+		// Non-Thermos executor
+		if j.Executor.Name == "" {
+			return nil, errors.New("no executor name provided")
+		}
+
+		auroraJob.ExecutorName(j.Executor.Name)
+		auroraJob.ExecutorData(j.Executor.Data)
+	} else if j.Container != nil {
+		if j.Container.Docker == nil {
+			return nil, errors.New("no container specified")
+		}
+
+		if j.Container.Docker.Tag != "" && !strings.ContainsRune(j.Container.Docker.Name, ':') {
+			j.Container.Docker.Name += ":" + j.Container.Docker.Tag
+		}
+		auroraJob.Container(realis.NewDockerContainer().Image(j.Container.Docker.Name))
+
+	}
+
+	return auroraJob, nil
+}
+
+func (u *UpdateJob) ToRealis() (*realis.JobUpdate, error) {
+
+	jobConfig, err := u.JobConfig.ToRealis()
+	if err != nil {
+		return nil, fmt.Errorf("invalid job configuration %w", err)
+	}
+
+	update := realis.JobUpdateFromAuroraTask(jobConfig.AuroraTask())
+
+	update.MaxPerInstanceFailures(u.UpdateSettings.MaxPerInstanceFailures).
+		MaxFailedInstances(u.UpdateSettings.MaxFailedInstances).
+		WatchTime(u.UpdateSettings.MinTimeInRunning).
+		RollbackOnFail(u.UpdateSettings.RollbackOnFailure).
+		PulseIntervalTimeout(u.UpdateSettings.PulseTimeout).
+		SlaAware(u.UpdateSettings.SLAAware).
+		InstanceCount(u.UpdateSettings.InstanceCount)
+
+	strategy := u.UpdateSettings.Strategy
+	switch {
+	case strategy.VariableBatch != nil:
+		update.VariableBatchStrategy(strategy.VariableBatch.AutoPause, strategy.VariableBatch.GroupSizes...)
+	case strategy.Batch != nil:
+		update.BatchUpdateStrategy(strategy.Batch.AutoPause, strategy.Batch.GroupSize)
+	case strategy.Queue != nil:
+		update.QueueUpdateStrategy(strategy.Queue.GroupSize)
+	default:
+		update.QueueUpdateStrategy(1)
+	}
+
+	for _, r := range u.UpdateSettings.InstanceRanges {
+		update.AddInstanceRange(r.First, r.Last)
+	}
+
+	return update, nil
+}
diff --git a/internal/types.go b/internal/types.go
index 470f719..16cca88 100644
--- a/internal/types.go
+++ b/internal/types.go
@@ -52,6 +52,7 @@ type Job struct {
 	Disk        int64             `yaml:"disk"`
 	Executor    Executor          `yaml:"executor"`
 	Instances   int32             `yaml:"instances"`
+	MaxFailures int32             `yaml:"maxFailures"`
 	URIs        []URI             `yaml:"uris"`
 	Metadata    map[string]string `yaml:"labels"`
 	Service     bool              `yaml:"service"`
@@ -59,22 +60,22 @@
 	Container *Container `yaml:"container,omitempty"`
 }
 
 type InstanceRange struct {
-	Start int `yaml:"start"`
-	End   int `yaml:"end"`
+	First int32 `yaml:"first"`
+	Last  int32 `yaml:"last"`
 }
 
 type VariableBatchStrategy struct {
-	GroupSizes []int `yaml:"groupSizes"`
-	AutoPause  bool  `yaml:"autoPause"`
+	GroupSizes []int32 `yaml:"groupSizes"`
+	AutoPause  bool    `yaml:"autoPause"`
 }
 
 type BatchStrategy struct {
-	GroupSize int  `yaml:"groupSize"`
-	AutoPause bool `yaml:"autoPause"`
+	GroupSize int32 `yaml:"groupSize"`
+	AutoPause bool  `yaml:"autoPause"`
 }
 
 type QueueStrategy struct {
-	GroupSize int `yaml:"groupSize"`
+	GroupSize int32 `yaml:"groupSize"`
 }
 type UpdateStrategy struct {
@@ -83,12 +84,13 @@
 	Queue         *QueueStrategy         `yaml:"queue"`
 }
 type UpdateSettings struct {
-	MaxPerInstanceFailures int             `yaml:"maxPerInstanceFailures"`
-	MaxFailedInstances     int             `yaml:"maxFailedInstances"`
+	MaxPerInstanceFailures int32           `yaml:"maxPerInstanceFailures"`
+	MaxFailedInstances     int32           `yaml:"maxFailedInstances"`
 	MinTimeInRunning       time.Duration   `yaml:"minTimeRunning"`
 	RollbackOnFailure      bool            `yaml:"rollbackOnFailure"`
 	InstanceRanges         []InstanceRange `yaml:"instanceRanges"`
-	BlockIfNoPulseAfter    time.Duration   `yaml:"blockIfNoPulseAfter"`
+	InstanceCount          int32           `yaml:"instanceCount"`
+	PulseTimeout           time.Duration   `yaml:"pulseTimeout"`
 	SLAAware               bool            `yaml:"slaAware"`
 	Strategy               UpdateStrategy  `yaml:"strategy"`
 }
diff --git a/internal/util.go b/internal/util.go
index db0193c..aad41a7 100644
--- a/internal/util.go
+++ b/internal/util.go
@@ -27,6 +27,7 @@ import (
 	"github.com/spf13/cobra"
 	yaml "gopkg.in/yaml.v2"
 )
+
 type MonitorCmdConfig struct {
 	Cmd                             *cobra.Command
 	MonitorInterval, MonitorTimeout time.Duration
@@ -109,44 +110,47 @@ func UnmarshalJob(filename string) (Job, error) {
 			return job, errors.Wrap(err, "unable to parse job config file")
 		}
 
-		if !job.Validate() {
-			return job, errors.New("invalid job config")
+		if err := job.Validate(); err != nil {
+			return job, fmt.Errorf("invalid job config %w", err)
 		}
 	}
 
 	return job, nil
 }
 
-func (j *Job) Validate() bool {
+func (j *Job) Validate() error {
 	if j.Name == "" {
-		return false
+		return errors.New("job name not specified")
 	}
 
 	if j.Role == "" {
-		return false
+		return errors.New("job role not specified")
 	}
 
 	if j.Environment == "" {
-		return false
+		return errors.New("job environment not specified")
 	}
 
 	if j.Instances <= 0 {
-		return false
+		return errors.New("number of instances in job cannot be less than or equal to 0")
 	}
 
 	if j.CPU <= 0.0 {
-		return false
+		return errors.New("CPU must be greater than 0")
	}
 
 	if j.RAM <= 0 {
-		return false
+		return errors.New("RAM must be greater than 0")
 	}
 
 	if j.Disk <= 0 {
-		return false
+		return errors.New("Disk must be greater than 0")
 	}
 
-	return true
+	if len(j.Thermos) == 0 && j.Executor.Name == "" && j.Container == nil {
+		return errors.New("task does not contain a thermos definition, a custom executor name, or a container to launch")
+	}
+	return nil
 }
 
 func UnmarshalUpdate(filename string) (UpdateJob, error) {
@@ -160,11 +164,11 @@
 			return updateJob, errors.Wrap(err, "unable to parse job config file")
 		}
 
-		if !updateJob.JobConfig.Validate() {
-			return updateJob, errors.New("invalid job config")
+		if err := updateJob.JobConfig.Validate(); err != nil {
+			return updateJob, fmt.Errorf("invalid job config %w", err)
 		}
 
 		if err := updateJob.UpdateSettings.Validate(); err != nil {
-			return updateJob, errors.Wrap(err, "invalid update configuration")
+			return updateJob, fmt.Errorf("invalid update configuration %w", err)
 		}
 	}
 
@@ -172,6 +176,10 @@
 }
 
 func (u *UpdateSettings) Validate() error {
+	if u.InstanceCount <= 0 {
+		return errors.New("instance count must be larger than 0")
+	}
+
 	if u.Strategy.VariableBatch != nil {
 		if len(u.Strategy.VariableBatch.GroupSizes) == 0 {
 			return errors.New("variable batch strategy must specify at least one batch size")
@@ -191,7 +199,6 @@ func (u *UpdateSettings) Validate() error {
 		}
 	} else {
 		log.Info("No strategy set, falling back on queue strategy with a group size 1")
-		u.Strategy.Queue = &QueueStrategy{GroupSize: 1}
 	}
 	return nil
 }