diff --git a/examples/client.go b/examples/client.go index dd5ce2a..7425542 100644 --- a/examples/client.go +++ b/examples/client.go @@ -316,13 +316,9 @@ func main() { } currInstances := int32(len(live)) fmt.Println("Current num of instances: ", currInstances) - var instId int32 - for k := range live { - instId = k - } resp, err := r.AddInstances(aurora.InstanceKey{ JobKey: job.JobKey(), - InstanceId: instId, + InstanceId: live[0], }, numOfInstances) @@ -364,13 +360,9 @@ func main() { if err != nil { log.Fatal(err) } - var instId int32 - for k := range live { - instId = k - } taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{ JobKey: job.JobKey(), - InstanceId: instId, + InstanceId: live[0], }) if err != nil { log.Fatal(err) @@ -467,14 +459,9 @@ func main() { log.Fatal(err) } - var instId int32 - for k := range live { - instId = k - break - } config, err := r.FetchTaskConfig(aurora.InstanceKey{ JobKey: job.JobKey(), - InstanceId: instId, + InstanceId: live[0], }) if err != nil { diff --git a/job.go b/job.go index 86849a2..bfbfa81 100644 --- a/job.go +++ b/job.go @@ -88,9 +88,9 @@ func NewJob() Job { taskConfig.Job = jobKey taskConfig.Container = aurora.NewContainer() taskConfig.Container.Mesos = aurora.NewMesosContainer() - taskConfig.MesosFetcherUris = make(map[*aurora.MesosFetcherURI]bool) - taskConfig.Metadata = make(map[*aurora.Metadata]bool) - taskConfig.Constraints = make(map[*aurora.Constraint]bool) + taskConfig.MesosFetcherUris = make([]*aurora.MesosFetcherURI,0) + taskConfig.Metadata = make([]*aurora.Metadata,0) + taskConfig.Constraints = make([]*aurora.Constraint,0) // Resources numCpus := aurora.NewResource() @@ -98,7 +98,7 @@ func NewJob() Job { diskMb := aurora.NewResource() resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb} - taskConfig.Resources = map[*aurora.Resource]bool{numCpus: true, ramMb: true, diskMb: true} + taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb} numCpus.NumCpus = new(float64) ramMb.RamMb = new(int64) @@ -171,14 +171,15 @@ func (j *AuroraJob) Disk(disk int64) Job { return j } -func (j *AuroraJob) GPU(gpus int64) Job { +func (j *AuroraJob) GPU(gpu int64) Job { + // GPU resource must be set explicitly since the scheduler by default + // rejects jobs with GPU resources attached to it. if _, ok := j.resources[GPU]; !ok { - numGPUs := &aurora.Resource{NumGpus: new(int64)} - j.resources[GPU] = numGPUs - j.TaskConfig().Resources[numGPUs] = true + j.resources[GPU] = &aurora.Resource{} + j.JobConfig().GetTaskConfig().Resources = append(j.JobConfig().GetTaskConfig().Resources, j.resources[GPU]) } - *j.resources[GPU].NumGpus = gpus + j.resources[GPU].NumGpus = &gpu return j } @@ -233,11 +234,8 @@ func (j *AuroraJob) TaskConfig() *aurora.TaskConfig { // --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection. func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job { for _, value := range values { - j.jobConfig.TaskConfig.MesosFetcherUris[&aurora.MesosFetcherURI{ - Value: value, - Extract: &extract, - Cache: &cache, - }] = true + j.jobConfig.TaskConfig.MesosFetcherUris = append(j.jobConfig.TaskConfig.MesosFetcherUris, + &aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache,}) } return j } @@ -245,7 +243,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job { // Adds a Mesos label to the job. Note that Aurora will add the // prefix "org.apache.aurora.metadata." to the beginning of each key. func (j *AuroraJob) AddLabel(key string, value string) Job { - j.jobConfig.TaskConfig.Metadata[&aurora.Metadata{Key: key, Value: value}] = true + j.jobConfig.TaskConfig.Metadata = append( j.jobConfig.TaskConfig.Metadata, &aurora.Metadata{Key: key, Value: value}) return j } @@ -254,7 +252,7 @@ func (j *AuroraJob) AddLabel(key string, value string) Job { func (j *AuroraJob) AddNamedPorts(names ...string) Job { j.portCount += len(names) for _, name := range names { - j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &name}] = true + j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name}) } return j @@ -269,7 +267,7 @@ func (j *AuroraJob) AddPorts(num int) Job { j.portCount += num for i := start; i < j.portCount; i++ { portName := "org.apache.aurora.port." + strconv.Itoa(i) - j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &portName}] = true + j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources,&aurora.Resource{NamedPort: &portName}) } return j @@ -281,20 +279,17 @@ func (j *AuroraJob) AddPorts(num int) Job { // If negated = true , treat this as a 'not' - to avoid specific values. // Values - list of values we look for in attribute name func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job { - constraintValues := make(map[string]bool) - for _, value := range values { - constraintValues[value] = true - } - j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{ + j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, + &aurora.Constraint{ Name: name, Constraint: &aurora.TaskConstraint{ Value: &aurora.ValueConstraint{ Negated: negated, - Values: constraintValues, + Values: values, }, Limit: nil, }, - }] = true + }) return j } @@ -303,13 +298,14 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri // A constraint that specifies the maximum number of active tasks on a host with // a matching attribute that may be scheduled simultaneously. func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job { - j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{ + j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints, + &aurora.Constraint{ Name: name, Constraint: &aurora.TaskConstraint{ Value: nil, Limit: &aurora.LimitConstraint{Limit: limit}, }, - }] = true + }) return j } diff --git a/monitors.go b/monitors.go index 3106cc6..6ef7c1a 100644 --- a/monitors.go +++ b/monitors.go @@ -103,7 +103,7 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey, // Monitor a Job until all instances enter one of the LIVE_STATES func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) { - return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout) + return m.ScheduleStatus(key, instances, LiveStates, interval, timeout) } // Monitor a Job until all instances enter a desired status. @@ -116,12 +116,18 @@ func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desire timer := time.NewTimer(time.Second * time.Duration(timeout)) defer timer.Stop() + wantedStatuses := make([]aurora.ScheduleStatus, 0) + + for status := range desiredStatuses { + wantedStatuses = append(wantedStatuses, status) + } + for { select { case <-ticker.C: // Query Aurora for the state of the job key ever interval - instCount, cliErr := m.Client.GetInstanceIds(key, desiredStatuses) + instCount, cliErr := m.Client.GetInstanceIds(key, wantedStatuses) if cliErr != nil { return false, errors.Wrap(cliErr, "Unable to communicate with Aurora") } @@ -174,7 +180,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode return hostResult, errors.Wrap(err, "client error in monitor") } - for status := range result.GetStatuses() { + for _, status := range result.GetStatuses() { if _, ok := desiredMode[status.GetMode()]; ok { hostResult[status.GetHost()] = true diff --git a/realis.go b/realis.go index ee2bb32..7719b08 100644 --- a/realis.go +++ b/realis.go @@ -26,11 +26,12 @@ import ( "net/http/cookiejar" "os" "path/filepath" - "strings" + "sort" + "strings" "sync" "time" - "git.apache.org/thrift.git/lib/go/thrift" + "github.com/apache/thrift/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" "github.com/pkg/errors" @@ -46,7 +47,7 @@ type Realis interface { CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) - GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) + GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) @@ -95,6 +96,7 @@ type realisClient struct { logger LevelLogger lock *sync.Mutex debug bool + transport thrift.TTransport } type RealisConfig struct { @@ -366,7 +368,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory), logger: LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace}, - lock: &sync.Mutex{}}, nil + lock: &sync.Mutex{}, + transport: config.transport}, nil } func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { @@ -525,13 +528,11 @@ func (r *realisClient) Close() { r.lock.Lock() defer r.lock.Unlock() - r.client.Transport.Close() - r.readonlyClient.Transport.Close() - r.adminClient.Transport.Close() + r.transport.Close() } // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. -func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) { +func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { taskQ := &aurora.TaskQuery{ Role: &key.Role, Environment: &key.Environment, @@ -542,7 +543,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(taskQ) + return r.client.GetTasksWithoutConfigs(nil, taskQ) }) // If we encountered an error we couldn't recover from by retrying, return an error to the user @@ -552,9 +553,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche // Construct instance id map to stay in line with thrift's representation of sets tasks := response.ScheduleStatusResult(resp).GetTasks() - jobInstanceIds := make(map[int32]bool) + jobInstanceIds := make([]int32, 0, len(tasks)) for _, task := range tasks { - jobInstanceIds[task.GetAssignedTask().GetInstanceId()] = true + jobInstanceIds = append(jobInstanceIds, task.GetAssignedTask().GetInstanceId()) } return jobInstanceIds, nil @@ -565,7 +566,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery) + return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) }) if retryErr != nil { @@ -580,7 +581,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe var result *aurora.GetJobsResult_ resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobs(role) + return r.readonlyClient.GetJobs(nil, role) }) if retryErr != nil { @@ -598,14 +599,8 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) - instanceIds := make(map[int32]bool) - - for _, instId := range instances { - instanceIds[instId] = true - } - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.KillTasks(key, instanceIds, "") + return r.client.KillTasks(nil, key, instances, "") }) if retryErr != nil { @@ -625,7 +620,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards - return r.client.KillTasks(key, nil, "") + return r.client.KillTasks(nil, key, nil, "") }) if retryErr != nil { @@ -643,7 +638,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.CreateJob(auroraJob.JobConfig()) + return r.client.CreateJob(nil, auroraJob.JobConfig()) }) if retryErr != nil { @@ -674,7 +669,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ScheduleCronJob(auroraJob.JobConfig()) + return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) }) if retryErr != nil { @@ -688,7 +683,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.DescheduleCronJob(key) + return r.client.DescheduleCronJob(nil, key) }) if retryErr != nil { @@ -704,7 +699,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartCronJob(key) + return r.client.StartCronJob(nil, key) }) if retryErr != nil { @@ -718,14 +713,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) - instanceIds := make(map[int32]bool) - - for _, instId := range instances { - instanceIds[instId] = true - } - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(key, instanceIds) + return r.client.RestartShards(nil, key, instances) }) if retryErr != nil { @@ -746,7 +735,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) if len(instanceIds) > 0 { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(key, instanceIds) + return r.client.RestartShards(nil, key, instanceIds) }) if retryErr != nil { @@ -765,7 +754,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartJobUpdate(updateJob.req, message) + return r.client.StartJobUpdate(nil, updateJob.req, message) }) if retryErr != nil { @@ -782,7 +771,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AbortJobUpdate(&updateKey, message) + return r.client.AbortJobUpdate(nil, &updateKey, message) }) if retryErr != nil { @@ -802,7 +791,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PauseJobUpdate(updateKey, message) + return r.client.PauseJobUpdate(nil, updateKey, message) }) if retryErr != nil { @@ -818,7 +807,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ResumeJobUpdate(updateKey, message) + return r.client.ResumeJobUpdate(nil, updateKey, message) }) if retryErr != nil { @@ -834,7 +823,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PulseJobUpdate(updateKey) + return r.client.PulseJobUpdate(nil, updateKey) }) if retryErr != nil { @@ -851,7 +840,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AddInstances(&instKey, count) + return r.client.AddInstances(nil, &instKey, count) }) if retryErr != nil { @@ -861,35 +850,34 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a } -//Scale down the number of instances under a job configuration using the configuration of a specific instance +// Scale down the number of instances under a job configuration using the configuration of a specific instance func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) { instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES) if err != nil { return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs") } + if len(instanceIds) < int(count) { - return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - "+ - "Instances to kill %d Total Instances %d", count, len(instanceIds))) + return nil, errors.Errorf("Insufficient active instances available for killing: "+ + " Instances to be killed %d Active instances %d", count, len(instanceIds)) } - instanceList := make([]int32, count) - i := 0 - for k := range instanceIds { - instanceList[i] = k - i += 1 - if i == int(count) { - break - } - } - return r.KillInstances(key, instanceList...) + + // Sort instanceIds in ** decreasing ** order + sort.Slice(instanceIds, func(i, j int) bool { + return instanceIds[i] > instanceIds[j] + }) + + // Kill the instances with the highest ID number first + return r.KillInstances(key, instanceIds[:count]...) } // Get information about task including a fully hydrated task configuration object -func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { +func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(query) + return r.client.GetTasksStatus(nil, query) }) if retryErr != nil { @@ -900,36 +888,34 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S } // Get pending reason -func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons []*aurora.PendingReason, e error) { +func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) { r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetPendingReason(query) + return r.client.GetPendingReason(nil, query) }) if retryErr != nil { return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons") } - var result map[*aurora.PendingReason]bool + var pendingReasons []*aurora.PendingReason if resp.GetResult_() != nil { - result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() - } - for reason := range result { - pendingReasons = append(pendingReasons, reason) + pendingReasons = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() } + return pendingReasons, nil } // Get information about task including without a task configuration object -func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { +func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) { r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(query) + return r.client.GetTasksWithoutConfigs(nil, query) }) if retryErr != nil { @@ -942,22 +928,18 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] // Get the task configuration from the aurora scheduler for a job func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { - - ids := make(map[int32]bool) - - ids[instKey.InstanceId] = true taskQ := &aurora.TaskQuery{ Role: &instKey.JobKey.Role, Environment: &instKey.JobKey.Environment, JobName: &instKey.JobKey.Name, - InstanceIds: ids, + InstanceIds: []int32{instKey.InstanceId}, Statuses: aurora.ACTIVE_STATES, } r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(taskQ) + return r.client.GetTasksStatus(nil, taskQ) }) if retryErr != nil { @@ -983,7 +965,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetJobUpdateDetails(&updateQuery) + return r.client.GetJobUpdateDetails(nil, &updateQuery) }) if retryErr != nil { @@ -998,7 +980,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RollbackJobUpdate(&key, message) + return r.client.RollbackJobUpdate(nil, &key, message) }) if retryErr != nil { @@ -1023,15 +1005,12 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr } drainList := aurora.NewHosts() - drainList.HostNames = make(map[string]bool) - for _, host := range hosts { - drainList.HostNames[host] = true - } + drainList.HostNames = hosts r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.DrainHosts(drainList) + return r.adminClient.DrainHosts(nil, drainList) }) if retryErr != nil { @@ -1056,15 +1035,12 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho } drainList := aurora.NewHosts() - drainList.HostNames = make(map[string]bool) - for _, host := range hosts { - drainList.HostNames[host] = true - } + drainList.HostNames = hosts r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SlaDrainHosts(drainList, policy, timeout) + return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) }) if retryErr != nil { @@ -1087,15 +1063,12 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur } hostList := aurora.NewHosts() - hostList.HostNames = make(map[string]bool) - for _, host := range hosts { - hostList.HostNames[host] = true - } + hostList.HostNames = hosts r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.StartMaintenance(hostList) + return r.adminClient.StartMaintenance(nil, hostList) }) if retryErr != nil { @@ -1118,15 +1091,12 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror } hostList := aurora.NewHosts() - hostList.HostNames = make(map[string]bool) - for _, host := range hosts { - hostList.HostNames[host] = true - } + hostList.HostNames = hosts r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.EndMaintenance(hostList) + return r.adminClient.EndMaintenance(nil, hostList) }) if retryErr != nil { @@ -1149,17 +1119,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au } hostList := aurora.NewHosts() - hostList.HostNames = make(map[string]bool) - for _, host := range hosts { - hostList.HostNames[host] = true - } + hostList.HostNames = hosts r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.MaintenanceStatus(hostList) + return r.adminClient.MaintenanceStatus(nil, hostList) }) if retryErr != nil { @@ -1176,19 +1143,16 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au // SetQuota sets a quota aggregate for the given role // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { - ram := aurora.NewResource() - ram.RamMb = ramMb - c := aurora.NewResource() - c.NumCpus = cpu - d := aurora.NewResource() - d.DiskMb = diskMb + ramRes := aurora.NewResource() + ramRes.RamMb = ramMb + cpuRes := aurora.NewResource() + cpuRes.NumCpus = cpu + diskRes := aurora.NewResource() + diskRes.DiskMb = diskMb quota := aurora.NewResourceAggregate() - quota.Resources = make(map[*aurora.Resource]bool) - quota.Resources[ram] = true - quota.Resources[c] = true - quota.Resources[d] = true + quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes} resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SetQuota(role, quota) + return r.adminClient.SetQuota(nil, role, quota) }) if retryErr != nil { @@ -1202,7 +1166,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.GetQuota(role) + return r.adminClient.GetQuota(nil, role) }) if retryErr != nil { @@ -1215,7 +1179,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { func (r *realisClient) Snapshot() error { _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.Snapshot() + return r.adminClient.Snapshot(nil) }) if retryErr != nil { @@ -1229,7 +1193,7 @@ func (r *realisClient) Snapshot() error { func (r *realisClient) PerformBackup() error { _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.PerformBackup() + return r.adminClient.PerformBackup(nil) }) if retryErr != nil { @@ -1242,7 +1206,7 @@ func (r *realisClient) PerformBackup() error { func (r *realisClient) ForceImplicitTaskReconciliation() error { _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerImplicitTaskReconciliation() + return r.adminClient.TriggerImplicitTaskReconciliation(nil) }) if retryErr != nil { @@ -1262,7 +1226,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { settings.BatchSize = batchSize _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerExplicitTaskReconciliation(settings) + return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) }) if retryErr != nil { diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 163e67c..bd68674 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - "git.apache.org/thrift.git/lib/go/thrift" + "github.com/apache/thrift/lib/go/thrift" realis "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" diff --git a/retry.go b/retry.go index e442ee3..68b13cf 100644 --- a/retry.go +++ b/retry.go @@ -20,7 +20,7 @@ import ( "net/url" "time" - "git.apache.org/thrift.git/lib/go/thrift" + "github.com/apache/thrift/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" "github.com/pkg/errors" diff --git a/updatejob.go b/updatejob.go index f7ef2fb..c8161cd 100644 --- a/updatejob.go +++ b/updatejob.go @@ -35,7 +35,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig - for ptr := range config.Resources { + for _, ptr := range config.Resources { if ptr.NumCpus != nil { job.resources[CPU].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce @@ -50,10 +50,15 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { job.resources[DISK].DiskMb = ptr.DiskMb continue } + + if ptr.NumGpus != nil { + job.resources[GPU] = &aurora.Resource{NumGpus: ptr.NumGpus} + continue + } } // Mirrors defaults set by Pystachio - req.Settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool) + req.Settings.UpdateOnlyTheseInstances = make([]*aurora.Range,0) req.Settings.UpdateGroupSize = 1 req.Settings.WaitForBatchCompletion = false req.Settings.MinWaitInInstanceRunningMs = 45000 @@ -75,7 +80,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings) job.jobConfig.TaskConfig = config // Rebuild resource map from TaskConfig - for ptr := range config.Resources { + for _, ptr := range config.Resources { if ptr.NumCpus != nil { job.resources[CPU].NumCpus = ptr.NumCpus continue // Guard against Union violations that Go won't enforce @@ -147,7 +152,7 @@ func NewUpdateSettings() *aurora.JobUpdateSettings { us := new(aurora.JobUpdateSettings) // Mirrors defaults set by Pystachio - us.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool) + us.UpdateOnlyTheseInstances = make([]*aurora.Range, 0) us.UpdateGroupSize = 1 us.WaitForBatchCompletion = false us.MinWaitInInstanceRunningMs = 45000