* Bumping up CI to go1.17 and enabling CI for PRs. * Adding go.sum now that issues seem to have gone away. * Bump up aurora to 0.25.0 and mesos to 1.9.0 * Fixing Mac tests. Adding extra time for killing thermos jobs. * Reduce the thermos overhead for unit tests Co-authored-by: lenhattan86 <lenhattan86@users.noreply.github.com>
296 lines
8.4 KiB
Go
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package realis
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/apache/thrift/lib/go/thrift"
|
|
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
|
|
)
|
|
|
|
// JobUpdate collects all information required to create a job update.
type JobUpdate struct {
	task    *AuroraTask              // underlying task configuration for the update
	request *aurora.JobUpdateRequest // request built from the task's config and the update settings
}
|
|
|
|
// Create a default JobUpdate object with an empty task and no fields filled in.
|
|
func NewJobUpdate() *JobUpdate {
|
|
newTask := NewTask()
|
|
|
|
return &JobUpdate{
|
|
task: newTask,
|
|
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
|
|
}
|
|
}
|
|
|
|
// Creates an update with default values using an AuroraTask as the underlying task configuration.
|
|
// This function has a high level understanding of Aurora Tasks and thus will support copying a task that is configured
|
|
// to use Thermos.
|
|
func JobUpdateFromAuroraTask(task *AuroraTask) *JobUpdate {
|
|
newTask := task.Clone()
|
|
|
|
return &JobUpdate{
|
|
task: newTask,
|
|
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
|
|
}
|
|
}
|
|
|
|
// JobUpdateFromConfig creates an update with default values using an aurora.TaskConfig
|
|
// primitive as the underlying task configuration.
|
|
// This function should not be used unless the implications of using a primitive value are understood.
|
|
// For example, the primitive has no concept of Thermos.
|
|
func JobUpdateFromConfig(task *aurora.TaskConfig) *JobUpdate {
|
|
// Perform a deep copy to avoid unexpected behavior
|
|
newTask := TaskFromThrift(task)
|
|
|
|
return &JobUpdate{
|
|
task: newTask,
|
|
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
|
|
}
|
|
}
|
|
|
|
// InstanceCount sets the instance count the job will have after the update.
func (j *JobUpdate) InstanceCount(inst int32) *JobUpdate {
	j.request.InstanceCount = inst
	return j
}

// BatchSize sets the max number of instances being updated at any given moment.
func (j *JobUpdate) BatchSize(size int32) *JobUpdate {
	j.request.Settings.UpdateGroupSize = size
	return j
}

// WatchTime sets the minimum time a shard must remain in RUNNING state before
// it is considered a success.
func (j *JobUpdate) WatchTime(timeout time.Duration) *JobUpdate {
	j.request.Settings.MinWaitInInstanceRunningMs = int32(timeout.Milliseconds())
	return j
}

// WaitForBatchCompletion configures the update to wait for all instances in a
// group to be done before moving on.
func (j *JobUpdate) WaitForBatchCompletion(batchWait bool) *JobUpdate {
	j.request.Settings.WaitForBatchCompletion = batchWait
	return j
}

// MaxPerInstanceFailures sets the max number of instance failures to tolerate
// before marking the instance as FAILED.
func (j *JobUpdate) MaxPerInstanceFailures(inst int32) *JobUpdate {
	j.request.Settings.MaxPerInstanceFailures = inst
	return j
}

// MaxFailedInstances sets the max number of FAILED instances to tolerate
// before terminating the update.
func (j *JobUpdate) MaxFailedInstances(inst int32) *JobUpdate {
	j.request.Settings.MaxFailedInstances = inst
	return j
}

// RollbackOnFail controls auto rollback of a failed update; when false, a
// failed update is not rolled back.
func (j *JobUpdate) RollbackOnFail(rollback bool) *JobUpdate {
	j.request.Settings.RollbackOnFailure = rollback
	return j
}
|
|
|
|
// Sets the interval at which pulses should be received by the job update before timing out.
|
|
func (j *JobUpdate) PulseIntervalTimeout(timeout time.Duration) *JobUpdate {
|
|
j.request.Settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(int32(timeout.Seconds() * 1000))
|
|
return j
|
|
}
|
|
// BatchUpdateStrategy configures the update to use the batch update strategy,
// operating on groups of the given size and optionally auto-pausing after
// each batch.
func (j *JobUpdate) BatchUpdateStrategy(autoPause bool, batchSize int32) *JobUpdate {
	j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
		BatchStrategy: &aurora.BatchJobUpdateStrategy{GroupSize: batchSize, AutopauseAfterBatch: autoPause},
	}
	return j
}

// QueueUpdateStrategy configures the update to use the queue update strategy
// with the given group size.
func (j *JobUpdate) QueueUpdateStrategy(groupSize int32) *JobUpdate {
	j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
		QueueStrategy: &aurora.QueueJobUpdateStrategy{GroupSize: groupSize},
	}
	return j
}

// VariableBatchStrategy configures the update to use the variable batch
// update strategy with the given sequence of group sizes, optionally
// auto-pausing after each batch.
func (j *JobUpdate) VariableBatchStrategy(autoPause bool, batchSizes ...int32) *JobUpdate {
	j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
		VarBatchStrategy: &aurora.VariableBatchJobUpdateStrategy{GroupSizes: batchSizes, AutopauseAfterBatch: autoPause},
	}
	return j
}
|
|
|
|
// SlaAware makes the scheduler enforce the SLA Aware policy if the job meets
// the SLA awareness criteria.
// By default, the scheduler will only apply SLA Awareness to jobs in the
// production tier with 20 or more instances.
func (j *JobUpdate) SlaAware(slaAware bool) *JobUpdate {
	j.request.Settings.SlaAware = &slaAware
	return j
}
|
|
|
|
// AddInstanceRange restricts the update to only touch the given range of
// instances (first through last). It may be called multiple times to add
// further ranges.
func (j *JobUpdate) AddInstanceRange(first, last int32) *JobUpdate {
	j.request.Settings.UpdateOnlyTheseInstances = append(j.request.Settings.UpdateOnlyTheseInstances,
		&aurora.Range{First: first, Last: last})
	return j
}
|
|
|
|
func newUpdateSettings() *aurora.JobUpdateSettings {
|
|
us := aurora.JobUpdateSettings{}
|
|
// Mirrors defaults set by Pystachio
|
|
us.UpdateOnlyTheseInstances = []*aurora.Range{}
|
|
us.UpdateGroupSize = 1
|
|
us.WaitForBatchCompletion = false
|
|
us.MinWaitInInstanceRunningMs = 45000
|
|
us.MaxPerInstanceFailures = 0
|
|
us.MaxFailedInstances = 0
|
|
us.RollbackOnFailure = true
|
|
|
|
return &us
|
|
}
|
|
|
|
/*
The methods below are provided for user convenience in order to chain
configuration calls.

They are wrappers around modifying an AuroraTask instance; see task.go
for further documentation.
*/
|
|
|
|
// Environment sets the environment on the underlying task.
func (j *JobUpdate) Environment(env string) *JobUpdate {
	j.task.Environment(env)
	return j
}

// Role sets the role on the underlying task.
func (j *JobUpdate) Role(role string) *JobUpdate {
	j.task.Role(role)
	return j
}

// Name sets the name on the underlying task.
func (j *JobUpdate) Name(name string) *JobUpdate {
	j.task.Name(name)
	return j
}

// ExecutorName sets the executor name on the underlying task.
func (j *JobUpdate) ExecutorName(name string) *JobUpdate {
	j.task.ExecutorName(name)
	return j
}

// ExecutorData sets the executor data on the underlying task.
func (j *JobUpdate) ExecutorData(data string) *JobUpdate {
	j.task.ExecutorData(data)
	return j
}

// CPU sets the CPU resource on the underlying task.
func (j *JobUpdate) CPU(cpus float64) *JobUpdate {
	j.task.CPU(cpus)
	return j
}
|
|
|
|
// RAM sets the RAM resource on the underlying task.
func (j *JobUpdate) RAM(ram int64) *JobUpdate {
	j.task.RAM(ram)
	return j
}

