From d747a486260d2a6493a603ecdbfc469b4eb6089e Mon Sep 17 00:00:00 2001
From: Renan DelValle
Date: Thu, 8 Nov 2018 18:09:14 -0800
Subject: [PATCH] Simplifying the API. Many API calls have gone from returning
 a tuple of two values to a single return value.

---
 examples/client.go |  68 +++++--------
 monitors.go        |  13 +--
 realis.go          | 234 +++++++++++++++++++++++----------------------
 3 files changed, 149 insertions(+), 166 deletions(-)

diff --git a/examples/client.go b/examples/client.go
index ca876c4..8d711e8 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/paypal/gorealis"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
-	"github.com/paypal/gorealis/response"
 )
 
 var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList, role string
@@ -167,14 +166,13 @@ func main() {
 	switch cmd {
 	case "create":
 		fmt.Println("Creating job")
-		resp, err := r.CreateJob(job)
+		err := r.CreateJob(job)
 		if err != nil {
 			log.Fatalln(err)
 		}
-		fmt.Println(resp.String())
 
 		if ok, mErr := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 5, 50); !ok || mErr != nil {
-			_, err := r.KillJob(job.JobKey())
+			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatalln(err)
 			}
@@ -186,16 +184,15 @@
 		fmt.Println("Creating service")
 		settings := realis.NewUpdateSettings()
 		job.InstanceCount(3)
-		resp, result, err := r.CreateService(job, settings)
+		result, err := r.CreateService(job, settings)
 		if err != nil {
-			log.Println("error: ", err)
-			log.Fatal("response: ", resp.String())
+			log.Fatal("error: ", err)
 		}
 		fmt.Println(result.String())
 
 		if ok, mErr := monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
-			_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out")
-			_, err = r.KillJob(job.JobKey())
+			err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out")
+			err = r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
 			}
@@ -206,14 +203,13 @@
 		fmt.Println("Creating a docker based job")
 		container := realis.NewDockerContainer().Image("python:2.7").AddParameter("network", "host")
 		job.Container(container)
-		resp, err := r.CreateJob(job)
+		err := r.CreateJob(job)
 		if err != nil {
 			log.Fatal(err)
 		}
-		fmt.Println(resp.String())
 
 		if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil {
-			_, err := r.KillJob(job.JobKey())
+			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
 			}
@@ -223,14 +219,13 @@
 		fmt.Println("Creating a docker based job")
 		container := realis.NewMesosContainer().DockerImage("python", "2.7")
 		job.Container(container)
-		resp, err := r.CreateJob(job)
+		err := r.CreateJob(job)
 		if err != nil {
 			log.Fatal(err)
 		}
-		fmt.Println(resp.String())
 
 		if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 10, 300); !ok || err != nil {
-			_, err := r.KillJob(job.JobKey())
+			err := r.KillJob(job.JobKey())
 			if err != nil {
 				log.Fatal(err)
 			}
@@ -241,11 +236,10 @@
 		// Cron config
 		job.CronSchedule("* * * * *")
 		job.IsService(false)
-		resp, err := r.ScheduleCronJob(job)
+		err := r.ScheduleCronJob(job)
 		if err != nil {
 			log.Fatal(err)
 		}
-		fmt.Println(resp.String())
 
 	case "startCron":
 		fmt.Println("Starting a Cron job")
@@ -266,7 +260,7 @@
 
 	case "kill":
 		fmt.Println("Killing job")
-		resp, err := r.KillJob(job.JobKey())
+		err := r.KillJob(job.JobKey())
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -274,17 +268,14 @@
 
 		if ok, err := monitor.Instances(job.JobKey(), 0, 5, 50); !ok || err != nil {
 			log.Fatal("Unable to kill all instances of job")
job") } - fmt.Println(resp.String()) case "restart": fmt.Println("Restarting job") - resp, err := r.RestartJob(job.JobKey()) + err := r.RestartJob(job.JobKey()) if err != nil { log.Fatal(err) } - fmt.Println(resp.String()) - case "liveCount": fmt.Println("Getting instance count") @@ -320,7 +311,7 @@ func main() { for k := range live { instId = k } - resp, err := r.AddInstances(aurora.InstanceKey{ + err = r.AddInstances(aurora.InstanceKey{ JobKey: job.JobKey(), InstanceId: instId, }, @@ -333,7 +324,6 @@ func main() { if ok, err := monitor.Instances(job.JobKey(), currInstances+numOfInstances, 5, 50); !ok || err != nil { fmt.Println("Flexing up failed") } - fmt.Println(resp.String()) case "flexDown": fmt.Println("Flexing down job") @@ -347,7 +337,7 @@ func main() { currInstances := int32(len(live)) fmt.Println("Current num of instances: ", currInstances) - resp, err := r.RemoveInstances(job.JobKey(), numOfInstances) + err = r.RemoveInstances(job.JobKey(), numOfInstances) if err != nil { log.Fatal(err) } @@ -356,8 +346,6 @@ func main() { fmt.Println("flexDown failed") } - fmt.Println(resp.String()) - case "update": fmt.Println("Updating a job with with more RAM and to 5 instances") live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES) @@ -378,16 +366,16 @@ func main() { updateJob := realis.NewDefaultUpdateJob(taskConfig) updateJob.InstanceCount(5).RAM(128) - resp, err := r.StartJobUpdate(updateJob, "") + result, err := r.StartJobUpdate(updateJob, "") if err != nil { log.Fatal(err) } - jobUpdateKey := response.JobUpdateKey(resp) + jobUpdateKey := result.GetKey() monitor.JobUpdate(*jobUpdateKey, 5, 500) case "pauseJobUpdate": - resp, err := r.PauseJobUpdate(&aurora.JobUpdateKey{ + err := r.PauseJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, }, "") @@ -395,10 +383,9 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Println("PauseJobUpdate response: ", resp.String()) case "resumeJobUpdate": - resp, err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ + err := r.ResumeJobUpdate(&aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, }, "") @@ -406,7 +393,6 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Println("ResumeJobUpdate response: ", resp.String()) case "pulseJobUpdate": resp, err := r.PulseJobUpdate(&aurora.JobUpdateKey{ @@ -420,7 +406,7 @@ func main() { fmt.Println("PulseJobUpdate response: ", resp.String()) case "updateDetails": - resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ + result, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ Key: &aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, @@ -432,11 +418,11 @@ func main() { log.Fatal(err) } - fmt.Println(response.JobUpdateDetails(resp)) + fmt.Println(result) case "abortUpdate": fmt.Println("Abort update") - resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{ + err := r.AbortJobUpdate(aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, }, @@ -445,11 +431,10 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Println(resp.String()) case "rollbackUpdate": fmt.Println("Abort update") - resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{ + err := r.RollbackJobUpdate(aurora.JobUpdateKey{ Job: job.JobKey(), ID: updateId, }, @@ -458,7 +443,6 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Println(resp.String()) case "taskConfig": fmt.Println("Getting job info") @@ -532,7 +516,7 @@ func main() { log.Fatal("No hosts specified to drain") } hosts := strings.Split(hostList, ",") - _, result, err := r.DrainHosts(hosts...) + result, err := r.DrainHosts(hosts...) 
 		if err != nil {
 			log.Fatalf("error: %+v\n", err.Error())
 		}
@@ -591,7 +575,7 @@
 			log.Fatal("No hosts specified to drain")
 		}
 		hosts := strings.Split(hostList, ",")
-		_, result, err := r.EndMaintenance(hosts...)
+		result, err := r.EndMaintenance(hosts...)
 		if err != nil {
 			log.Fatalf("error: %+v\n", err.Error())
 		}
@@ -630,7 +614,7 @@
 
 	case "getJobs":
 		fmt.Println("GetJobs...role: ", role)
-		_, result, err := r.GetJobs(role)
+		result, err := r.GetJobs(role)
 		if err != nil {
 			log.Fatalf("error: %+v\n", err.Error())
 		}
diff --git a/monitors.go b/monitors.go
index d23426f..4b9d006 100644
--- a/monitors.go
+++ b/monitors.go
@@ -19,7 +19,6 @@ import (
 	"time"
 
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
-	"github.com/paypal/gorealis/response"
 	"github.com/pkg/errors"
 )
 
@@ -44,24 +43,20 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 	defer ticker.Stop()
 	timer := time.NewTimer(time.Second * time.Duration(timeout))
 	defer timer.Stop()
-	var cliErr error
-	var respDetail *aurora.Response
 
 	for {
 		select {
 		case <-ticker.C:
-			respDetail, cliErr = m.Client.JobUpdateDetails(updateQ)
+			updateDetail, cliErr := m.Client.JobUpdateDetails(updateQ)
 			if cliErr != nil {
 				return false, cliErr
 			}
 
-			updateDetail := response.JobUpdateDetails(respDetail)
-
-			if len(updateDetail) == 0 {
+			if len(updateDetail.GetDetailsList()) == 0 {
 				m.Client.RealisConfig().logger.Println("No update found")
 				return false, errors.New("No update found for " + updateKey.String())
 			}
 
-			status := updateDetail[0].Update.Summary.State.Status
+			status := updateDetail.GetDetailsList()[0].Update.Summary.State.Status
 
 			if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
 
@@ -151,7 +146,7 @@ func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode
 		select {
 		case <-ticker.C:
 			// Client call has multiple retries internally
-			_, result, err := m.Client.MaintenanceStatus(hosts...)
+			result, err := m.Client.MaintenanceStatus(hosts...)
 			if err != nil {
 				// Error is either a payload error or a severe connection error
 				for host := range remainingHosts {
diff --git a/realis.go b/realis.go
index b29c899..50d4677 100644
--- a/realis.go
+++ b/realis.go
@@ -179,7 +179,7 @@ func newTJSONTransport(url string, timeout time.Duration, config *RealisConfig)
 	}
 	httpTrans := (trans).(*thrift.THttpClient)
 	httpTrans.SetHeader("Content-Type", "application/x-thrift")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
 	return trans, err
 }
 
@@ -192,7 +192,7 @@ func newTBinTransport(url string, timeout time.Duration, config *RealisConfig) (
 	httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
 	httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
 	httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
-	httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
+	httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
 	return trans, err
 }
 
@@ -299,17 +299,6 @@ func NewRealisClient(options ...ClientOption) (*RealisClient, error) {
 		lock: &sync.Mutex{}}, nil
 }
 
-func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
-	return &Cluster{
-		Name:          "defaultCluster",
-		AuthMechanism: "UNAUTHENTICATED",
-		ZK:            zkurl,
-		SchedZKPath:   "/aurora/scheduler",
-		AgentRunDir:   "latest",
-		AgentRoot:     "/var/lib/mesos",
-	}
-}
-
 func GetCerts(certPath string) (*x509.CertPool, error) {
 	globalRootCAs := x509.NewCertPool()
 	caFiles, err := ioutil.ReadDir(certPath)
@@ -344,7 +333,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 	if config.certsPath != "" {
 		rootCAs, err := GetCerts(config.certsPath)
 		if err != nil {
-			config.logger.Println("error occured couldn't fetch certs")
+			config.logger.Println("error occurred couldn't fetch certs")
 			return nil, err
 		}
 		tlsConfig.RootCAs = rootCAs
@@ -358,7 +347,7 @@ func defaultTTransport(url string, timeout time.Duration, config *RealisConfig)
 	if config.clientKey != "" && config.clientCert != "" {
 		cert, err := tls.LoadX509KeyPair(config.clientCert, config.clientKey)
 		if err != nil {
-			config.logger.Println("error occured loading client certs and keys")
+			config.logger.Println("error occurred loading client certs and keys")
 			return nil, err
 		}
 		tlsConfig.Certificates = []tls.Certificate{cert}
@@ -490,7 +479,7 @@ func (r *RealisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
 
 }
 
-func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
+func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.GetJobUpdateSummariesResult_, error) {
 	r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
@@ -501,10 +490,10 @@ func (r *RealisClient) GetJobUpdateSummaries(jobUpdateQue
 		return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler")
 	}
 
-	return resp, nil
+	return resp.GetResult_().GetGetJobUpdateSummariesResult_(), nil
 }
 
-func (r *RealisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsResult_, error) {
+func (r *RealisClient) GetJobs(role string) (*aurora.GetJobsResult_, error) {
 
 	var result *aurora.GetJobsResult_
 
@@ -513,18 +502,19 @@ func (r *RealisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
 	})
 
 	if retryErr != nil {
-		return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
+		return result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler")
 	}
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetJobsResult_
 	}
 
-	return resp, result, nil
+	return result, nil
 }
 
-// Kill specific instances of a job.
-func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+// Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call.
+// Returns false, nil if no tasks were killed as a result of this call but there was no error making the call.
+func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (bool, error) {
 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
 
 	instanceIds := make(map[int32]bool)
@@ -538,9 +528,16 @@ func (r *RealisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
+		return false, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
 	}
-	return resp, nil
+
+	if len(resp.GetDetails()) > 0 {
+		r.logger.Println("KillTasks was called but no tasks killed as a result.")
+		return false, nil
+	} else {
+		return true, nil
+	}
+
 }
 
 func (r *RealisClient) RealisConfig() *RealisConfig {
@@ -548,69 +545,69 @@
 	return r.config
 }
 
 // Sends a kill message to the scheduler for all active tasks under a job.
-func (r *RealisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) KillJob(key *aurora.JobKey) error {
 	r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
 		return r.client.KillTasks(key, nil, "")
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
 // Sends a create job message to the scheduler with a specific job configuration.
 // Although this API is able to create service jobs, it is better to use CreateService instead
 // as that API uses the update thrift call which has a few extra features available.
 // Use this API to create ad-hoc jobs.
-func (r *RealisClient) CreateJob(auroraJob *AuroraJob) (*aurora.Response, error) {
+func (r *RealisClient) CreateJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.CreateJob(auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
 // This API uses an update thrift call to create the services giving a few more robust features.
-func (r *RealisClient) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
+func (r *RealisClient) CreateService(auroraJob *AuroraJob, settings *aurora.JobUpdateSettings) (*aurora.StartJobUpdateResult_, error) {
 
 	// Create a new job update object and ship it to the StartJobUpdate api
 	update := NewUpdateJob(auroraJob.TaskConfig(), settings)
 	update.InstanceCount(auroraJob.GetInstanceCount())
 
-	resp, err := r.StartJobUpdate(update, "")
+	updateResult, err := r.StartJobUpdate(update, "")
 	if err != nil {
-		return resp, nil, errors.Wrap(err, "unable to create service")
+		return nil, errors.Wrap(err, "unable to create service")
 	}
 
-	if resp.GetResult_() != nil {
-		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
+	if updateResult != nil {
+		return updateResult, nil
 	}
 
-	return nil, nil, errors.New("results object is nil")
+	return nil, errors.New("results object is nil")
 }
 
-func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) (*aurora.Response, error) {
+func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) error {
 	r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ScheduleCronJob(auroraJob.JobConfig())
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Cron AuroraJob Schedule message to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Cron AuroraJob Schedule message to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
 func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
@@ -645,7 +642,7 @@
 }
 
 // Restarts specific instances specified
-func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
+func (r *RealisClient) RestartInstances(key *aurora.JobKey, instances ...int32) error {
 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
 
 	instanceIds := make(map[int32]bool)
@@ -654,43 +651,43 @@
 		instanceIds[instId] = true
 	}
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.RestartShards(key, instanceIds)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
 	}
 
-	return resp, nil
+	return nil
 }
 
 // Restarts all active tasks under a job configuration.
-func (r *RealisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) RestartJob(key *aurora.JobKey) error {
 
 	instanceIds, err1 := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err1 != nil {
-		return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
+		return errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
 	}
 
 	r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
 
 	if len(instanceIds) > 0 {
-		resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+		_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 			return r.client.RestartShards(key, instanceIds)
 		})
 
 		if retryErr != nil {
-			return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
+			return errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler")
 		}
 
-		return resp, nil
+		return nil
 	} else {
-		return nil, errors.New("No tasks in the Active state")
+		return errors.New("No tasks in the Active state")
 	}
 }
 
 // Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
-func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
+func (r *RealisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.StartJobUpdateResult_, error) {
 
 	r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
 
@@ -699,60 +696,60 @@
 	})
 
 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
+		return nil, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return resp.GetResult_().GetStartJobUpdateResult_(), nil
 }
 
 // Abort AuroraJob Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
-func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+func (r *RealisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) error {
 
 	r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AbortJobUpdate(&updateKey, message)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
-//Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+// Pause AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) error {
 
 	r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.PauseJobUpdate(updateKey, message)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
-//Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+// Resume Paused AuroraJob Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) error {
 
 	r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.ResumeJobUpdate(updateKey, message)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
 }
 
-//Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
+// Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
+func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.PulseJobUpdateResult_, error) {
 
 	r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
 
@@ -764,35 +761,35 @@
 		return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
 	}
 
-	return resp, nil
+	return resp.GetResult_().GetPulseJobUpdateResult_(), nil
 }
 
 // Scale up the number of instances under a job configuration using the configuration for specific
 // instance to scale up.
-func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
+func (r *RealisClient) AddInstances(instKey aurora.InstanceKey, count int32) error {
 	r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.AddInstances(&instKey, count)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler")
 	}
-	return resp, nil
+	return nil
}
 
-//Scale down the number of instances under a job configuration using the configuration of a specific instance
-func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
+// Scale down the number of instances under a job configuration using the configuration of a specific instance
+func (r *RealisClient) RemoveInstances(key *aurora.JobKey, count int32) error {
 	instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
 	if err != nil {
-		return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
+		return errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
 	}
 
 	if len(instanceIds) < int(count) {
-		return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - "+
-			"Instances to kill %d Total Instances %d", count, len(instanceIds)))
+		return errors.Errorf("RemoveInstances: insufficient active instances to kill - "+
+			"instances to kill %d, active instances %d", count, len(instanceIds))
 	}
 	instanceList := make([]int32, count)
 	i := 0
@@ -803,11 +800,17 @@
 			break
 		}
 	}
-	return r.KillInstances(key, instanceList...)
+	killed, err := r.KillInstances(key, instanceList...)
+	if err != nil {
+		return errors.Wrap(err, "RemoveInstances: kill command failed")
+	} else if !killed {
+		return errors.New("Flex down was not able to reduce the number of instances running.")
+	}
+	return nil
 }
 
 // Get information about task including a fully hydrated task configuration object
-func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+func (r *RealisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
 
 	r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
 
@@ -823,7 +826,7 @@
 }
 
 // Get pending reason
-func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons []*aurora.PendingReason, e error) {
+func (r *RealisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
 
 	r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
 
@@ -840,6 +843,8 @@
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
 	}
+
+	var pendingReasons []*aurora.PendingReason
 	for reason := range result {
 		pendingReasons = append(pendingReasons, reason)
 	}
@@ -847,7 +852,7 @@
 }
 
 // Get information about task including without a task configuration object
-func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
+func (r *RealisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
 
 	r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
 
@@ -901,7 +906,7 @@
 	return tasks[0].AssignedTask.Task, nil
 }
 
-func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
+func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.GetJobUpdateDetailsResult_, error) {
 
 	r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
 
@@ -912,22 +917,21 @@
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Unable to get job update details")
 	}
-	return resp, nil
-
+	return resp.GetResult_().GetGetJobUpdateDetailsResult_(), nil
 }
 
-func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
+func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
 
 	r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.RollbackJobUpdate(&key, message)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Unable to roll back job update")
+		return errors.Wrap(retryErr, "Unable to roll back job update")
 	}
-	return resp, nil
+	return nil
 }
 
 /* Admin functions */
@@ -937,12 +941,12 @@
 // Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
 // tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
 // to return to running unless there is enough capacity in the cluster to run them.
-func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
+func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, error) {
 
 	var result *aurora.DrainHostsResult_
 
 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to drain")
+		return nil, errors.New("no hosts provided to drain")
 	}
 
 	drainList := aurora.NewHosts()
@@ -958,14 +962,14 @@
 	})
 
 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetDrainHostsResult_()
 	}
 
-	return resp, result, nil
+	return result, nil
 }
 
 // Start SLA Aware Drain.
@@ -1001,12 +1005,12 @@
 	return result, nil
 }
 
-func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
+func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenanceResult_, error) {
 
 	var result *aurora.StartMaintenanceResult_
 
 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to start maintenance on")
+		return nil, errors.New("no hosts provided to start maintenance on")
 	}
 
 	hostList := aurora.NewHosts()
@@ -1022,22 +1026,22 @@
 	})
 
 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetStartMaintenanceResult_()
 	}
 
-	return resp, result, nil
+	return result, nil
 }
 
-func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
+func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceResult_, error) {
 
 	var result *aurora.EndMaintenanceResult_
 
 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to end maintenance on")
+		return nil, errors.New("no hosts provided to end maintenance on")
 	}
 
 	hostList := aurora.NewHosts()
@@ -1053,22 +1057,22 @@
 	})
 
 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetEndMaintenanceResult_()
 	}
 
-	return resp, result, nil
+	return result, nil
 }
 
-func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
+func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {
 
 	var result *aurora.MaintenanceStatusResult_
 
 	if len(hosts) == 0 {
-		return nil, nil, errors.New("no hosts provided to get maintenance status from")
+		return nil, errors.New("no hosts provided to get maintenance status from")
 	}
 
 	hostList := aurora.NewHosts()
@@ -1086,19 +1090,19 @@
 	})
 
 	if retryErr != nil {
-		return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
 	if resp.GetResult_() != nil {
 		result = resp.GetResult_().GetMaintenanceStatusResult_()
 	}
 
-	return resp, result, nil
+	return result, nil
 }
 
 // SetQuota sets a quota aggregate for the given role
 // TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
-func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
+func (r *RealisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error {
 	ram := aurora.NewResource()
 	ram.RamMb = ramMb
 	c := aurora.NewResource()
@@ -1110,28 +1114,28 @@
 	quota.Resources[ram] = true
 	quota.Resources[c] = true
 	quota.Resources[d] = true
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.SetQuota(role, quota)
 	})
 
 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Unable to set role quota")
+		return errors.Wrap(retryErr, "Unable to set role quota")
 	}
 
-	return resp, retryErr
+	return nil
 }
 
 // GetQuota returns the resource aggregate for the given role
-func (r *RealisClient) GetQuota(role string) (*aurora.Response, error) {
+func (r *RealisClient) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
 
 	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.adminClient.GetQuota(role)
 	})
 
 	if retryErr != nil {
-		return resp, errors.Wrap(retryErr, "Unable to get role quota")
+		return nil, errors.Wrap(retryErr, "Unable to get role quota")
	}
 
-	return resp, retryErr
+	return resp.GetResult_().GetGetQuotaResult_(), nil
 }
 
 // Force Aurora Scheduler to perform a snapshot and write to Mesos log
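
Reviewer note: the sketch below is illustrative only and is not part of the patch. It shows how a typical call site migrates to the single-return API, assuming a connected *realis.RealisClient and an *realis.AuroraJob built the way examples/client.go builds them; the function name migrateCalls and the role string "myRole" are hypothetical.

package main

import (
	"fmt"
	"log"

	"github.com/paypal/gorealis"
)

// migrateCalls is a hypothetical caller demonstrating both migration shapes.
func migrateCalls(r *realis.RealisClient, job *realis.AuroraJob) {
	// Before this patch callers received a *aurora.Response they rarely used:
	//   resp, err := r.KillJob(job.JobKey())
	//   fmt.Println(resp.String())
	// After it, the thrift response is consumed inside gorealis and only an
	// error escapes.
	if err := r.KillJob(job.JobKey()); err != nil {
		log.Fatal(err)
	}

	// Calls that used to return (resp, result, err) now return (result, err);
	// the result struct arrives already unwrapped from the thrift response.
	result, err := r.GetJobs("myRole")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(result.String())
}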