Simplifying API. Many API calls have gone from returning a tuple of two values to returning a single value.
parent 573e45a59c
commit d747a48626

3 changed files with 149 additions and 166 deletions

realis.go (234 changed lines)
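The pattern throughout the diff: callers that previously received an *aurora.Response alongside their result (or alongside an error) now get either the unwrapped result struct or just the error. A minimal caller-side sketch of the change, assuming the paypal/gorealis import layout, the SchedulerUrl client option, and an illustrative scheduler URL and job key (none of these specifics come from this diff):

    package main

    import (
        "log"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        // Client construction is untouched by this commit; the URL is illustrative.
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        key := &aurora.JobKey{Environment: "prod", Role: "www-data", Name: "hello_world"}

        // Before: resp, err := r.KillJob(key) returned an *aurora.Response that
        // callers usually discarded. After: only the error remains.
        if err := r.KillJob(key); err != nil {
            log.Fatal(err)
        }
    }

The sketches after later hunks assume this same setup (client r, standard log/fmt imports).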
@@ -179,7 +179,7 @@ func newTJSONTransport(url string, timeout time.Duration, config *RealisConfig)
 	}
 	httpTrans := (trans).(*thrift.THttpClient)
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
 	return trans, err
 }

@@ -192,7 +192,7 @@ func newTBinTransport(url string, timeout time.Duration, config *RealisConfig) (
 	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)

 	return trans, err
 }
@@ -299,17 +299,6 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 		lock: &sync.Mutex{}}, nil
 }

-func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
-	return &Cluster{
-		Name:          "defaultCluster",
-		AuthMechanism: "UNAUTHENTICATED",
-		ZK:            zkurl,
-		SchedZKPath:   "/aurora/scheduler",
-		AgentRunDir:   "latest",
-		AgentRoot:     "/var/lib/mesos",
-	}
-}
-
 func GetCerts(certPath string) (*x509.CertPool, error) {
 	globalRootCAs := x509.NewCertPool()
 	caFiles, err := ioutil.ReadDir(certPath)
@@ -344,7 +333,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 	if config.certsPath != "" {
 		rootCAs, err := GetCerts(config.certsPath)
 		if err != nil {
-			config.logger.Println("error occured couldn't fetch certs")
+			config.logger.Println("error occurred couldn't fetch certs")
 			return nil, err
 		}
 		tlsConfig.RootCAs = rootCAs
@@ -358,7 +347,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 	if config.clientKey != "" && config.clientCert != "" {
 		cert, err := tls.LoadX509KeyPair(config.clientCert, config.clientKey)
 		if err != nil {
-			config.logger.Println("error occured loading client certs and keys")
+			config.logger.Println("error occurred loading client certs and keys")
 			return nil, err
 		}
 		tlsConfig.Certificates = []tls.Certificate{cert}
@@ -490,7 +479,7 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche

 }

-func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
+func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.GetJobUpdateSummariesResult_, error) {
 	r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)

 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
@@ -501,10 +490,10 @@ func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
 		return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler")
 	}

-	return resp, nil
+	return resp.GetResult_().GetGetJobUpdateSummariesResult_(), nil
 }

-func (r *RealisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
+func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {

 	var result *aurora.GetJobsResult_

@@ -513,18 +502,19 @@ func (r *RealisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 	})

 	if retryErr != nil {
-		return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
+		return result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
 	}

 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetJobsResult_
 	}

-	return resp, result, nil
+	return result, nil
 }

-// Kill specific instances of a job.
-func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+// Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call.
+// Returns false, nil if no tasks were killed as a result of this call but there was no error making the call.
+func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) {
 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)

 	instanceIds := make(map[int32]bool)
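With the *aurora.Response dropped from GetJobs, call sites read the generated result struct directly. A sketch under the new signature, assuming the setup from the first sketch and that GetConfigs is the generated thrift getter on GetJobsResult_ (the role string is illustrative):

    jobsResult, err := r.GetJobs("www-data")
    if err != nil {
        log.Fatal(err)
    }
    // GetConfigs returns the job configurations owned by the role.
    fmt.Printf("role owns %d jobs\n", len(jobsResult.GetConfigs()))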
@@ -538,9 +528,16 @@ func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
+		return false, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
 	}
-	return resp, nil
+
+	if len(resp.GetDetails()) > 0 {
+		r.logger.Println("KillTasks was called but no tasks killed as a result.")
+		return false, nil
+	} else {
+		return true, nil
+	}
+
 }

 func (r *RealisClient) RealisConfig() *RealisConfig {
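KillInstances now reports whether the call actually killed anything, so a no-op is distinguishable from a failure. A caller-side sketch (job key and instance IDs are illustrative):

    killed, err := r.KillInstances(key, 0, 1)
    if err != nil {
        log.Fatal(err) // the RPC itself failed
    }
    if !killed {
        log.Println("call succeeded, but no matching tasks were running")
    }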
@@ -548,69 +545,69 @@ func (r *RealisClient) RealisConfig() *RealisConfig {
 }

 // Sends a kill message to the scheduler for all active tasks under a job.
-func (r *RealisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) KillJob(key *aurora.JobKey) error {

 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 		return r.client.KillTasks(key, nil, "")
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }

 // Sends a create job message to the scheduler with a specific job configuration.
 // Although this API is able to create service jobs, it is better to use CreateService instead
 // as that API uses the update thrift call which has a few extra features available.
 // Use this API to create ad-hoc jobs.
-func (r *RealisClient) CreateJob(auroraJob *AuroraJob) (*aurora.Response, error) {
+func (r *RealisClient) CreateJob(auroraJob *AuroraJob) error {

 	r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.CreateJob(auroraJob.JobConfig())
 	})

 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
 	}

-	return resp, nil
+	return nil
 }

 // This API uses an update thrift call to create the services giving a few more robust features.
-func (r *RealisClient) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
+func (r *RealisClient) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) {
 	// Create a new job update object and ship it to the StartJobUpdate api
 	update := NewUpdateJob(auroraJob.TaskConfig(), settings)
 	update.InstanceCount(auroraJob.GetInstanceCount())

-	resp, err := r.StartJobUpdate(update, "")
+	updateResult, err := r.StartJobUpdate(update, "")
 	if err != nil {
-		return resp, nil, errors.Wrap(err, "unable to create service")
+		return nil, errors.Wrap(err, "unable to create service")
 	}

-	if resp.GetResult_() != nil {
-		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
+	if updateResult != nil {
+		return updateResult, nil
 	}

-	return nil, nil, errors.New("results object is nil")
+	return nil, errors.New("results object is nil")
 }

-func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) (*aurora.Response, error) {
+func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ScheduleCronJob(auroraJob.JobConfig())
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Cron AuroraJob Schedule message to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Cron AuroraJob Schedule message to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }

 func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
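CreateService keeps a two-value return, but the first value is now the unwrapped *aurora.StartJobUpdateResult_ rather than the raw response. A sketch, assuming job and settings were built with the usual gorealis helpers and that GetKey is the generated getter on the result (all names illustrative):

    result, err := r.CreateService(job, settings)
    if err != nil {
        log.Fatal(err)
    }
    // The returned key identifies the rolling update that backs the service.
    fmt.Println("created service via update:", result.GetKey())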
@@ -645,7 +642,7 @@ func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
 }

 // Restarts specific instances specified
-func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) error {
 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)

 	instanceIds := make(map[int32]bool)
@@ -654,43 +651,43 @@ func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
 		instanceIds[instId] = true
 	}

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.RestartShards(key, instanceIds)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }

 // Restarts all active tasks under a job configuration.
-func (r *RealisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) RestartJob(key *aurora.JobKey) error {

 	instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err1 != nil {
-		return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
+		return errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
 	}

 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)

 	if len(instanceIds) > 0 {
-		resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+		_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 			return r.client.RestartShards(key, instanceIds)
 		})

 		if retryErr != nil {
-			return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
+			return errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
 		}

-		return resp, nil
+		return nil
 	} else {
-		return nil, errors.New("No tasks in the Active state")
+		return errors.New("No tasks in the Active state")
 	}
 }

 // Update all tasks under a job configuration. Currently gorealis doesn't support canary deployments.
-func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
+func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) {

 	r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)

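RestartInstances and RestartJob collapse to a bare error, so restarts compose naturally with early returns. A sketch (job key and instance IDs illustrative):

    if err := r.RestartInstances(key, 0, 2); err != nil {
        log.Fatal(err)
    }
    if err := r.RestartJob(key); err != nil {
        log.Fatal(err)
    }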
@@ -699,60 +696,60 @@ func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
 	})

 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
+		return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return resp.GetResult_().GetStartJobUpdateResult_(), nil
 }

 // Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
-func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error {

 	r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AbortJobUpdate(&updateKey, message)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }

-//Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+// Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) error {

 	r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.PauseJobUpdate(updateKey, message)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
 	}

-	return resp, nil
+	return nil
 }

-//Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+// Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) error {

 	r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ResumeJobUpdate(updateKey, message)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
 	}

-	return resp, nil
+	return nil
 }

-//Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
+// Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.PulseJobUpdateResult_, error) {

 	r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)

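The update lifecycle calls that only signal success or failure (Abort, Pause, Resume) likewise drop to a bare error. A sketch of pausing and then resuming an in-flight update, assuming updateKey came from a prior StartJobUpdate result (illustrative):

    if err := r.PauseJobUpdate(&updateKey, "pausing for investigation"); err != nil {
        log.Fatal(err)
    }
    if err := r.ResumeJobUpdate(&updateKey, "resuming rollout"); err != nil {
        log.Fatal(err)
    }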
@@ -764,35 +761,35 @@ func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
 		return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
 	}

-	return resp, nil
+	return resp.GetResult_().GetPulseJobUpdateResult_(), nil
 }

 // Scale up the number of instances under a job configuration using the configuration for specific
 // instance to scale up.
-func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
+func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) error {

 	r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AddInstances(&instKey, count)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil

 }

-//Scale down the number of instances under a job configuration using the configuration of a specific instance
-func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
+// Scale down the number of instances under a job configuration using the configuration of a specific instance
+func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) error {
 	instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err != nil {
-		return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
+		return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
 	}
 	if len(instanceIds) < int(count) {
-		return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - "+
-			"Instances to kill %d Total Instances %d", count, len(instanceIds)))
+		return errors.Errorf("Insufficient active instances available for killing: "+
+			"instances to be killed %d, active instances %d", count, len(instanceIds))
 	}
 	instanceList := make([]int32, count)
 	i := 0
@@ -803,11 +800,17 @@ func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
 			break
 		}
 	}
-	return r.KillInstances(key, instanceList...)
+	killed, err := r.KillInstances(key, instanceList...)
+
+	if !killed {
+		return errors.New("Flex down was not able to reduce the number of instances running.")
+	}
+
+	return nil
 }

 // Get information about task including a fully hydrated task configuration object
-func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {

 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)

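GetTaskStatus moves from named return values to a plain ([]*aurora.ScheduledTask, error). A sketch, assuming query is an *aurora.TaskQuery and that GetAssignedTask and GetTaskId are the generated getters (illustrative):

    tasks, err := r.GetTaskStatus(query)
    if err != nil {
        log.Fatal(err)
    }
    for _, t := range tasks {
        fmt.Println(t.GetAssignedTask().GetTaskId())
    }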
@@ -823,7 +826,7 @@ func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
 }

 // Get pending reason
-func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons []*aurora.PendingReason, e error) {
+func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {

 	r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)

@@ -840,6 +843,8 @@ func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
+
+	var pendingReasons []*aurora.PendingReason
 	for reason := range result {
 		pendingReasons = append(pendingReasons, reason)
 	}
@@ -847,7 +852,7 @@ func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons
 }

 // Get information about task excluding a task configuration object
-func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {

 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)

@@ -901,7 +906,7 @@ func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 	return tasks[0].AssignedTask.Task, nil
 }

-func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
+func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.GetJobUpdateDetailsResult_, error) {

 	r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)

@@ -912,22 +917,21 @@ func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Unable to get job update details")
 	}
-	return resp, nil
+
+	return resp.GetResult_().GetGetJobUpdateDetailsResult_(), nil
 }

-func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {

 	r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)

-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.RollbackJobUpdate(&key, message)
 	})

 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Unable to roll back job update")
+		return errors.Wrap(retryErr, "Unable to roll back job update")
 	}
-	return resp, nil
+	return nil
 }

 /* Admin functions */

|
|||
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
|
||||
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
|
||||
// to return to running unless there is enough capacity in the cluster to run them.
|
||||
func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
|
||||
func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, error) {
|
||||
|
||||
var result *aurora.DrainHostsResult_
|
||||
|
||||
if len(hosts) == 0 {
|
||||
return nil, nil, errors.New("no hosts provided to drain")
|
||||
return nil, errors.New("no hosts provided to drain")
|
||||
}
|
||||
|
||||
drainList := aurora.NewHosts()
|
||||
|
@@ -958,14 +962,14 @@ func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
 	})

 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetDrainHostsResult_()
 	}

-	return resp, result, nil
+	return result, nil
 }

 // Start SLA Aware Drain.
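DrainHosts now returns only the *aurora.DrainHostsResult_ and an error. A sketch, assuming GetStatuses is the generated getter on the result (hostnames illustrative):

    result, err := r.DrainHosts("agent-1.example.com", "agent-2.example.com")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("draining %d hosts\n", len(result.GetStatuses()))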
@@ -1001,12 +1005,12 @@ func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
 	return result, nil
 }

-func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
+func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenanceResult_, error) {

 	var result *aurora.StartMaintenanceResult_

 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to start maintenance on")
+		return nil, errors.New("no hosts provided to start maintenance on")
 	}

 	hostList := aurora.NewHosts()
@@ -1022,22 +1026,22 @@ func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
 	})

 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetStartMaintenanceResult_()
 	}

-	return resp, result, nil
+	return result, nil
 }

-func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
+func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceResult_, error) {

 	var result *aurora.EndMaintenanceResult_

 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to end maintenance on")
+		return nil, errors.New("no hosts provided to end maintenance on")
 	}

 	hostList := aurora.NewHosts()
@@ -1053,22 +1057,22 @@ func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
 	})

 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetEndMaintenanceResult_()
 	}

-	return resp, result, nil
+	return result, nil
 }

-func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
+func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {

 	var result *aurora.MaintenanceStatusResult_

 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to get maintenance status from")
+		return nil, errors.New("no hosts provided to get maintenance status from")
 	}

 	hostList := aurora.NewHosts()
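StartMaintenance, EndMaintenance, and MaintenanceStatus all follow the same two-value shape now, which makes a maintenance cycle read cleanly. A sketch that ignores the result structs (hostname illustrative):

    if _, err := r.StartMaintenance("agent-1.example.com"); err != nil {
        log.Fatal(err)
    }
    // ... drain the host, perform the maintenance work ...
    if _, err := r.EndMaintenance("agent-1.example.com"); err != nil {
        log.Fatal(err)
    }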
@@ -1086,19 +1090,19 @@ func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 	})

 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}

 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetMaintenanceStatusResult_()
 	}

-	return resp, result, nil
+	return result, nil
 }

 // SetQuota sets a quota aggregate for the given role
 // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
-func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
+func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error {
 	ram := aurora.NewResource()
 	ram.RamMb = ramMb
 	c := aurora.NewResource()
@@ -1110,28 +1114,28 @@ func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
 	quota.Resources[ram] = true
 	quota.Resources[c] = true
 	quota.Resources[d] = true
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.SetQuota(role, quota)
 	})

 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Unable to set role quota")
+		return errors.Wrap(retryErr, "Unable to set role quota")
 	}
-	return resp, retryErr
+	return retryErr

 }

 // GetQuota returns the resource aggregate for the given role
-func (r *RealisClient) GetQuota(role string) (*aurora.Response, error) {
+func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {

 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.GetQuota(role)
 	})

 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Unable to get role quota")
+		return nil, errors.Wrap(retryErr, "Unable to get role quota")
 	}
-	return resp, retryErr
+	return resp.GetResult_().GetGetQuotaResult_(), retryErr
 }

 // Force Aurora Scheduler to perform a snapshot and write to Mesos log
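GetQuota now hands back the unwrapped *aurora.GetQuotaResult_. A sketch, assuming GetQuota and GetResources are the generated getters on the result and the aggregate (role illustrative):

    quotaResult, err := r.GetQuota("www-data")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("quota holds %d resource entries\n", len(quotaResult.GetQuota().GetResources()))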