diff --git a/examples/client.go b/examples/client.go
index 89f8213..6529fcf 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -294,47 +294,44 @@ func main() {
 			log.Fatal(err)
 		}
-		fmt.Println("Number of live instances: ", len(live))
+		fmt.Println("Active instances: ", live)
 
 	case "flexUp":
 		fmt.Println("Flexing up job")
 
-		numOfInstances := int32(4)
+		numOfInstances := 4
 
 		live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
 		if err != nil {
 			log.Fatal(err)
 		}
-		currInstances := int32(len(live))
+		currInstances := len(live)
 		fmt.Println("Current num of instances: ", currInstances)
-		var instId int32
-		for k := range live {
-			instId = k
-		}
+
 		err = r.AddInstances(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		},
-			numOfInstances)
+			int32(numOfInstances))
 
 		if err != nil {
 			log.Fatal(err)
 		}
 
-		if ok, err := monitor.Instances(job.JobKey(), currInstances+numOfInstances, 5*time.Second, 50*time.Second); !ok || err != nil {
+		if ok, err := monitor.Instances(job.JobKey(), int32(currInstances+numOfInstances), 5*time.Second, 50*time.Second); !ok || err != nil {
 			fmt.Println("Flexing up failed")
 		}
 
 	case "flexDown":
 		fmt.Println("Flexing down job")
 
-		numOfInstances := int32(2)
+		numOfInstances := 2
 
 		live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
 		if err != nil {
 			log.Fatal(err)
 		}
-		currInstances := int32(len(live))
+		currInstances := len(live)
 		fmt.Println("Current num of instances: ", currInstances)
 
 		err = r.RemoveInstances(job.JobKey(), numOfInstances)
@@ -342,7 +339,7 @@ func main() {
 			log.Fatal(err)
 		}
 
-		if ok, err := monitor.Instances(job.JobKey(), currInstances-numOfInstances, 5*time.Second, 100*time.Second); !ok || err != nil {
+		if ok, err := monitor.Instances(job.JobKey(), int32(currInstances-numOfInstances), 5*time.Second, 100*time.Second); !ok || err != nil {
 			fmt.Println("flexDown failed")
 		}
 
@@ -352,13 +349,9 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
-		var instId int32
-		for k := range live {
-			instId = k
-		}
 		taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		})
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -451,14 +444,9 @@ func main() {
 			log.Fatal(err)
 		}
 
-		var instId int32
-		for k := range live {
-			instId = k
-			break
-		}
 		config, err := r.FetchTaskConfig(aurora.InstanceKey{
 			JobKey:     job.JobKey(),
-			InstanceId: instId,
+			InstanceId: live[0],
 		})
 
 		if err != nil {
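Reviewer note: `GetInstanceIds` now returns a plain `[]int32` rather than the thrift-set representation `map[int32]bool`, which is why the example no longer ranges over map keys to pull out an arbitrary ID. A minimal sketch of the new call pattern, assuming a connected `*realis.RealisClient` and an `*realis.AuroraJob` built as in this example (the helper name and the `github.com/paypal/gorealis` import path are illustrative assumptions):

```go
package snippets

import (
	"fmt"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// fetchFirstConfig is a hypothetical helper showing the slice-based API:
// it fetches the task config of the first active instance of a job.
func fetchFirstConfig(r *realis.RealisClient, job *realis.AuroraJob) (*aurora.TaskConfig, error) {
	live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
	if err != nil {
		return nil, err
	}
	if len(live) == 0 {
		return nil, fmt.Errorf("no active instances for %v", job.JobKey())
	}

	// live is an ordinary slice now, so the first ID is live[0] instead of
	// an arbitrary key extracted by ranging over a map[int32]bool.
	return r.FetchTaskConfig(aurora.InstanceKey{
		JobKey:     job.JobKey(),
		InstanceId: live[0],
	})
}
```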
diff --git a/job.go b/job.go
index 44beb83..5cb22be 100644
--- a/job.go
+++ b/job.go
@@ -41,28 +41,25 @@ func NewJob() *AuroraJob {
 	taskConfig.Job = jobKey
 	taskConfig.Container = aurora.NewContainer()
 	taskConfig.Container.Mesos = aurora.NewMesosContainer()
-	taskConfig.MesosFetcherUris = make(map[*aurora.MesosFetcherURI]bool)
-	taskConfig.Metadata = make(map[*aurora.Metadata]bool)
-	taskConfig.Constraints = make(map[*aurora.Constraint]bool)
+	taskConfig.MesosFetcherUris = make([]*aurora.MesosFetcherURI, 0)
+	taskConfig.Metadata = make([]*aurora.Metadata, 0)
+	taskConfig.Constraints = make([]*aurora.Constraint, 0)
 
 	// Resources
 	numCpus := aurora.NewResource()
 	ramMb := aurora.NewResource()
 	diskMb := aurora.NewResource()
 
+	numCpus.NumCpus = new(float64)
+	ramMb.RamMb = new(int64)
+	diskMb.DiskMb = new(int64)
+
 	resources := make(map[string]*aurora.Resource)
 	resources["cpu"] = numCpus
 	resources["ram"] = ramMb
 	resources["disk"] = diskMb
 
-	taskConfig.Resources = make(map[*aurora.Resource]bool)
-	taskConfig.Resources[numCpus] = true
-	taskConfig.Resources[ramMb] = true
-	taskConfig.Resources[diskMb] = true
-
-	numCpus.NumCpus = new(float64)
-	ramMb.RamMb = new(int64)
-	diskMb.DiskMb = new(int64)
+	taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
 
 	return &AuroraJob{
 		jobConfig: jobConfig,
@@ -185,11 +182,9 @@ func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
 // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
 func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJob {
 	for _, value := range values {
-		j.jobConfig.TaskConfig.MesosFetcherUris[&aurora.MesosFetcherURI{
-			Value:   value,
-			Extract: &extract,
-			Cache:   &cache,
-		}] = true
+		j.jobConfig.TaskConfig.MesosFetcherUris = append(
+			j.jobConfig.TaskConfig.MesosFetcherUris,
+			&aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache})
 	}
 	return j
 }
@@ -197,7 +192,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) *AuroraJ
 // Adds a Mesos label to the job. Note that Aurora will add the
 // prefix "org.apache.aurora.metadata." to the beginning of each key.
 func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob {
-	j.jobConfig.TaskConfig.Metadata[&aurora.Metadata{Key: key, Value: value}] = true
+	j.jobConfig.TaskConfig.Metadata = append(j.jobConfig.TaskConfig.Metadata, &aurora.Metadata{Key: key, Value: value})
 	return j
 }
 
@@ -206,7 +201,7 @@ func (j *AuroraJob) AddLabel(key string, value string) *AuroraJob {
 func (j *AuroraJob) AddNamedPorts(names ...string) *AuroraJob {
 	j.portCount += len(names)
 	for _, name := range names {
-		j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &name}] = true
+		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name})
 	}
 
 	return j
@@ -221,7 +216,7 @@ func (j *AuroraJob) AddPorts(num int) *AuroraJob {
 	j.portCount += num
 	for i := start; i < j.portCount; i++ {
 		portName := "org.apache.aurora.port." + strconv.Itoa(i)
-		j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &portName}] = true
+		j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &portName})
 	}
 
 	return j
@@ -233,20 +228,17 @@ func (j *AuroraJob) AddPorts(num int) *AuroraJob {
 // If negated = true , treat this as a 'not' - to avoid specific values.
 // Values - list of values we look for in attribute name
 func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) *AuroraJob {
-	constraintValues := make(map[string]bool)
-	for _, value := range values {
-		constraintValues[value] = true
-	}
-	j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
-		Name: name,
-		Constraint: &aurora.TaskConstraint{
-			Value: &aurora.ValueConstraint{
-				Negated: negated,
-				Values:  constraintValues,
+	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
+		&aurora.Constraint{
+			Name: name,
+			Constraint: &aurora.TaskConstraint{
+				Value: &aurora.ValueConstraint{
+					Negated: negated,
+					Values:  values,
+				},
+				Limit: nil,
 			},
-			Limit: nil,
-		},
-	}] = true
+		})
 
 	return j
 }
@@ -255,13 +247,14 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
 // A constraint that specifies the maximum number of active tasks on a host with
 // a matching attribute that may be scheduled simultaneously.
 func (j *AuroraJob) AddLimitConstraint(name string, limit int32) *AuroraJob {
-	j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
-		Name: name,
-		Constraint: &aurora.TaskConstraint{
-			Value: nil,
-			Limit: &aurora.LimitConstraint{Limit: limit},
-		},
-	}] = true
+	j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
+		&aurora.Constraint{
+			Name: name,
+			Constraint: &aurora.TaskConstraint{
+				Value: nil,
+				Limit: &aurora.LimitConstraint{Limit: limit},
+			},
+		})
 
 	return j
 }
diff --git a/monitors.go b/monitors.go
index a62cdff..e8d4380 100644
--- a/monitors.go
+++ b/monitors.go
@@ -59,9 +59,16 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval, timeout tim
 			m.Client.RealisConfig().logger.Println("No update found")
 			return false, errors.New("No update found for " + updateKey.String())
 		}
+
 		status := updateDetail.GetDetailsList()[0].Update.Summary.State.Status
 
-		if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
+		// Convert Thrift Set to Golang map for quick lookup
+		activeStatus := map[aurora.JobUpdateStatus]bool{}
+		for _, stat := range aurora.ACTIVE_JOB_UPDATE_STATES {
+			activeStatus[stat] = true
+		}
+
+		if _, ok := activeStatus[status]; !ok {
 
 			// Rolled forward is the only state in which an update has been successfully updated
 			// if we encounter an inactive state and it is not at rolled forward, update failed
@@ -93,7 +100,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeo
 // Monitor a AuroraJob until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
 // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
-func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout time.Duration) (bool, error) {
+func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
 	if interval < 1*time.Second || timeout < 1*time.Second {
 		return false, errors.New("Interval or timeout cannot be below one second.")
 	}
@@ -164,7 +171,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 			return hostResult, errors.Wrap(err, "client error in monitor")
 		}
 
-		for status := range result.GetStatuses() {
+		for _, status := range result.GetStatuses() {
 			if _, ok := desiredMode[status.GetMode()]; ok {
 				hostResult[status.GetHost()] = true
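Because the regenerated bindings turn thrift sets into slices, constant sets such as `aurora.ACTIVE_JOB_UPDATE_STATES` lose their O(1) membership test; `monitors.go` now rebuilds a lookup map before checking. The same pattern in isolation, as a hedged sketch (the `isActiveUpdate` helper is illustrative, not part of the library):

```go
package snippets

import "github.com/paypal/gorealis/gen-go/apache/aurora"

// isActiveUpdate reports whether a job update is still in flight. Thrift
// "set" constants are now generated as slices, so membership checks first
// convert them into a Go map.
func isActiveUpdate(status aurora.JobUpdateStatus) bool {
	activeStatus := make(map[aurora.JobUpdateStatus]bool, len(aurora.ACTIVE_JOB_UPDATE_STATES))
	for _, s := range aurora.ACTIVE_JOB_UPDATE_STATES {
		activeStatus[s] = true
	}
	return activeStatus[status]
}
```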
diff --git a/realis.go b/realis.go
index 50d4677..6aeb155 100644
--- a/realis.go
+++ b/realis.go
@@ -26,6 +26,7 @@ import (
 	"net/http/cookiejar"
 	"os"
 	"path/filepath"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -46,6 +47,7 @@ type RealisClient struct {
 	logger         LevelLogger
 	lock           *sync.Mutex
 	debug          bool
+	transport      thrift.TTransport
 }
 
 type RealisConfig struct {
@@ -234,7 +236,7 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 
 	config.logger.DebugPrintln("Number of options applied to config: ", len(options))
 
-	//Set default Transport to JSON if needed.
+	// Set default Transport to JSON if needed.
 	if !config.jsonTransport && !config.binTransport {
 		config.jsonTransport = true
 	}
@@ -296,7 +298,9 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
 		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
 		logger:         LevelLogger{config.logger, config.debug},
-		lock:           &sync.Mutex{}}, nil
+		lock:           &sync.Mutex{},
+		transport:      config.transport,
+	}, nil
 }
 
 func GetCerts(certPath string) (*x509.CertPool, error) {
@@ -355,7 +359,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 		transport.TLSClientConfig = tlsConfig
 	}
 
-	trans, err := thrift.NewTHttpPostClientWithOptions(url+"/api",
+	trans, err := thrift.NewTHttpClientWithOptions(url+"/api",
 		thrift.THttpClientOptions{Client: &http.Client{Timeout: timeout, Transport: &transport, Jar: jar}})
 
 	if err != nil {
@@ -379,7 +383,7 @@ func newDefaultConfig(url string, timeout time.Duration, config *RealisConfig) (
 func newTJSONConfig(url string, timeout time.Duration, config *RealisConfig) (*RealisConfig, error) {
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
-		return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
+		return nil, errors.Wrap(err, "Error creating realis config")
 	}
 
 	httpTrans := (trans).(*thrift.THttpClient)
@@ -393,7 +397,7 @@ func newTBinaryConfig(url string, timeout time.Duration, config *RealisConfig) (
 	trans, err := defaultTTransport(url, timeout, config)
 	if err != nil {
-		return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
+		return nil, errors.Wrap(err, "Error creating realis config")
 	}
 
 	httpTrans := (trans).(*thrift.THttpClient)
@@ -444,13 +448,11 @@ func (r *RealisClient) Close() {
 
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	r.client.Transport.Close()
-	r.readonlyClient.Transport.Close()
-	r.adminClient.Transport.Close()
+	r.transport.Close()
 }
 
 // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
-func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) {
+func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
 	taskQ := &aurora.TaskQuery{
 		Role:        &key.Role,
 		Environment: &key.Environment,
@@ -461,7 +463,7 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksWithoutConfigs(taskQ)
+		return r.client.GetTasksWithoutConfigs(nil, taskQ)
 	})
 
 	// If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -471,9 +473,9 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 	// Construct instance id map to stay in line with thrift's representation of sets
 	tasks := response.ScheduleStatusResult(resp).GetTasks()
-	jobInstanceIds := make(map[int32]bool)
+	jobInstanceIds := make([]int32, 0, len(tasks))
 	for _, task := range tasks {
-		jobInstanceIds[task.GetAssignedTask().GetInstanceId()] = true
+		jobInstanceIds = append(jobInstanceIds, task.GetAssignedTask().GetInstanceId())
 	}
 	return jobInstanceIds, nil
@@ -483,7 +485,7 @@ func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 	r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
+		return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
 	})
 
 	if retryErr != nil {
@@ -498,7 +500,7 @@ func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 	var result *aurora.GetJobsResult_
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.readonlyClient.GetJobs(role)
+		return r.readonlyClient.GetJobs(nil, role)
 	})
 
 	if retryErr != nil {
@@ -517,14 +519,8 @@ func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) {
 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
 
-	instanceIds := make(map[int32]bool)
-
-	for _, instId := range instances {
-		instanceIds[instId] = true
-	}
-
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.KillTasks(key, instanceIds, "")
+		return r.client.KillTasks(nil, key, instances, "")
 	})
 
 	if retryErr != nil {
@@ -551,7 +547,7 @@ func (r *RealisClient) KillJob(key *aurora.JobKey) error {
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
-		return r.client.KillTasks(key, nil, "")
+		return r.client.KillTasks(nil, key, nil, "")
 	})
 
 	if retryErr != nil {
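The other mechanical change running through this file: every regenerated client method now takes a context as its first argument, and this patch passes `nil` across the board. A sketch of the new call shape outside the retry wrapper, assuming the generated `AuroraSchedulerManagerClient` type (a real `context.Context` could be threaded through later for cancellation):

```go
package snippets

import "github.com/paypal/gorealis/gen-go/apache/aurora"

// killInstances shows the regenerated signature: a context slot first
// (nil for now), and instance IDs as a plain []int32 rather than a
// map[int32]bool thrift set.
func killInstances(client *aurora.AuroraSchedulerManagerClient, key *aurora.JobKey, instances []int32) (*aurora.Response, error) {
	return client.KillTasks(nil, key, instances, "")
}
```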
@@ -569,7 +565,7 @@ func (r *RealisClient) CreateJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.CreateJob(auroraJob.JobConfig())
+		return r.client.CreateJob(nil, auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
@@ -601,7 +597,7 @@ func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.ScheduleCronJob(auroraJob.JobConfig())
+		return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
@@ -615,7 +611,7 @@ func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
 	r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.DescheduleCronJob(key)
+		return r.client.DescheduleCronJob(nil, key)
 	})
 
 	if retryErr != nil {
@@ -631,7 +627,7 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 	r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.StartCronJob(key)
+		return r.client.StartCronJob(nil, key)
 	})
 
 	if retryErr != nil {
@@ -645,14 +641,8 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) error {
 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
 
-	instanceIds := make(map[int32]bool)
-
-	for _, instId := range instances {
-		instanceIds[instId] = true
-	}
-
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.RestartShards(key, instanceIds)
+		return r.client.RestartShards(nil, key, instances)
 	})
 
 	if retryErr != nil {
@@ -673,7 +663,7 @@ func (r *RealisClient) RestartJob(key *aurora.JobKey) error {
 
 	if len(instanceIds) > 0 {
 		_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-			return r.client.RestartShards(key, instanceIds)
+			return r.client.RestartShards(nil, key, instanceIds)
 		})
 
 		if retryErr != nil {
@@ -692,7 +682,7 @@ func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 	r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.StartJobUpdate(updateJob.req, message)
+		return r.client.StartJobUpdate(nil, updateJob.req, message)
 	})
 
 	if retryErr != nil {
@@ -707,7 +697,7 @@ func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
 	r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.AbortJobUpdate(&updateKey, message)
+		return r.client.AbortJobUpdate(nil, &updateKey, message)
 	})
 
 	if retryErr != nil {
@@ -722,7 +712,7 @@ func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
 	r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.PauseJobUpdate(updateKey, message)
+		return r.client.PauseJobUpdate(nil, updateKey, message)
 	})
 
 	if retryErr != nil {
@@ -738,7 +728,7 @@ func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 	r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.ResumeJobUpdate(updateKey, message)
+		return r.client.ResumeJobUpdate(nil, updateKey, message)
 	})
 
 	if retryErr != nil {
@@ -754,7 +744,7 @@ func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.P
 	r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.PulseJobUpdate(updateKey)
+		return r.client.PulseJobUpdate(nil, updateKey)
 	})
 
 	if retryErr != nil {
@@ -771,7 +761,7 @@ func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) err
 	r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.AddInstances(&instKey, count)
+		return r.client.AddInstances(nil, &instKey, count)
 	})
 
 	if retryErr != nil {
@@ -782,25 +772,26 @@ func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) err
 }
 
 // Scale down the number of instances under a job configuration using the configuration of a specific instance
-func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) error {
+// Instances with a higher instance ID will be removed first. For example, if our instance ID list is [0,1,2,3]
+// and we want to remove 2 instances, 2 and 3 will always be picked.
+func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int) error {
 	instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err != nil {
 		return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
 	}
-	if len(instanceIds) < int(count) {
-		return errors.Errorf("Insufficient acative available for killing "+
+
+	if len(instanceIds) < count {
+		return errors.Errorf("Insufficient active instances available for killing: "+
 			" Instances to be killed %d Active instances %d", count, len(instanceIds))
 	}
-	instanceList := make([]int32, count)
-	i := 0
-	for k := range instanceIds {
-		instanceList[i] = k
-		i += 1
-		if i == int(count) {
-			break
-		}
-	}
-	killed, err := r.KillInstances(key, instanceList...)
+
+	// Sort instanceIds in decreasing order
+	sort.Slice(instanceIds, func(i, j int) bool {
+		return instanceIds[i] > instanceIds[j]
+	})
+
+	// After the descending sort, the first `count` entries are the highest instance IDs; kill those
+	instanceIds = instanceIds[:count]
+	killed, err := r.KillInstances(key, instanceIds...)
 
 	if !killed {
 		return errors.New("Flex down was not able to reduce the number of instances running.")
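The new `RemoveInstances` makes victim selection deterministic. A runnable worked example of just the selection step: with active IDs [0 1 2 3] and count = 2, the descending sort yields [3 2 1 0], and the first two entries, instances 3 and 2, are the ones killed.

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	instanceIds := []int32{0, 1, 2, 3}
	count := 2

	// Sort instance IDs in decreasing order...
	sort.Slice(instanceIds, func(i, j int) bool {
		return instanceIds[i] > instanceIds[j]
	})

	// ...so the first `count` entries are the highest IDs.
	fmt.Println(instanceIds[:count]) // prints [3 2]
}
```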
@@ -815,7 +806,7 @@ func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksStatus(query)
+		return r.client.GetTasksStatus(nil, query)
 	})
 
 	if retryErr != nil {
@@ -831,24 +822,20 @@ func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
 	r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetPendingReason(query)
+		return r.client.GetPendingReason(nil, query)
 	})
 
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons")
 	}
 
-	var result map[*aurora.PendingReason]bool
+	var result []*aurora.PendingReason
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
 
-	var pendingReasons []*aurora.PendingReason
-	for reason := range result {
-		pendingReasons = append(pendingReasons, reason)
-	}
-	return pendingReasons, nil
+	return result, nil
 }
 
 // Get information about task including without a task configuration object
@@ -857,7 +844,7 @@ func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksWithoutConfigs(query)
+		return r.client.GetTasksWithoutConfigs(nil, query)
 	})
 
 	if retryErr != nil {
@@ -871,9 +858,8 @@ func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
 // Get the task configuration from the aurora scheduler for a job
 func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
-	ids := make(map[int32]bool)
+	ids := []int32{instKey.GetInstanceId()}
 
-	ids[instKey.InstanceId] = true
 	taskQ := &aurora.TaskQuery{
 		Role:        &instKey.JobKey.Role,
 		Environment: &instKey.JobKey.Environment,
@@ -885,7 +871,7 @@ func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetTasksStatus(taskQ)
+		return r.client.GetTasksStatus(nil, taskQ)
 	})
 
 	if retryErr != nil {
@@ -911,7 +897,7 @@ func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 	r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.GetJobUpdateDetails(&updateQuery)
+		return r.client.GetJobUpdateDetails(nil, &updateQuery)
 	})
 
 	if retryErr != nil {
@@ -925,7 +911,7 @@ func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 	r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.client.RollbackJobUpdate(&key, message)
+		return r.client.RollbackJobUpdate(nil, &key, message)
 	})
 
 	if retryErr != nil {
@@ -950,15 +936,12 @@ func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, e
 	}
 
 	drainList := aurora.NewHosts()
-	drainList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		drainList.HostNames[host] = true
-	}
+	drainList.HostNames = hosts
 
 	r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.DrainHosts(drainList)
+		return r.adminClient.DrainHosts(nil, drainList)
 	})
 
 	if retryErr != nil {
@@ -983,15 +966,12 @@ func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
 	}
 
 	drainList := aurora.NewHosts()
-	drainList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		drainList.HostNames[host] = true
-	}
+	drainList.HostNames = hosts
 
 	r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.SlaDrainHosts(drainList, policy, timeout)
+		return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
 	})
 
 	if retryErr != nil {
@@ -1014,15 +994,12 @@ func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenan
 	}
 
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 
 	r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.StartMaintenance(hostList)
+		return r.adminClient.StartMaintenance(nil, hostList)
 	})
 
 	if retryErr != nil {
@@ -1045,15 +1022,12 @@ func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceRe
 	}
 
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 
 	r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.EndMaintenance(hostList)
+		return r.adminClient.EndMaintenance(nil, hostList)
 	})
 
 	if retryErr != nil {
@@ -1076,17 +1050,14 @@ func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceSt
 	}
 
 	hostList := aurora.NewHosts()
-	hostList.HostNames = make(map[string]bool)
-	for _, host := range hosts {
-		hostList.HostNames[host] = true
-	}
+	hostList.HostNames = hosts
 
 	r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
 
 	// Make thrift call. If we encounter an error sending the call, attempt to reconnect
 	// and continue trying to resend command until we run out of retries.
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.MaintenanceStatus(hostList)
+		return r.adminClient.MaintenanceStatus(nil, hostList)
 	})
 
 	if retryErr != nil {
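All four maintenance entry points share the same simplification: `Hosts.HostNames` is now a `[]string`, so the variadic argument is assigned directly instead of being copied key-by-key into a map. Sketched in isolation (the helper is illustrative; the library inlines this):

```go
package snippets

import "github.com/paypal/gorealis/gen-go/apache/aurora"

// newHostList builds the payload used by DrainHosts, StartMaintenance,
// EndMaintenance, and MaintenanceStatus. HostNames was a map[string]bool
// thrift set; it is now a plain string slice.
func newHostList(hosts ...string) *aurora.Hosts {
	hostList := aurora.NewHosts()
	hostList.HostNames = hosts
	return hostList
}
```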
@@ -1103,19 +1074,18 @@ func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceSt
 
 // SetQuota sets a quota aggregate for the given role
 // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
 func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error {
-	ram := aurora.NewResource()
-	ram.RamMb = ramMb
-	c := aurora.NewResource()
-	c.NumCpus = cpu
-	d := aurora.NewResource()
-	d.DiskMb = diskMb
+	ramResource := aurora.NewResource()
+	ramResource.RamMb = ramMb
+	cpuResource := aurora.NewResource()
+	cpuResource.NumCpus = cpu
+	diskResource := aurora.NewResource()
+	diskResource.DiskMb = diskMb
+
 	quota := aurora.NewResourceAggregate()
-	quota.Resources = make(map[*aurora.Resource]bool)
-	quota.Resources[ram] = true
-	quota.Resources[c] = true
-	quota.Resources[d] = true
+	quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}
+
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.SetQuota(role, quota)
+		return r.adminClient.SetQuota(nil, role, quota)
 	})
 
 	if retryErr != nil {
@@ -1129,7 +1099,7 @@ func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.GetQuota(role)
+		return r.adminClient.GetQuota(nil, role)
 	})
 
 	if retryErr != nil {
@@ -1142,7 +1112,7 @@ func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 func (r *RealisClient) Snapshot() error {
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.Snapshot()
+		return r.adminClient.Snapshot(nil)
 	})
 
 	if retryErr != nil {
@@ -1156,7 +1126,7 @@ func (r *RealisClient) Snapshot() error {
 func (r *RealisClient) PerformBackup() error {
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.PerformBackup()
+		return r.adminClient.PerformBackup(nil)
 	})
 
 	if retryErr != nil {
@@ -1170,7 +1140,7 @@ func (r *RealisClient) PerformBackup() error {
 func (r *RealisClient) ForceImplicitTaskReconciliation() error {
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.TriggerImplicitTaskReconciliation()
+		return r.adminClient.TriggerImplicitTaskReconciliation(nil)
 	})
 
 	if retryErr != nil {
@@ -1191,7 +1161,7 @@ func (r *RealisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
 	settings.BatchSize = batchSize
 
 	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
-		return r.adminClient.TriggerExplicitTaskReconciliation(settings)
+		return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
 	})
 
 	if retryErr != nil {
diff --git a/updatejob.go b/updatejob.go
index af80bee..1b557d2 100644
--- a/updatejob.go
+++ b/updatejob.go
@@ -35,7 +35,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	job.jobConfig.TaskConfig = config
 
 	// Rebuild resource map from TaskConfig
-	for ptr := range config.Resources {
+	for _, ptr := range config.Resources {
 		if ptr.NumCpus != nil {
 			job.resources["cpu"].NumCpus = ptr.NumCpus
 			continue // Guard against Union violations that Go won't enforce
@@ -53,7 +53,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	}
 
 	// Mirrors defaults set by Pystachio
-	req.Settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
+	req.Settings.UpdateOnlyTheseInstances = []*aurora.Range{}
 	req.Settings.UpdateGroupSize = 1
 	req.Settings.WaitForBatchCompletion = false
 	req.Settings.MinWaitInInstanceRunningMs = 45000
@@ -75,7 +75,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
 	job.jobConfig.TaskConfig = config
 
 	// Rebuild resource map from TaskConfig
-	for ptr := range config.Resources {
+	for _, ptr := range config.Resources {
 		if ptr.NumCpus != nil {
 			job.resources["cpu"].NumCpus = ptr.NumCpus
 			continue // Guard against Union violations that Go won't enforce
@@ -142,7 +142,7 @@ func NewUpdateSettings() *aurora.JobUpdateSettings {
 	us := aurora.JobUpdateSettings{}
 
 	// Mirrors defaults set by Pystachio
-	us.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
+	us.UpdateOnlyTheseInstances = []*aurora.Range{}
 	us.UpdateGroupSize = 1
 	us.WaitForBatchCompletion = false
 	us.MinWaitInInstanceRunningMs = 45000
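`UpdateOnlyTheseInstances` follows the same set-to-slice migration. A sketch of restricting an update to instances 0 through 2 under the new shape, assuming the `github.com/paypal/gorealis` import path (the helper name is illustrative):

```go
package snippets

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// limitedUpdateSettings narrows a job update to instances 0-2. The field
// was a map[*aurora.Range]bool thrift set; it is now []*aurora.Range.
func limitedUpdateSettings() *aurora.JobUpdateSettings {
	settings := realis.NewUpdateSettings()
	settings.UpdateOnlyTheseInstances = []*aurora.Range{
		{First: 0, Last: 2},
	}
	return settings
}
```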