Refactor code to be compatible with Thrift 0.12.0 generated code. Tests have not yet been refactored.

Renan DelValle 2018-11-27 18:45:10 -08:00
parent cec9c001fb
commit 59e3a7065e
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
5 changed files with 136 additions and 178 deletions


@@ -294,47 +294,44 @@ func main() {
 			log.Fatal(err)
 		}
-		fmt.Println("Number of live instances: ", len(live))
+		fmt.Println("Active instances: ", live)
 	case "flexUp":
 		fmt.Println("Flexing up job")
-		numOfInstances := int32(4)
+		numOfInstances := 4
 		live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
 		if err != nil {
 			log.Fatal(err)
 		}
-		currInstances := int32(len(live))
+		currInstances := len(live)
 		fmt.Println("Current num of instances: ", currInstances)
-		var instId int32
-		for k := range live {
-			instId = k
-		}
 		err = r.AddInstances(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		},
-			numOfInstances)
+			int32(numOfInstances))
 		if err != nil {
 			log.Fatal(err)
 		}
-		if ok, err := monitor.Instances(job.JobKey(), currInstances+numOfInstances, 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := monitor.Instances(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
 			fmt.Println("Flexing up failed")
 		}
 	case "flexDown":
 		fmt.Println("Flexing down job")
-		numOfInstances := int32(2)
+		numOfInstances := 2
 		live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
 		if err != nil {
 			log.Fatal(err)
 		}
-		currInstances := int32(len(live))
+		currInstances := len(live)
 		fmt.Println("Current num of instances: ", currInstances)
 		err = r.RemoveInstances(job.JobKey(), numOfInstances)
@@ -342,7 +339,7 @@ func main() {
 			log.Fatal(err)
 		}
-		if ok, err := monitor.Instances(job.JobKey(), currInstances-numOfInstances, 5*time.Second, 100*time.Second); !ok || err != nil {
+		if ok, err := monitor.Instances(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
 			fmt.Println("flexDown failed")
 		}
@@ -352,13 +349,9 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
-		var instId int32
-		for k := range live {
-			instId = k
-		}
 		taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		})
 		if err != nil {
 			log.Fatal(err)
@@ -451,14 +444,9 @@ func main() {
 			log.Fatal(err)
 		}
-		var instId int32
-		for k := range live {
-			instId = k
-			break
-		}
 		config, err := r.FetchTaskConfig(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		})
 		if err != nil {
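Note on the example changes above: GetInstanceIds now returns a []int32 rather than a map[int32]bool, so the first active instance can be taken by index instead of ranging over map keys. A minimal sketch of the new pattern, assuming a connected RealisClient r and an AuroraJob job as in the example:

	// live is a []int32 of active instance IDs after this refactor.
	live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
	if err != nil {
		log.Fatal(err)
	}
	// Any element can be picked directly; live[0] replaces the old
	// for-range over map keys that was needed when live was a map[int32]bool.
	key := aurora.InstanceKey{
		JobKey:     job.JobKey(),
		InstanceId: live[0],
	}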

job.go

@@ -41,28 +41,25 @@ func NewJob() *AuroraJob {
 	taskConfig.Job = jobKey
 	taskConfig.Container = aurora.NewContainer()
 	taskConfig.Container.Mesos = aurora.NewMesosContainer()
-	taskConfig.MesosFetcherUris = make(map[*aurora.MesosFetcherURI]bool)
-	taskConfig.Metadata = make(map[*aurora.Metadata]bool)
-	taskConfig.Constraints = make(map[*aurora.Constraint]bool)
+	taskConfig.MesosFetcherUris = make([]*aurora.MesosFetcherURI, 0)
+	taskConfig.Metadata = make([]*aurora.Metadata, 0)
+	taskConfig.Constraints = make([]*aurora.Constraint, 0)
 	// Resources
 	numCpus := aurora.NewResource()
 	ramMb := aurora.NewResource()
 	diskMb := aurora.NewResource()
-	numCpus.NumCpus = new(float64)
-	ramMb.RamMb = new(int64)
-	diskMb.DiskMb = new(int64)
 	resources := make(map[string]*aurora.Resource)
 	resources["cpu"] = numCpus
 	resources["ram"] = ramMb
 	resources["disk"] = diskMb
-	taskConfig.Resources = make(map[*aurora.Resource]bool)
-	taskConfig.Resources[numCpus] = true
-	taskConfig.Resources[ramMb] = true
-	taskConfig.Resources[diskMb] = true
+	taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
+
+	numCpus.NumCpus = new(float64)
+	ramMb.RamMb = new(int64)
+	diskMb.DiskMb = new(int64)
 	return &AuroraJob{
 		jobConfig: jobConfig,
@@ -185,11 +182,9 @@ func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
 // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
 func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJob {
 	for _, value := range values {
-		j.jobConfig.TaskConfig.MesosFetcherUris[&aurora.MesosFetcherURI{
-			Value:   value,
-			Extract: &extract,
-			Cache:   &cache,
-		}] = true
+		j.jobConfig.TaskConfig.MesosFetcherUris = append(
+			j.jobConfig.TaskConfig.MesosFetcherUris,
+			&aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache})
 	}
 	return j
 }
@@ -197,7 +192,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJ
 // Adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob {
-	j.jobConfig.TaskConfig.Metadata[&aurora.Metadata{Key: key, Value: value}] = true
+	j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, &aurora.Metadata{Key: key, Value: value})
 	return j
 }
@@ -206,7 +201,7 @@ func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob {
 func (j *AuroraJob) AddNamedPorts(names ...string) *AuroraJob {
 	j.portCount += len(names)
 	for _, name := range names {
-		j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &name}] = true
+		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name})
 	}
 	return j
@@ -221,7 +216,7 @@ func (j *AuroraJob) AddPorts(num int) *AuroraJob {
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
 		portName := "org.apache.aurora.port." + strconv.Itoa(i)
-		j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &portName}] = true
+		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &portName})
 	}
 	return j
@@ -233,20 +228,17 @@ func (j *AuroraJob) AddPorts(num int) *AuroraJob {
 // If negated = true , treat this as a 'not' - to avoid specific values.
 // Values - list of values we look for in attribute name
 func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) *AuroraJob {
-	constraintValues := make(map[string]bool)
-	for _, value := range values {
-		constraintValues[value] = true
-	}
-	j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
-		Name: name,
-		Constraint: &aurora.TaskConstraint{
-			Value: &aurora.ValueConstraint{
-				Negated: negated,
-				Values:  constraintValues,
-			},
-			Limit: nil,
-		},
-	}] = true
+	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
+		&aurora.Constraint{
+			Name: name,
+			Constraint: &aurora.TaskConstraint{
+				Value: &aurora.ValueConstraint{
+					Negated: negated,
+					Values:  values,
+				},
+				Limit: nil,
+			},
+		})
 	return j
 }
@@ -255,13 +247,14 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
 func (j *AuroraJob) AddLimitConstraint(name string, limit int32) *AuroraJob {
-	j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
-		Name: name,
-		Constraint: &aurora.TaskConstraint{
-			Value: nil,
-			Limit: &aurora.LimitConstraint{Limit: limit},
-		},
-	}] = true
+	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
+		&aurora.Constraint{
+			Name: name,
+			Constraint: &aurora.TaskConstraint{
+				Value: nil,
+				Limit: &aurora.LimitConstraint{Limit: limit},
+			},
+		})
 	return j
 }
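The pattern running through job.go: Thrift 0.12.0 generates Go slices instead of map[T]bool for thrift sets, so every set-valued TaskConfig field is now built with append. A short sketch under that assumption (the label key and value are illustrative):

	// Before (older generated code): sets were map-backed.
	// taskConfig.Metadata[&aurora.Metadata{Key: "team", Value: "data"}] = true

	// After (Thrift 0.12.0 generated code): sets are slice-backed.
	taskConfig.Metadata = append(taskConfig.Metadata,
		&aurora.Metadata{Key: "team", Value: "data"}) // "team"/"data" are example values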


@@ -59,9 +59,16 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval, timeout tim
 		m.Client.RealisConfig().logger.Println("No update found")
 		return false, errors.New("No update found for " + updateKey.String())
 	}
 	status := updateDetail.GetDetailsList()[0].Update.Summary.State.Status
-	if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
+	// Convert Thrift Set to Golang map for quick lookup
+	activeStatus := map[aurora.JobUpdateStatus]bool{}
+	for _, stat := range aurora.ACTIVE_JOB_UPDATE_STATES {
+		activeStatus[stat] = true
+	}
+	if _, ok := activeStatus[status]; !ok {
 		// Rolled forward is the only state in which an update has been successfully updated
 		// if we encounter an inactive state and it is not at rolled forward, update failed
@@ -93,7 +100,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeo
 // Monitor a AuroraJob until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
 // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
-func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout time.Duration) (bool, error) {
+func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second || timeout < 1*time.Second {
 		return false, errors.New("Interval or timeout cannot be below one second.")
 	}
@@ -164,7 +171,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 		return hostResult, errors.Wrap(err, "client error in monitor")
 	}
-	for status := range result.GetStatuses() {
+	for _, status := range result.GetStatuses() {
 		if _, ok := desiredMode[status.GetMode()]; ok {
 			hostResult[status.GetHost()] = true
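Because generated set constants such as aurora.ACTIVE_JOB_UPDATE_STATES are now slices, membership checks rebuild a lookup map first, as JobUpdate does above. A standalone sketch of that conversion (status is assumed to hold the current aurora.JobUpdateStatus):

	// Build a lookup table once, then test membership in O(1).
	activeStatus := make(map[aurora.JobUpdateStatus]bool, len(aurora.ACTIVE_JOB_UPDATE_STATES))
	for _, s := range aurora.ACTIVE_JOB_UPDATE_STATES {
		activeStatus[s] = true
	}
	if !activeStatus[status] {
		// status is no longer active; decide between rolled forward and failure.
	}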

realis.go

@@ -26,6 +26,7 @@ import (
 	"net/http/cookiejar"
 	"os"
 	"path/filepath"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -46,6 +47,7 @@ type RealisClient struct {
 	logger         LevelLogger
 	lock           *sync.Mutex
 	debug          bool
+	transport      thrift.TTransport
 }
 type RealisConfig struct {
@@ -234,7 +236,7 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 	config.logger.DebugPrintln("Number of options applied to config: ", len(options))
-	//Set default Transport to JSON if needed.
+	// Set default Transport to JSON if needed.
 	if !config.jsonTransport && !config.binTransport {
 		config.jsonTransport = true
 	}
@@ -296,7 +298,9 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
 		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
 		logger:         LevelLogger{config.logger, config.debug},
-		lock:           &sync.Mutex{}}, nil
+		lock:           &sync.Mutex{},
+		transport:      config.transport,
+	}, nil
 }
 func GetCerts(certPath string) (*x509.CertPool, error) {
@@ -355,7 +359,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 		transport.TLSClientConfig = tlsConfig
 	}
-	trans, err := thrift.NewTHttpPostClientWithOptions(url+"/api",
+	trans, err := thrift.NewTHttpClientWithOptions(url+"/api",
 		thrift.THttpClientOptions{Client: &http.Client{Timeout: timeout, Transport: &transport, Jar: jar}})
 	if err != nil {
@@ -379,7 +383,7 @@ func newDefaultConfig(url string, timeout time.Duration, config *RealisConfig) (
 func newTJSONConfig(url string, timeout time.Duration, config *RealisConfig) (*RealisConfig, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
-		return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
+		return nil, errors.Wrap(err, "Error creating realis config")
 	}
 	httpTrans := (trans).(*thrift.THttpClient)
@@ -393,7 +397,7 @@ func newTJSONConfig(url string, timeout time.Duration, config *RealisConfig) (*R
 func newTBinaryConfig(url string, timeout time.Duration, config *RealisConfig) (*RealisConfig, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
-		return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
+		return nil, errors.Wrap(err, "Error creating realis config")
 	}
 	httpTrans := (trans).(*thrift.THttpClient)
@@ -444,13 +448,11 @@ func (r *RealisClient) Close() {
 	r.lock.Lock()
 	defer r.lock.Unlock()
-	r.client.Transport.Close()
-	r.readonlyClient.Transport.Close()
-	r.adminClient.Transport.Close()
+	r.transport.Close()
 }
 // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
-func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) {
+func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
 	taskQ := &aurora.TaskQuery{
 		Role:        &key.Role,
 		Environment: &key.Environment,
@@ -461,7 +463,7 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksWithoutConfigs(taskQ)
+		return r.client.GetTasksWithoutConfigs(nil, taskQ)
 	})
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -471,9 +473,9 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 	// Construct instance id map to stay in line with thrift's representation of sets
 	tasks := response.ScheduleStatusResult(resp).GetTasks()
-	jobInstanceIds := make(map[int32]bool)
+	jobInstanceIds := make([]int32, 0, len(tasks))
 	for _, task := range tasks {
-		jobInstanceIds[task.GetAssignedTask().GetInstanceId()] = true
+		jobInstanceIds = append(jobInstanceIds, task.GetAssignedTask().GetInstanceId())
 	}
 	return jobInstanceIds, nil
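The other recurring change in realis.go: clients generated by Thrift 0.12.0 take a context.Context as their first argument, which this refactor passes as nil for now. A sketch of how a real context could be threaded through later, assuming the generated method accepts any context.Context (the 10-second deadline is illustrative):

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
		// nil is what this commit passes; supplying ctx would enable cancellation.
		return r.client.GetTasksWithoutConfigs(ctx, taskQ)
	})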
@@ -483,7 +485,7 @@ func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 	r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
+		return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
 	})
 	if retryErr != nil {
@@ -498,7 +500,7 @@ func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 	var result *aurora.GetJobsResult_
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.readonlyClient.GetJobs(role)
+		return r.readonlyClient.GetJobs(nil, role)
 	})
 	if retryErr != nil {
@@ -517,14 +519,8 @@ func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) {
 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
-	instanceIds := make(map[int32]bool)
-	for _, instId := range instances {
-		instanceIds[instId] = true
-	}
-
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.KillTasks(key, instanceIds, "")
+		return r.client.KillTasks(nil, key, instances, "")
 	})
 	if retryErr != nil {
@@ -551,7 +547,7 @@ func (r *RealisClient) KillJob(key *aurora.JobKey) error {
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-		return r.client.KillTasks(key, nil, "")
+		return r.client.KillTasks(nil, key, nil, "")
 	})
 	if retryErr != nil {
@@ -569,7 +565,7 @@ func (r *RealisClient) CreateJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.CreateJob(auroraJob.JobConfig())
+		return r.client.CreateJob(nil, auroraJob.JobConfig())
 	})
 	if retryErr != nil {
@@ -601,7 +597,7 @@ func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.ScheduleCronJob(auroraJob.JobConfig())
+		return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
 	})
 	if retryErr != nil {
@@ -615,7 +611,7 @@ func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 	r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.DescheduleCronJob(key)
+		return r.client.DescheduleCronJob(nil, key)
 	})
 	if retryErr != nil {
@@ -631,7 +627,7 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 	r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.StartCronJob(key)
+		return r.client.StartCronJob(nil, key)
 	})
 	if retryErr != nil {
@@ -645,14 +641,8 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) error {
 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
-	instanceIds := make(map[int32]bool)
-	for _, instId := range instances {
-		instanceIds[instId] = true
-	}
-
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.RestartShards(key, instanceIds)
+		return r.client.RestartShards(nil, key, instances)
 	})
 	if retryErr != nil {
@@ -673,7 +663,7 @@ func (r *RealisClient) RestartJob(key *aurora.JobKey) error {
 	if len(instanceIds) > 0 {
 		_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-			return r.client.RestartShards(key, instanceIds)
+			return r.client.RestartShards(nil, key, instanceIds)
 		})
 		if retryErr != nil {
@@ -692,7 +682,7 @@ func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 	r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.StartJobUpdate(updateJob.req, message)
+		return r.client.StartJobUpdate(nil, updateJob.req, message)
 	})
 	if retryErr != nil {
@@ -707,7 +697,7 @@ func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.AbortJobUpdate(&updateKey, message)
+		return r.client.AbortJobUpdate(nil, &updateKey, message)
 	})
 	if retryErr != nil {
@@ -722,7 +712,7 @@ func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 	r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.PauseJobUpdate(updateKey, message)
+		return r.client.PauseJobUpdate(nil, updateKey, message)
 	})
 	if retryErr != nil {
@@ -738,7 +728,7 @@ func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 	r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.ResumeJobUpdate(updateKey, message)
+		return r.client.ResumeJobUpdate(nil, updateKey, message)
 	})
 	if retryErr != nil {
@@ -754,7 +744,7 @@ func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.P
 	r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.PulseJobUpdate(updateKey)
+		return r.client.PulseJobUpdate(nil, updateKey)
 	})
 	if retryErr != nil {
@@ -771,7 +761,7 @@ func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) err
 	r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.AddInstances(&instKey, count)
+		return r.client.AddInstances(nil, &instKey, count)
 	})
 	if retryErr != nil {
@@ -782,25 +772,26 @@ func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) err
 }
 // Scale down the number of instances under a job configuration using the configuration of a specific instance
-func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) error {
+// Instances with a higher instance ID will be removed first. For example, if our instance ID list is [0,1,2,3]
+// and we want to remove 2 instances, 2 and 3 will always be picked.
+func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int) error {
 	instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err != nil {
 		return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
 	}
-	if len(instanceIds) < int(count) {
-		return errors.Errorf("Insufficient acative available for killing "+
+	if len(instanceIds) < count {
+		return errors.Errorf("Insufficient active instances available for killing: "+
 			" Instances to be killed %d Active instances %d", count, len(instanceIds))
 	}
-	instanceList := make([]int32, count)
-	i := 0
-	for k := range instanceIds {
-		instanceList[i] = k
-		i += 1
-		if i == int(count) {
-			break
-		}
-	}
-	killed, err := r.KillInstances(key, instanceList...)
+	// Sort instanceIds in decreasing order
+	sort.Slice(instanceIds, func(i, j int) bool {
+		return instanceIds[i] > instanceIds[j]
+	})
+
+	// Get the last count instance ids to kill
+	instanceIds = instanceIds[:count]
+	killed, err := r.KillInstances(key, instanceIds...)
 	if !killed {
 		return errors.New("Flex down was not able to reduce the number of instances running.")
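The new RemoveInstances ordering in isolation: sort the IDs in decreasing order and keep the first count entries, so the highest instance IDs are always the ones killed. A runnable sketch:

	ids := []int32{0, 1, 2, 3}
	sort.Slice(ids, func(i, j int) bool { return ids[i] > ids[j] })
	ids = ids[:2]
	fmt.Println(ids) // [3 2]: with count = 2, instances 3 and 2 are removed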
@@ -815,7 +806,7 @@ func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksStatus(query)
+		return r.client.GetTasksStatus(nil, query)
 	})
 	if retryErr != nil {
@@ -831,24 +822,20 @@ func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
 	r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetPendingReason(query)
+		return r.client.GetPendingReason(nil, query)
 	})
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons")
 	}
-	var result map[*aurora.PendingReason]bool
+	var result []*aurora.PendingReason
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
-	var pendingReasons []*aurora.PendingReason
-	for reason := range result {
-		pendingReasons = append(pendingReasons, reason)
-	}
-
-	return pendingReasons, nil
+	return result, nil
 }
 // Get information about task including without a task configuration object
@@ -857,7 +844,7 @@ func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksWithoutConfigs(query)
+		return r.client.GetTasksWithoutConfigs(nil, query)
 	})
 	if retryErr != nil {
@@ -871,9 +858,8 @@ func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 // Get the task configuration from the aurora scheduler for a job
 func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
-	ids := make(map[int32]bool)
-	ids[instKey.InstanceId] = true
+	ids := []int32{instKey.GetInstanceId()}
 	taskQ := &aurora.TaskQuery{
 		Role:        &instKey.JobKey.Role,
 		Environment: &instKey.JobKey.Environment,
@@ -885,7 +871,7 @@ func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksStatus(taskQ)
+		return r.client.GetTasksStatus(nil, taskQ)
 	})
 	if retryErr != nil {
@@ -911,7 +897,7 @@ func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 	r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetJobUpdateDetails(&updateQuery)
+		return r.client.GetJobUpdateDetails(nil, &updateQuery)
 	})
 	if retryErr != nil {
@@ -925,7 +911,7 @@ func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 	r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.RollbackJobUpdate(&key, message)
+		return r.client.RollbackJobUpdate(nil, &key, message)
 	})
 	if retryErr != nil {
@@ -950,15 +936,12 @@ func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, e
 	}
 	drainList := aurora.NewHosts()
-	drainList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		drainList.HostNames[host] = true
-	}
+	drainList.HostNames = hosts
 	r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.DrainHosts(drainList)
+		return r.adminClient.DrainHosts(nil, drainList)
 	})
 	if retryErr != nil {
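Hosts.HostNames is likewise a plain []string now, so the host-list plumbing in DrainHosts and the maintenance calls below collapses to a single assignment. A sketch (the hostnames are illustrative):

	drainList := aurora.NewHosts()
	drainList.HostNames = []string{"agent1.example.org", "agent2.example.org"}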
@@ -983,15 +966,12 @@ func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
 	}
 	drainList := aurora.NewHosts()
-	drainList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		drainList.HostNames[host] = true
-	}
+	drainList.HostNames = hosts
 	r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.SlaDrainHosts(drainList, policy, timeout)
+		return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
 	})
 	if retryErr != nil {
@@ -1014,15 +994,12 @@ func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenan
 	}
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 	r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.StartMaintenance(hostList)
+		return r.adminClient.StartMaintenance(nil, hostList)
 	})
 	if retryErr != nil {
@@ -1045,15 +1022,12 @@ func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceRe
 	}
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 	r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.EndMaintenance(hostList)
+		return r.adminClient.EndMaintenance(nil, hostList)
 	})
 	if retryErr != nil {
@@ -1076,17 +1050,14 @@ func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceSt
 	}
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 	r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
 	// Make thrift call. If we encounter an error sending the call, attempt to reconnect
 	// and continue trying to resend command until we run out of retries.
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.MaintenanceStatus(hostList)
+		return r.adminClient.MaintenanceStatus(nil, hostList)
 	})
 	if retryErr != nil {
@@ -1103,19 +1074,18 @@ func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceSt
 // SetQuota sets a quota aggregate for the given role
 // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
 func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error {
-	ram := aurora.NewResource()
-	ram.RamMb = ramMb
-	c := aurora.NewResource()
-	c.NumCpus = cpu
-	d := aurora.NewResource()
-	d.DiskMb = diskMb
+	ramResource := aurora.NewResource()
+	ramResource.RamMb = ramMb
+	cpuResource := aurora.NewResource()
+	cpuResource.NumCpus = cpu
+	diskResource := aurora.NewResource()
+	diskResource.DiskMb = diskMb
+
 	quota := aurora.NewResourceAggregate()
-	quota.Resources = make(map[*aurora.Resource]bool)
-	quota.Resources[ram] = true
-	quota.Resources[c] = true
-	quota.Resources[d] = true
+	quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.SetQuota(role, quota)
+		return r.adminClient.SetQuota(nil, role, quota)
 	})
 	if retryErr != nil {
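ResourceAggregate.Resources follows the same set-to-slice rule, so building a quota is now a literal slice of the three resource structs. A sketch with illustrative values:

	cpu, ram, disk := 4.0, int64(8192), int64(65536)
	cpuResource := aurora.NewResource()
	cpuResource.NumCpus = &cpu
	ramResource := aurora.NewResource()
	ramResource.RamMb = &ram
	diskResource := aurora.NewResource()
	diskResource.DiskMb = &disk
	quota := aurora.NewResourceAggregate()
	quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}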
@@ -1129,7 +1099,7 @@ func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.GetQuota(role)
+		return r.adminClient.GetQuota(nil, role)
 	})
 	if retryErr != nil {
@@ -1142,7 +1112,7 @@ func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 func (r *RealisClient) Snapshot() error {
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.Snapshot()
+		return r.adminClient.Snapshot(nil)
 	})
 	if retryErr != nil {
@@ -1156,7 +1126,7 @@ func (r *RealisClient) Snapshot() error {
 func (r *RealisClient) PerformBackup() error {
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.PerformBackup()
+		return r.adminClient.PerformBackup(nil)
 	})
 	if retryErr != nil {
@@ -1170,7 +1140,7 @@ func (r *RealisClient) PerformBackup() error {
 func (r *RealisClient) ForceImplicitTaskReconciliation() error {
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.TriggerImplicitTaskReconciliation()
+		return r.adminClient.TriggerImplicitTaskReconciliation(nil)
 	})
 	if retryErr != nil {
@@ -1191,7 +1161,7 @@ func (r *RealisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 	settings.BatchSize = batchSize
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.TriggerExplicitTaskReconciliation(settings)
+		return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
 	})
 	if retryErr != nil {


@@ -35,7 +35,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	job.jobConfig.TaskConfig = config
 	// Rebuild resource map from TaskConfig
-	for ptr := range config.Resources {
+	for _, ptr := range config.Resources {
 		if ptr.NumCpus != nil {
 			job.resources["cpu"].NumCpus = ptr.NumCpus
 			continue // Guard against Union violations that Go won't enforce
@@ -53,7 +53,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	}
 	// Mirrors defaults set by Pystachio
-	req.Settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
+	req.Settings.UpdateOnlyTheseInstances = []*aurora.Range{}
 	req.Settings.UpdateGroupSize = 1
 	req.Settings.WaitForBatchCompletion = false
 	req.Settings.MinWaitInInstanceRunningMs = 45000
@@ -75,7 +75,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
 	job.jobConfig.TaskConfig = config
 	// Rebuild resource map from TaskConfig
-	for ptr := range config.Resources {
+	for _, ptr := range config.Resources {
 		if ptr.NumCpus != nil {
 			job.resources["cpu"].NumCpus = ptr.NumCpus
 			continue // Guard against Union violations that Go won't enforce
@@ -142,7 +142,7 @@ func NewUpdateSettings() *aurora.JobUpdateSettings {
 	us := aurora.JobUpdateSettings{}
 	// Mirrors defaults set by Pystachio
-	us.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
+	us.UpdateOnlyTheseInstances = []*aurora.Range{}
 	us.UpdateGroupSize = 1
 	us.WaitForBatchCompletion = false
 	us.MinWaitInInstanceRunningMs = 45000
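Reading side of the same change: TaskConfig.Resources is iterated with the two-value range form, since ranging over a slice yields (index, element) rather than the map key. A sketch, assuming config is a *aurora.TaskConfig:

	for _, res := range config.Resources {
		// Resource is a thrift union; exactly one field should be set.
		if res.NumCpus != nil {
			fmt.Println("cpus:", *res.NumCpus)
		}
	}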