Ported all code from Thrift 0.9.3 to Thrift 0.12.0 while backporting some fixes from gorealis v2
This commit is contained in:
parent
b90dd67418
commit
1e7e73cd38
7 changed files with 121 additions and 163 deletions
|
@ -316,13 +316,9 @@ func main() {
|
|||
}
|
||||
currInstances := int32(len(live))
|
||||
fmt.Println("Current num of instances: ", currInstances)
|
||||
var instId int32
|
||||
for k := range live {
|
||||
instId = k
|
||||
}
|
||||
resp, err := r.AddInstances(aurora.InstanceKey{
|
||||
JobKey: job.JobKey(),
|
||||
InstanceId: instId,
|
||||
InstanceId: live[0],
|
||||
},
|
||||
numOfInstances)
|
||||
|
||||
|
@ -364,13 +360,9 @@ func main() {
|
|||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var instId int32
|
||||
for k := range live {
|
||||
instId = k
|
||||
}
|
||||
taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{
|
||||
JobKey: job.JobKey(),
|
||||
InstanceId: instId,
|
||||
InstanceId: live[0],
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
@ -467,14 +459,9 @@ func main() {
|
|||
log.Fatal(err)
|
||||
|
||||
}
|
||||
var instId int32
|
||||
for k := range live {
|
||||
instId = k
|
||||
break
|
||||
}
|
||||
config, err := r.FetchTaskConfig(aurora.InstanceKey{
|
||||
JobKey: job.JobKey(),
|
||||
InstanceId: instId,
|
||||
InstanceId: live[0],
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
|
48
job.go
48
job.go
|
@ -88,9 +88,9 @@ func NewJob() Job {
|
|||
taskConfig.Job = jobKey
|
||||
taskConfig.Container = aurora.NewContainer()
|
||||
taskConfig.Container.Mesos = aurora.NewMesosContainer()
|
||||
taskConfig.MesosFetcherUris = make(map[*aurora.MesosFetcherURI]bool)
|
||||
taskConfig.Metadata = make(map[*aurora.Metadata]bool)
|
||||
taskConfig.Constraints = make(map[*aurora.Constraint]bool)
|
||||
taskConfig.MesosFetcherUris = make([]*aurora.MesosFetcherURI,0)
|
||||
taskConfig.Metadata = make([]*aurora.Metadata,0)
|
||||
taskConfig.Constraints = make([]*aurora.Constraint,0)
|
||||
|
||||
// Resources
|
||||
numCpus := aurora.NewResource()
|
||||
|
@ -98,7 +98,7 @@ func NewJob() Job {
|
|||
diskMb := aurora.NewResource()
|
||||
|
||||
resources := map[ResourceType]*aurora.Resource{CPU: numCpus, RAM: ramMb, DISK: diskMb}
|
||||
taskConfig.Resources = map[*aurora.Resource]bool{numCpus: true, ramMb: true, diskMb: true}
|
||||
taskConfig.Resources = []*aurora.Resource{numCpus, ramMb, diskMb}
|
||||
|
||||
numCpus.NumCpus = new(float64)
|
||||
ramMb.RamMb = new(int64)
|
||||
|
@ -171,14 +171,15 @@ func (j *AuroraJob) Disk(disk int64) Job {
|
|||
return j
|
||||
}
|
||||
|
||||
func (j *AuroraJob) GPU(gpus int64) Job {
|
||||
func (j *AuroraJob) GPU(gpu int64) Job {
|
||||
// GPU resource must be set explicitly since the scheduler by default
|
||||
// rejects jobs with GPU resources attached to it.
|
||||
if _, ok := j.resources[GPU]; !ok {
|
||||
numGPUs := &aurora.Resource{NumGpus: new(int64)}
|
||||
j.resources[GPU] = numGPUs
|
||||
j.TaskConfig().Resources[numGPUs] = true
|
||||
j.resources[GPU] = &aurora.Resource{}
|
||||
j.JobConfig().GetTaskConfig().Resources = append(j.JobConfig().GetTaskConfig().Resources, j.resources[GPU])
|
||||
}
|
||||
|
||||
*j.resources[GPU].NumGpus = gpus
|
||||
j.resources[GPU].NumGpus = &gpu
|
||||
return j
|
||||
}
|
||||
|
||||
|
@ -233,11 +234,8 @@ func (j *AuroraJob) TaskConfig() *aurora.TaskConfig {
|
|||
// --enable_mesos_fetcher flag enabled. Currently there is no duplicate detection.
|
||||
func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
|
||||
for _, value := range values {
|
||||
j.jobConfig.TaskConfig.MesosFetcherUris[&aurora.MesosFetcherURI{
|
||||
Value: value,
|
||||
Extract: &extract,
|
||||
Cache: &cache,
|
||||
}] = true
|
||||
j.jobConfig.TaskConfig.MesosFetcherUris = append(j.jobConfig.TaskConfig.MesosFetcherUris,
|
||||
&aurora.MesosFetcherURI{Value: value, Extract: &extract, Cache: &cache,})
|
||||
}
|
||||
return j
|
||||
}
|
||||
|
@ -245,7 +243,7 @@ func (j *AuroraJob) AddURIs(extract bool, cache bool, values ...string) Job {
|
|||
// Adds a Mesos label to the job. Note that Aurora will add the
|
||||
// prefix "org.apache.aurora.metadata." to the beginning of each key.
|
||||
func (j *AuroraJob) AddLabel(key string, value string) Job {
|
||||
j.jobConfig.TaskConfig.Metadata[&aurora.Metadata{Key: key, Value: value}] = true
|
||||
j.jobConfig.TaskConfig.Metadata = append( j.jobConfig.TaskConfig.Metadata, &aurora.Metadata{Key: key, Value: value})
|
||||
return j
|
||||
}
|
||||
|
||||
|
@ -254,7 +252,7 @@ func (j *AuroraJob) AddLabel(key string, value string) Job {
|
|||
func (j *AuroraJob) AddNamedPorts(names ...string) Job {
|
||||
j.portCount += len(names)
|
||||
for _, name := range names {
|
||||
j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &name}] = true
|
||||
j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources, &aurora.Resource{NamedPort: &name})
|
||||
}
|
||||
|
||||
return j
|
||||
|
@ -269,7 +267,7 @@ func (j *AuroraJob) AddPorts(num int) Job {
|
|||
j.portCount += num
|
||||
for i := start; i < j.portCount; i++ {
|
||||
portName := "org.apache.aurora.port." + strconv.Itoa(i)
|
||||
j.jobConfig.TaskConfig.Resources[&aurora.Resource{NamedPort: &portName}] = true
|
||||
j.jobConfig.TaskConfig.Resources = append(j.jobConfig.TaskConfig.Resources,&aurora.Resource{NamedPort: &portName})
|
||||
}
|
||||
|
||||
return j
|
||||
|
@ -281,20 +279,17 @@ func (j *AuroraJob) AddPorts(num int) Job {
|
|||
// If negated = true , treat this as a 'not' - to avoid specific values.
|
||||
// Values - list of values we look for in attribute name
|
||||
func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...string) Job {
|
||||
constraintValues := make(map[string]bool)
|
||||
for _, value := range values {
|
||||
constraintValues[value] = true
|
||||
}
|
||||
j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
|
||||
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
|
||||
&aurora.Constraint{
|
||||
Name: name,
|
||||
Constraint: &aurora.TaskConstraint{
|
||||
Value: &aurora.ValueConstraint{
|
||||
Negated: negated,
|
||||
Values: constraintValues,
|
||||
Values: values,
|
||||
},
|
||||
Limit: nil,
|
||||
},
|
||||
}] = true
|
||||
})
|
||||
|
||||
return j
|
||||
}
|
||||
|
@ -303,13 +298,14 @@ func (j *AuroraJob) AddValueConstraint(name string, negated bool, values ...stri
|
|||
// A constraint that specifies the maximum number of active tasks on a host with
|
||||
// a matching attribute that may be scheduled simultaneously.
|
||||
func (j *AuroraJob) AddLimitConstraint(name string, limit int32) Job {
|
||||
j.jobConfig.TaskConfig.Constraints[&aurora.Constraint{
|
||||
j.jobConfig.TaskConfig.Constraints = append(j.jobConfig.TaskConfig.Constraints,
|
||||
&aurora.Constraint{
|
||||
Name: name,
|
||||
Constraint: &aurora.TaskConstraint{
|
||||
Value: nil,
|
||||
Limit: &aurora.LimitConstraint{Limit: limit},
|
||||
},
|
||||
}] = true
|
||||
})
|
||||
|
||||
return j
|
||||
}
|
||||
|
|
12
monitors.go
12
monitors.go
|
@ -103,7 +103,7 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
|
|||
|
||||
// Monitor a Job until all instances enter one of the LIVE_STATES
|
||||
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout int) (bool, error) {
|
||||
return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout)
|
||||
return m.ScheduleStatus(key, instances, LiveStates, interval, timeout)
|
||||
}
|
||||
|
||||
// Monitor a Job until all instances enter a desired status.
|
||||
|
@ -116,12 +116,18 @@ func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desire
|
|||
timer := time.NewTimer(time.Second * time.Duration(timeout))
|
||||
defer timer.Stop()
|
||||
|
||||
wantedStatuses := make([]aurora.ScheduleStatus, 0)
|
||||
|
||||
for status := range desiredStatuses {
|
||||
wantedStatuses = append(wantedStatuses, status)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
|
||||
// Query Aurora for the state of the job key ever interval
|
||||
instCount, cliErr := m.Client.GetInstanceIds(key, desiredStatuses)
|
||||
instCount, cliErr := m.Client.GetInstanceIds(key, wantedStatuses)
|
||||
if cliErr != nil {
|
||||
return false, errors.Wrap(cliErr, "Unable to communicate with Aurora")
|
||||
}
|
||||
|
@ -174,7 +180,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
|
|||
return hostResult, errors.Wrap(err, "client error in monitor")
|
||||
}
|
||||
|
||||
for status := range result.GetStatuses() {
|
||||
for _, status := range result.GetStatuses() {
|
||||
|
||||
if _, ok := desiredMode[status.GetMode()]; ok {
|
||||
hostResult[status.GetHost()] = true
|
||||
|
|
188
realis.go
188
realis.go
|
@ -26,11 +26,12 @@ import (
|
|||
"net/http/cookiejar"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.apache.org/thrift.git/lib/go/thrift"
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/paypal/gorealis/response"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -46,7 +47,7 @@ type Realis interface {
|
|||
CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
|
||||
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
|
||||
GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
|
||||
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
|
||||
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
|
||||
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
|
||||
|
@ -95,6 +96,7 @@ type realisClient struct {
|
|||
logger LevelLogger
|
||||
lock *sync.Mutex
|
||||
debug bool
|
||||
transport thrift.TTransport
|
||||
}
|
||||
|
||||
type RealisConfig struct {
|
||||
|
@ -366,7 +368,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
|
||||
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
|
||||
logger: LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace},
|
||||
lock: &sync.Mutex{}}, nil
|
||||
lock: &sync.Mutex{},
|
||||
transport: config.transport}, nil
|
||||
}
|
||||
|
||||
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
||||
|
@ -525,13 +528,11 @@ func (r *realisClient) Close() {
|
|||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
r.client.Transport.Close()
|
||||
r.readonlyClient.Transport.Close()
|
||||
r.adminClient.Transport.Close()
|
||||
r.transport.Close()
|
||||
}
|
||||
|
||||
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
|
||||
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) {
|
||||
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
|
||||
taskQ := &aurora.TaskQuery{
|
||||
Role: &key.Role,
|
||||
Environment: &key.Environment,
|
||||
|
@ -542,7 +543,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(taskQ)
|
||||
return r.client.GetTasksWithoutConfigs(nil, taskQ)
|
||||
})
|
||||
|
||||
// If we encountered an error we couldn't recover from by retrying, return an error to the user
|
||||
|
@ -552,9 +553,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
|
||||
// Construct instance id map to stay in line with thrift's representation of sets
|
||||
tasks := response.ScheduleStatusResult(resp).GetTasks()
|
||||
jobInstanceIds := make(map[int32]bool)
|
||||
jobInstanceIds := make([]int32, 0, len(tasks))
|
||||
for _, task := range tasks {
|
||||
jobInstanceIds[task.GetAssignedTask().GetInstanceId()] = true
|
||||
jobInstanceIds = append(jobInstanceIds, task.GetAssignedTask().GetInstanceId())
|
||||
}
|
||||
return jobInstanceIds, nil
|
||||
|
||||
|
@ -565,7 +566,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
|
|||
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
|
||||
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -580,7 +581,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
var result *aurora.GetJobsResult_
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobs(role)
|
||||
return r.readonlyClient.GetJobs(nil, role)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -598,14 +599,8 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
for _, instId := range instances {
|
||||
instanceIds[instId] = true
|
||||
}
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.KillTasks(key, instanceIds, "")
|
||||
return r.client.KillTasks(nil, key, instances, "")
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -625,7 +620,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
|||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
|
||||
return r.client.KillTasks(key, nil, "")
|
||||
return r.client.KillTasks(nil, key, nil, "")
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -643,7 +638,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
|||
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(auroraJob.JobConfig())
|
||||
return r.client.CreateJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -674,7 +669,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
|
|||
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ScheduleCronJob(auroraJob.JobConfig())
|
||||
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -688,7 +683,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
|
|||
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.DescheduleCronJob(key)
|
||||
return r.client.DescheduleCronJob(nil, key)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -704,7 +699,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartCronJob(key)
|
||||
return r.client.StartCronJob(nil, key)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -718,14 +713,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
for _, instId := range instances {
|
||||
instanceIds[instId] = true
|
||||
}
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(key, instanceIds)
|
||||
return r.client.RestartShards(nil, key, instances)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -746,7 +735,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
|
||||
if len(instanceIds) > 0 {
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(key, instanceIds)
|
||||
return r.client.RestartShards(nil, key, instanceIds)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -765,7 +754,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(updateJob.req, message)
|
||||
return r.client.StartJobUpdate(nil, updateJob.req, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -782,7 +771,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
|
|||
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AbortJobUpdate(&updateKey, message)
|
||||
return r.client.AbortJobUpdate(nil, &updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -802,7 +791,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
|
|||
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PauseJobUpdate(updateKey, message)
|
||||
return r.client.PauseJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -818,7 +807,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
|
|||
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ResumeJobUpdate(updateKey, message)
|
||||
return r.client.ResumeJobUpdate(nil, updateKey, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -834,7 +823,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
|
|||
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PulseJobUpdate(updateKey)
|
||||
return r.client.PulseJobUpdate(nil, updateKey)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -851,7 +840,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
|
|||
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AddInstances(&instKey, count)
|
||||
return r.client.AddInstances(nil, &instKey, count)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -861,35 +850,34 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
|
|||
|
||||
}
|
||||
|
||||
//Scale down the number of instances under a job configuration using the configuration of a specific instance
|
||||
// Scale down the number of instances under a job configuration using the configuration of a specific instance
|
||||
func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
|
||||
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
|
||||
}
|
||||
|
||||
if len(instanceIds) < int(count) {
|
||||
return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - "+
|
||||
"Instances to kill %d Total Instances %d", count, len(instanceIds)))
|
||||
return nil, errors.Errorf("Insufficient active instances available for killing: "+
|
||||
" Instances to be killed %d Active instances %d", count, len(instanceIds))
|
||||
}
|
||||
instanceList := make([]int32, count)
|
||||
i := 0
|
||||
for k := range instanceIds {
|
||||
instanceList[i] = k
|
||||
i += 1
|
||||
if i == int(count) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return r.KillInstances(key, instanceList...)
|
||||
|
||||
// Sort instanceIds in ** decreasing ** order
|
||||
sort.Slice(instanceIds, func(i, j int) bool {
|
||||
return instanceIds[i] > instanceIds[j]
|
||||
})
|
||||
|
||||
// Kill the instances with the highest ID number first
|
||||
return r.KillInstances(key, instanceIds[:count]...)
|
||||
}
|
||||
|
||||
// Get information about task including a fully hydrated task configuration object
|
||||
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
|
||||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(query)
|
||||
return r.client.GetTasksStatus(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -900,36 +888,34 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
|
|||
}
|
||||
|
||||
// Get pending reason
|
||||
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons []*aurora.PendingReason, e error) {
|
||||
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
|
||||
|
||||
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetPendingReason(query)
|
||||
return r.client.GetPendingReason(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons")
|
||||
}
|
||||
|
||||
var result map[*aurora.PendingReason]bool
|
||||
var pendingReasons []*aurora.PendingReason
|
||||
|
||||
if resp.GetResult_() != nil {
|
||||
result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
|
||||
}
|
||||
for reason := range result {
|
||||
pendingReasons = append(pendingReasons, reason)
|
||||
pendingReasons = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
|
||||
}
|
||||
|
||||
return pendingReasons, nil
|
||||
}
|
||||
|
||||
// Get information about task including without a task configuration object
|
||||
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
|
||||
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(query)
|
||||
return r.client.GetTasksWithoutConfigs(nil, query)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -942,22 +928,18 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
|
|||
|
||||
// Get the task configuration from the aurora scheduler for a job
|
||||
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
|
||||
|
||||
ids := make(map[int32]bool)
|
||||
|
||||
ids[instKey.InstanceId] = true
|
||||
taskQ := &aurora.TaskQuery{
|
||||
Role: &instKey.JobKey.Role,
|
||||
Environment: &instKey.JobKey.Environment,
|
||||
JobName: &instKey.JobKey.Name,
|
||||
InstanceIds: ids,
|
||||
InstanceIds: []int32{instKey.InstanceId},
|
||||
Statuses: aurora.ACTIVE_STATES,
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(taskQ)
|
||||
return r.client.GetTasksStatus(nil, taskQ)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -983,7 +965,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
|
|||
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetJobUpdateDetails(&updateQuery)
|
||||
return r.client.GetJobUpdateDetails(nil, &updateQuery)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -998,7 +980,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
|
|||
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RollbackJobUpdate(&key, message)
|
||||
return r.client.RollbackJobUpdate(nil, &key, message)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1023,15 +1005,12 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
|
|||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = make(map[string]bool)
|
||||
for _, host := range hosts {
|
||||
drainList.HostNames[host] = true
|
||||
}
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.DrainHosts(drainList)
|
||||
return r.adminClient.DrainHosts(nil, drainList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1056,15 +1035,12 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
|
|||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
drainList.HostNames = make(map[string]bool)
|
||||
for _, host := range hosts {
|
||||
drainList.HostNames[host] = true
|
||||
}
|
||||
drainList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.SlaDrainHosts(drainList, policy, timeout)
|
||||
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1087,15 +1063,12 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
|
|||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = make(map[string]bool)
|
||||
for _, host := range hosts {
|
||||
hostList.HostNames[host] = true
|
||||
}
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.StartMaintenance(hostList)
|
||||
return r.adminClient.StartMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1118,15 +1091,12 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
|
|||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = make(map[string]bool)
|
||||
for _, host := range hosts {
|
||||
hostList.HostNames[host] = true
|
||||
}
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.EndMaintenance(hostList)
|
||||
return r.adminClient.EndMaintenance(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1149,17 +1119,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
|
|||
}
|
||||
|
||||
hostList := aurora.NewHosts()
|
||||
hostList.HostNames = make(map[string]bool)
|
||||
for _, host := range hosts {
|
||||
hostList.HostNames[host] = true
|
||||
}
|
||||
hostList.HostNames = hosts
|
||||
|
||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
|
||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||
// and continue trying to resend command until we run out of retries.
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.MaintenanceStatus(hostList)
|
||||
return r.adminClient.MaintenanceStatus(nil, hostList)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1176,19 +1143,16 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
|
|||
// SetQuota sets a quota aggregate for the given role
|
||||
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
|
||||
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
|
||||
ram := aurora.NewResource()
|
||||
ram.RamMb = ramMb
|
||||
c := aurora.NewResource()
|
||||
c.NumCpus = cpu
|
||||
d := aurora.NewResource()
|
||||
d.DiskMb = diskMb
|
||||
ramRes := aurora.NewResource()
|
||||
ramRes.RamMb = ramMb
|
||||
cpuRes := aurora.NewResource()
|
||||
cpuRes.NumCpus = cpu
|
||||
diskRes := aurora.NewResource()
|
||||
diskRes.DiskMb = diskMb
|
||||
quota := aurora.NewResourceAggregate()
|
||||
quota.Resources = make(map[*aurora.Resource]bool)
|
||||
quota.Resources[ram] = true
|
||||
quota.Resources[c] = true
|
||||
quota.Resources[d] = true
|
||||
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.SetQuota(role, quota)
|
||||
return r.adminClient.SetQuota(nil, role, quota)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1202,7 +1166,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
|
|||
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.GetQuota(role)
|
||||
return r.adminClient.GetQuota(nil, role)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1215,7 +1179,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
|||
func (r *realisClient) Snapshot() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.Snapshot()
|
||||
return r.adminClient.Snapshot(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1229,7 +1193,7 @@ func (r *realisClient) Snapshot() error {
|
|||
func (r *realisClient) PerformBackup() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.PerformBackup()
|
||||
return r.adminClient.PerformBackup(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1242,7 +1206,7 @@ func (r *realisClient) PerformBackup() error {
|
|||
func (r *realisClient) ForceImplicitTaskReconciliation() error {
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerImplicitTaskReconciliation()
|
||||
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
@ -1262,7 +1226,7 @@ func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
|
|||
settings.BatchSize = batchSize
|
||||
|
||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.TriggerExplicitTaskReconciliation(settings)
|
||||
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"git.apache.org/thrift.git/lib/go/thrift"
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
realis "github.com/paypal/gorealis"
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/paypal/gorealis/response"
|
||||
|
|
2
retry.go
2
retry.go
|
@ -20,7 +20,7 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
"git.apache.org/thrift.git/lib/go/thrift"
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||
"github.com/paypal/gorealis/response"
|
||||
"github.com/pkg/errors"
|
||||
|
|
13
updatejob.go
13
updatejob.go
|
@ -35,7 +35,7 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
|
|||
job.jobConfig.TaskConfig = config
|
||||
|
||||
// Rebuild resource map from TaskConfig
|
||||
for ptr := range config.Resources {
|
||||
for _, ptr := range config.Resources {
|
||||
if ptr.NumCpus != nil {
|
||||
job.resources[CPU].NumCpus = ptr.NumCpus
|
||||
continue // Guard against Union violations that Go won't enforce
|
||||
|
@ -50,10 +50,15 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
|
|||
job.resources[DISK].DiskMb = ptr.DiskMb
|
||||
continue
|
||||
}
|
||||
|
||||
if ptr.NumGpus != nil {
|
||||
job.resources[GPU] = &aurora.Resource{NumGpus: ptr.NumGpus}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Mirrors defaults set by Pystachio
|
||||
req.Settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
|
||||
req.Settings.UpdateOnlyTheseInstances = make([]*aurora.Range,0)
|
||||
req.Settings.UpdateGroupSize = 1
|
||||
req.Settings.WaitForBatchCompletion = false
|
||||
req.Settings.MinWaitInInstanceRunningMs = 45000
|
||||
|
@ -75,7 +80,7 @@ func NewUpdateJob(config *aurora.TaskConfig, settings *aurora.JobUpdateSettings)
|
|||
job.jobConfig.TaskConfig = config
|
||||
|
||||
// Rebuild resource map from TaskConfig
|
||||
for ptr := range config.Resources {
|
||||
for _, ptr := range config.Resources {
|
||||
if ptr.NumCpus != nil {
|
||||
job.resources[CPU].NumCpus = ptr.NumCpus
|
||||
continue // Guard against Union violations that Go won't enforce
|
||||
|
@ -147,7 +152,7 @@ func NewUpdateSettings() *aurora.JobUpdateSettings {
|
|||
|
||||
us := new(aurora.JobUpdateSettings)
|
||||
// Mirrors defaults set by Pystachio
|
||||
us.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
|
||||
us.UpdateOnlyTheseInstances = make([]*aurora.Range, 0)
|
||||
us.UpdateGroupSize = 1
|
||||
us.WaitForBatchCompletion = false
|
||||
us.MinWaitInInstanceRunningMs = 45000
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue