This commit is contained in:
Tan N. Le 2021-11-05 15:48:35 +05:30 committed by GitHub
commit 0d4da131de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 0 deletions

View file

@ -294,3 +294,8 @@ func (j *JobUpdate) PartitionPolicy(reschedule bool, delay int64) *JobUpdate {
})
return j
}
// SlaPolicy stores sla in the SlaPolicy field of the task wrapped by this
// job update (j.task.task) and returns the receiver so that calls can be
// chained.
func (j *JobUpdate) SlaPolicy(sla *aurora.SlaPolicy) *JobUpdate {
j.task.task.SlaPolicy = sla
return j
}

View file

@ -522,6 +522,52 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error {
return nil
}
// SlaRestartInstances restarts the given instances of a job by starting an
// SLA-aware job update.
//
// The job's configuration is looked up through GetJobSummary for jobKey.Role
// and matched on environment and name. If slaPolicy is nil, the existing
// slaPolicy of the job's task config is used; otherwise slaPolicy is applied
// to the update.
//
// When no instances are given there is nothing to restart and (nil, nil) is
// returned. An error is returned when the summary lookup fails, when no
// matching job is found, or when StartJobUpdate fails.
func (c *Client) SlaRestartInstances(jobKey aurora.JobKey,
	slaPolicy *aurora.SlaPolicy,
	instances ...int32) (*aurora.StartJobUpdateResult_, error) {
	c.logger.DebugPrintf("SlaRestartInstances Thrift Payload: %v %+v %v\n", slaPolicy, jobKey, instances)

	if len(instances) == 0 {
		c.logger.DebugPrintf("it is not necessary to restart 0 instances")
		return nil, nil
	}

	jobSummary, err := c.GetJobSummary(jobKey.Role)
	if err != nil {
		return nil, err
	}

	var jobConfig *aurora.JobConfiguration
	for _, s := range jobSummary.Summaries {
		// Skip incomplete summaries instead of panicking on a nil dereference.
		if s.Job == nil || s.Job.Key == nil {
			continue
		}
		if s.Job.Key.Environment == jobKey.Environment && s.Job.Key.Name == jobKey.Name {
			jobConfig = s.Job
		}
	}
	if jobConfig == nil {
		return nil, fmt.Errorf("failed to find %v", jobKey)
	}

	// create job update request
	jobUpdate := JobUpdateFromConfig(jobConfig.TaskConfig)
	jobUpdate.
		SlaAware(true).
		InstanceCount(jobConfig.InstanceCount)
	if slaPolicy != nil {
		jobUpdate.SlaPolicy(slaPolicy)
	}
	// Each requested instance becomes its own single-element range.
	for _, v := range instances {
		jobUpdate.AddInstanceRange(v, v)
	}

	result, updateErr := c.StartJobUpdate(jobUpdate, fmt.Sprintf("restart instances %v", instances))
	if updateErr != nil {
		// The context message is only built when it is actually needed.
		return nil, errors.Wrap(updateErr,
			fmt.Sprintf("SlaRestartInstances %v-%v via StartJobUpdate", jobKey, instances))
	}
	return result, nil
}
// Restarts all active tasks under a job configuration.
func (c *Client) RestartJob(key aurora.JobKey) error {

View file

@ -925,3 +925,59 @@ func TestRealisClient_GetJobSummary(t *testing.T) {
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
}
// TestRealisClient_SlaRestartInstances creates a three-instance service job,
// waits until its instances are RUNNING, and then restarts instance 0 through
// SlaRestartInstances with a percentage-based SLA policy. The resulting job
// update is aborted and the job is killed afterwards.
func TestRealisClient_SlaRestartInstances(t *testing.T) {
	// Create a single job
	role := "vagrant"
	env := "prod"
	name := "slaRestartInstances"

	job := realis.NewJob().
		Environment(env).
		Role(role).
		Name(name).
		ThermosExecutor(thermosExec).
		CPU(.01).
		RAM(4).
		Disk(10).
		Tier("preferred").
		InstanceCount(3).
		IsService(true)

	// Needed to populate the task config correctly
	assert.NoError(t, job.BuildThermosPayload())

	var cpu = 3.5
	var ram int64 = 20480
	var disk int64 = 10240
	err := r.SetQuota(role, &cpu, &ram, &disk)
	assert.NoError(t, err)

	err = r.CreateJob(job)
	assert.NoError(t, err)

	// Tear the job down even when a later step fails, so a failed run does
	// not leave the job behind.
	defer func() {
		assert.NoError(t, r.KillJob(job.JobKey()))
	}()

	// waiting until all instances running
	success, err := r.MonitorScheduleStatus(job.JobKey(),
		job.GetInstanceCount(),
		[]aurora.ScheduleStatus{aurora.ScheduleStatus_RUNNING},
		1*time.Second,
		150*time.Second)
	assert.True(t, success)
	assert.NoError(t, err)

	slaPolicy := &aurora.SlaPolicy{
		PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
			Percentage:   50,
			DurationSecs: 0,
		},
	}

	t.Run("TestRealisClient_SlaRestartInstances", func(t *testing.T) {
		result, err := r.SlaRestartInstances(job.JobKey(), slaPolicy, 0)
		assert.NoError(t, err)

		// assert.* does not stop the test; bail out explicitly so a nil
		// result or key is reported as a failure rather than a panic.
		if !assert.NotNil(t, result) {
			return
		}
		key := result.GetKey()
		if !assert.NotNil(t, key) {
			return
		}

		assert.NoError(t, r.AbortJobUpdate(*key, "abort update to kill the job"))
	})
}