// Disk sets the disk resource on the underlying task.
func (j *JobUpdate) Disk(disk int64) *JobUpdate {
	j.task.Disk(disk)
	return j
}

// Tier sets the tier on the underlying task.
func (j *JobUpdate) Tier(tier string) *JobUpdate {
	j.task.Tier(tier)
	return j
}

// TaskMaxFailure sets the maximum failure count on the underlying task.
func (j *JobUpdate) TaskMaxFailure(maxFail int32) *JobUpdate {
	j.task.MaxFailure(maxFail)
	return j
}

// IsService sets the service flag on the underlying task.
func (j *JobUpdate) IsService(isService bool) *JobUpdate {
	j.task.IsService(isService)
	return j
}

// Priority sets the priority on the underlying task.
func (j *JobUpdate) Priority(priority int32) *JobUpdate {
	j.task.Priority(priority)
	return j
}

// Production sets the production flag on the underlying task.
func (j *JobUpdate) Production(production bool) *JobUpdate {
	j.task.Production(production)
	return j
}
|
|
|
|
// TaskConfig returns the underlying task's Aurora task configuration.
func (j *JobUpdate) TaskConfig() *aurora.TaskConfig {
	return j.task.TaskConfig()
}

// AddURIs adds URIs to be fetched for the underlying task.
func (j *JobUpdate) AddURIs(extract bool, cache bool, values ...string) *JobUpdate {
	j.task.AddURIs(extract, cache, values...)
	return j
}

// AddLabel adds a key/value label to the underlying task.
func (j *JobUpdate) AddLabel(key string, value string) *JobUpdate {
	j.task.AddLabel(key, value)
	return j
}

// AddNamedPorts adds named ports to the underlying task.
func (j *JobUpdate) AddNamedPorts(names ...string) *JobUpdate {
	j.task.AddNamedPorts(names...)
	return j
}

// AddPorts adds the given number of ports to the underlying task.
func (j *JobUpdate) AddPorts(num int) *JobUpdate {
	j.task.AddPorts(num)
	return j
}
|
|
// AddValueConstraint adds a value constraint to the underlying task.
func (j *JobUpdate) AddValueConstraint(name string, negated bool, values ...string) *JobUpdate {
	j.task.AddValueConstraint(name, negated, values...)
	return j
}

// AddLimitConstraint adds a limit constraint to the underlying task.
func (j *JobUpdate) AddLimitConstraint(name string, limit int32) *JobUpdate {
	j.task.AddLimitConstraint(name, limit)
	return j
}

// AddDedicatedConstraint adds a dedicated constraint to the underlying task.
func (j *JobUpdate) AddDedicatedConstraint(role, name string) *JobUpdate {
	j.task.AddDedicatedConstraint(role, name)
	return j
}

// Container sets the container on the underlying task.
func (j *JobUpdate) Container(container Container) *JobUpdate {
	j.task.Container(container)
	return j
}
|
|
|
|
// JobKey returns the job key of the underlying task.
func (j *JobUpdate) JobKey() aurora.JobKey {
	return j.task.JobKey()
}

// ThermosExecutor sets the Thermos executor configuration on the underlying task.
func (j *JobUpdate) ThermosExecutor(thermos ThermosExecutor) *JobUpdate {
	j.task.ThermosExecutor(thermos)
	return j
}

// BuildThermosPayload builds the Thermos executor payload for the underlying task.
func (j *JobUpdate) BuildThermosPayload() error {
	return j.task.BuildThermosPayload()
}

// PartitionPolicy sets a partition policy on the underlying task with the
// given reschedule behavior and delay in seconds.
func (j *JobUpdate) PartitionPolicy(reschedule bool, delay int64) *JobUpdate {
	j.task.PartitionPolicy(aurora.PartitionPolicy{
		Reschedule: reschedule,
		DelaySecs:  &delay,
	})
	return j
}
|