diff --git a/CHANGELOG.md b/CHANGELOG.md index a4ac174..e38b5b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +1.21.1 (unreleased) + +* CreateService and StartJobUpdate do not continue retrying if a timeout has been encountered +by the HTTP client. Instead they now return an error that conforms to the Timedout interface. +Users can check for a Timedout error by using `realis.IsTimeout(err)`. + 1.21.0 * Version numbering change. Future versions will be labled X.Y.Z where X is the major version, Y is the Aurora version the library has been tested against (e.g. 21 -> 0.21.0), and X is the minor revision. diff --git a/docs/leveraging-the-library.md b/docs/leveraging-the-library.md index 464bddd..e13ca2c 100644 --- a/docs/leveraging-the-library.md +++ b/docs/leveraging-the-library.md @@ -57,4 +57,19 @@ updateJob := realis.NewUpdateJob(job) updateJob.InstanceCount(1) updateJob.Ram(128) msg, err := r.UpdateJob(updateJob, "") -``` \ No newline at end of file +``` + + +* Handling a timeout scenario: + +When sending an API call to Aurora, the call may timeout at the client side. +This means that the time limit has been reached while waiting for the scheduler +to reply. In such a case it is recommended that the timeout is increased through +the use of the `realis.TimeoutMS()` option. + +As these timeouts cannot be totally avoided, there exists a mechanism to mitigate such +scenarios. The `StartJobUpdate` and `CreateService` API will return an error that +implements the Timeout interface. + +An error can be checked to see if it is a Timeout error by using the `realis.IsTimeout()` +function. \ No newline at end of file diff --git a/examples/client.go b/examples/client.go index 7425542..f0dcedf 100644 --- a/examples/client.go +++ b/examples/client.go @@ -31,7 +31,7 @@ var cmd, executor, url, clustersConfig, clusterName, updateId, username, passwor var caCertsPath string var clientKey, clientCert string -var CONNECTION_TIMEOUT = 20000 +var ConnectionTimeout = 20000 func init() { flag.StringVar(&cmd, "cmd", "", "Job request type to send to Aurora Scheduler") @@ -82,7 +82,7 @@ func main() { clientOptions := []realis.ClientOption{ realis.BasicAuth(username, password), realis.ThriftJSON(), - realis.TimeoutMS(CONNECTION_TIMEOUT), + realis.TimeoutMS(ConnectionTimeout), realis.BackOff(realis.Backoff{ Steps: 2, Duration: 10 * time.Second, @@ -92,7 +92,6 @@ func main() { realis.Debug(), } - //check if zkUrl is available. 
if zkUrl != "" { fmt.Println("zkUrl: ", zkUrl) clientOptions = append(clientOptions, realis.ZKUrl(zkUrl)) diff --git a/job.go b/job.go index 029d911..b7d443b 100644 --- a/job.go +++ b/job.go @@ -123,7 +123,7 @@ func (j *AuroraJob) Environment(env string) Job { func (j *AuroraJob) Role(role string) Job { j.jobConfig.Key.Role = role - //Will be deprecated + // Will be deprecated identity := &aurora.Identity{User: role} j.jobConfig.Owner = identity j.jobConfig.TaskConfig.Owner = identity diff --git a/monitors.go b/monitors.go index 6ef7c1a..72e7027 100644 --- a/monitors.go +++ b/monitors.go @@ -19,7 +19,6 @@ import ( "time" "github.com/paypal/gorealis/gen-go/apache/aurora" - "github.com/paypal/gorealis/response" "github.com/pkg/errors" ) @@ -30,16 +29,20 @@ type Monitor struct { // Polls the scheduler every certain amount of time to see if the update has succeeded func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout int) (bool, error) { - status, err := m.JobUpdateStatus(updateKey, - map[aurora.JobUpdateStatus]bool{ - aurora.JobUpdateStatus_ROLLED_FORWARD: true, - aurora.JobUpdateStatus_ROLLED_BACK: true, - aurora.JobUpdateStatus_ABORTED: true, - aurora.JobUpdateStatus_ERROR: true, - aurora.JobUpdateStatus_FAILED: true, + updateQ := aurora.JobUpdateQuery{ + Key: &updateKey, + Limit: 1, + UpdateStatuses: []aurora.JobUpdateStatus{ + aurora.JobUpdateStatus_ROLLED_FORWARD, + aurora.JobUpdateStatus_ROLLED_BACK, + aurora.JobUpdateStatus_ABORTED, + aurora.JobUpdateStatus_ERROR, + aurora.JobUpdateStatus_FAILED, }, - time.Duration(interval)*time.Second, - time.Duration(timeout)*time.Second) + } + updateSummaries, err := m.JobUpdateQuery(updateQ, time.Duration(interval)*time.Second, time.Duration(timeout)*time.Second) + + status := updateSummaries[0].State.Status if err != nil { return false, err @@ -52,22 +55,43 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout switch status { case aurora.JobUpdateStatus_ROLLED_FORWARD: return true, nil - case aurora.JobUpdateStatus_ROLLED_BACK, aurora.JobUpdateStatus_ABORTED, aurora.JobUpdateStatus_ERROR, aurora.JobUpdateStatus_FAILED: + case aurora.JobUpdateStatus_ROLLED_BACK, + aurora.JobUpdateStatus_ABORTED, + aurora.JobUpdateStatus_ERROR, + aurora.JobUpdateStatus_FAILED: return false, errors.Errorf("bad terminal state for update: %v", status) default: return false, errors.Errorf("unexpected update state: %v", status) } } -func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey, +func (m *Monitor) JobUpdateStatus( + updateKey aurora.JobUpdateKey, desiredStatuses map[aurora.JobUpdateStatus]bool, interval time.Duration, timeout time.Duration) (aurora.JobUpdateStatus, error) { - updateQ := aurora.JobUpdateQuery{ - Key: &updateKey, - Limit: 1, + desiredStatusesSlice := make([]aurora.JobUpdateStatus, 0) + + for k := range desiredStatuses { + desiredStatusesSlice = append(desiredStatusesSlice, k) } + + updateQ := aurora.JobUpdateQuery{ + Key: &updateKey, + Limit: 1, + UpdateStatuses: desiredStatusesSlice, + } + summary, err := m.JobUpdateQuery(updateQ, interval, timeout) + + return summary[0].State.Status, err +} + +func (m *Monitor) JobUpdateQuery( + updateQuery aurora.JobUpdateQuery, + interval time.Duration, + timeout time.Duration) ([]*aurora.JobUpdateSummary, error) { + ticker := time.NewTicker(interval) defer ticker.Stop() timer := time.NewTimer(timeout) @@ -78,25 +102,18 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey, for { select { case <-ticker.C: - respDetail, cliErr = 
m.Client.JobUpdateDetails(updateQ) + respDetail, cliErr = m.Client.GetJobUpdateSummaries(&updateQuery) if cliErr != nil { - return aurora.JobUpdateStatus(-1), cliErr + return nil, cliErr } - updateDetail := response.JobUpdateDetails(respDetail) - - if len(updateDetail) == 0 { - m.Client.RealisConfig().logger.Println("No update found") - return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String()) - } - status := updateDetail[0].Update.Summary.State.Status - - if _, ok := desiredStatuses[status]; ok { - return status, nil + updateSummaries := respDetail.Result_.GetJobUpdateSummariesResult_.UpdateSummaries + if len(updateSummaries) >= 1 { + return updateSummaries, nil } case <-timer.C: - return aurora.JobUpdateStatus(-1), newTimedoutError(errors.New("job update monitor timed out")) + return nil, newTimedoutError(errors.New("job update monitor timed out")) } } } @@ -109,7 +126,12 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeo // Monitor a Job until all instances enter a desired status. // Defaults sets of desired statuses provided by the thrift API include: // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES -func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) { +func (m *Monitor) ScheduleStatus( + key *aurora.JobKey, + instanceCount int32, + desiredStatuses map[aurora.ScheduleStatus]bool, + interval int, + timeout int) (bool, error) { ticker := time.NewTicker(time.Second * time.Duration(interval)) defer ticker.Stop() diff --git a/realis.go b/realis.go index 0b06c07..1b61e7d 100644 --- a/realis.go +++ b/realis.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "log" "net/http" - "net/http/cookiejar" "os" "path/filepath" "sort" @@ -111,7 +110,7 @@ type RealisConfig struct { logger *LevelLogger InsecureSkipVerify bool certspath string - clientkey, clientcert string + clientKey, clientCert string options []ClientOption debug bool trace bool @@ -125,6 +124,8 @@ var defaultBackoff = Backoff{ Jitter: 0.1, } +const APIPath = "/api" + type ClientOption func(*RealisConfig) //Config sets for options in RealisConfig. 
@@ -204,7 +205,7 @@ func Certspath(certspath string) ClientOption { func ClientCerts(clientKey, clientCert string) ClientOption { return func(config *RealisConfig) { - config.clientkey, config.clientcert = clientKey, clientCert + config.clientKey, config.clientCert = clientKey, clientCert } } @@ -240,24 +241,24 @@ func Trace() ClientOption { func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { - return nil, errors.Wrap(err, "Error creating realis") + return nil, errors.Wrap(err, "unable to create transport") } httpTrans := (trans).(*thrift.THttpClient) httpTrans.SetHeader("Content-Type", "application/x-thrift") - httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION) return trans, err } func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { trans, err := defaultTTransport(url, timeout, config) if err != nil { - return nil, errors.Wrap(err, "Error creating realis") + return nil, errors.Wrap(err, "unable to create transport") } httpTrans := (trans).(*thrift.THttpClient) httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) + httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION) return trans, err } @@ -306,7 +307,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.logger.DebugPrintln("Number of options applied to config: ", len(options)) - //Set default Transport to JSON if needed. + // Set default Transport to JSON if needed. if !config.jsonTransport && !config.binTransport { config.jsonTransport = true } @@ -318,7 +319,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { if config.zkOptions != nil { url, err = LeaderFromZKOpts(config.zkOptions...) 
if err != nil { - return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) + return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader")) } config.logger.Println("Scheduler URL from ZK: ", url) } else if config.cluster != nil { @@ -327,20 +328,20 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { url, err = LeaderFromZK(*config.cluster) // If ZK is configured, throw an error if the leader is unable to be determined if err != nil { - return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) + return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader ")) } config.logger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { url = config.url config.logger.Println("Scheduler URL: ", url) } else { - return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required") + return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } if config.jsonTransport { trans, err := newTJSONTransport(url, config.timeoutms, config) if err != nil { - return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) + return nil, NewTemporaryError(err) } config.transport = trans config.protoFactory = thrift.NewTJSONProtocolFactory() @@ -348,7 +349,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { } else if config.binTransport { trans, err := newTBinTransport(url, config.timeoutms, config) if err != nil { - return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) + return nil, NewTemporaryError(err) } config.transport = trans config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() @@ -383,15 +384,15 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { } } -func GetCerts(certpath string) (*x509.CertPool, error) { +func GetCerts(certPath string) (*x509.CertPool, error) { globalRootCAs := x509.NewCertPool() - caFiles, err := ioutil.ReadDir(certpath) + caFiles, err := ioutil.ReadDir(certPath) if err != nil { return nil, err } for _, cert := range caFiles { - capathfile := filepath.Join(certpath, cert.Name()) - caCert, err := ioutil.ReadFile(capathfile) + caPathFile := filepath.Join(certPath, cert.Name()) + caCert, err := ioutil.ReadFile(caPathFile) if err != nil { return nil, err } @@ -401,35 +402,29 @@ func GetCerts(certpath string) (*x509.CertPool, error) { } // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client -func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thrift.TTransport, error) { - jar, err := cookiejar.New(nil) - if err != nil { - return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar") - } +func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) { var transport http.Transport if config != nil { - tlsConfig := &tls.Config{} - if config.InsecureSkipVerify { - tlsConfig.InsecureSkipVerify = true - } + tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify} + if config.certspath != "" { rootCAs, err := GetCerts(config.certspath) if err != nil { - config.logger.Println("error occured couldn't fetch certs") + config.logger.Println("error occurred couldn't fetch certs") return nil, err } tlsConfig.RootCAs = rootCAs } - if config.clientkey != "" && config.clientcert == "" { - return nil, fmt.Errorf("have to provide both client key,cert. 
Only client key provided ") + if config.clientKey != "" && config.clientCert == "" { + return nil, fmt.Errorf("have to provide both client key, cert. Only client key provided ") } - if config.clientkey == "" && config.clientcert != "" { - return nil, fmt.Errorf("have to provide both client key,cert. Only client cert provided ") + if config.clientKey == "" && config.clientCert != "" { + return nil, fmt.Errorf("have to provide both client key, cert. Only client cert provided ") } - if config.clientkey != "" && config.clientcert != "" { - cert, err := tls.LoadX509KeyPair(config.clientcert, config.clientkey) + if config.clientKey != "" && config.clientCert != "" { + cert, err := tls.LoadX509KeyPair(config.clientCert, config.clientKey) if err != nil { - config.logger.Println("error occured loading client certs and keys") + config.logger.Println("error occurred loading client certs and keys") return nil, err } tlsConfig.Certificates = []tls.Certificate{cert} @@ -437,58 +432,24 @@ func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thri transport.TLSClientConfig = tlsConfig } - trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api", - thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Transport: &transport, Jar: jar}}) + trans, err := thrift.NewTHttpClientWithOptions( + url+APIPath, + thrift.THttpClientOptions{ + Client: &http.Client{ + Timeout: time.Millisecond * time.Duration(timeoutMs), + Transport: &transport}}) if err != nil { - return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport") + return nil, errors.Wrap(err, "Error creating transport") } if err := trans.Open(); err != nil { - return &thrift.THttpClient{}, errors.Wrapf(err, "Error opening connection to %s", urlstr) + return nil, errors.Wrapf(err, "Error opening connection to %s", url) } return trans, nil } -// Create a default configuration of the transport layer, requires a URL to test connection with. -// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default. -func newDefaultConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { - return newTJSONConfig(url, timeoutms, config) -} - -// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora. -func newTJSONConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { - trans, err := defaultTTransport(url, timeoutms, config) - if err != nil { - return &RealisConfig{}, errors.Wrap(err, "Error creating realis config") - } - - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.SetHeader("Content-Type", "application/x-thrift") - httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION) - - return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil -} - -// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora. 
-func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { - trans, err := defaultTTransport(url, timeoutms, config) - if err != nil { - return &RealisConfig{}, errors.Wrap(err, "Error creating realis config") - } - - httpTrans := (trans).(*thrift.THttpClient) - httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient - - httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") - httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION) - - return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil - -} - func basicAuth(username, password string) string { auth := username + ":" + password return base64.StdEncoding.EncodeToString([]byte(auth)) @@ -534,21 +495,21 @@ func (r *realisClient) Close() { // Uses predefined set of states to retrieve a set of active jobs in Apache Aurora. func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) { taskQ := &aurora.TaskQuery{ - Role: &key.Role, - Environment: &key.Environment, - JobName: &key.Name, - Statuses: states, + JobKeys: []*aurora.JobKey{{Environment: key.Environment, Role: key.Role, Name: key.Name}}, + Statuses: states, } r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, taskQ) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksWithoutConfigs(nil, taskQ) + }) // If we encountered an error we couldn't recover from by retrying, return an error to the user if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for active IDs") + return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for active IDs") } // Construct instance id map to stay in line with thrift's representation of sets @@ -565,12 +526,14 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler") } return resp, nil @@ -580,12 +543,14 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe var result *aurora.GetJobsResult_ - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.readonlyClient.GetJobs(nil, role) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.readonlyClient.GetJobs(nil, role) + }) if retryErr != nil { - return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler") + return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") } if resp.GetResult_() != nil { @@ -599,12 +564,14 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe func (r 
*realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.KillTasks(nil, key, instances, "") - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.KillTasks(nil, key, instances, "") + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -618,13 +585,15 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards - return r.client.KillTasks(nil, key, nil, "") - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + // Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards + return r.client.KillTasks(nil, key, nil, "") + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler") } return resp, nil } @@ -637,12 +606,14 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig()) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.CreateJob(nil, auroraJob.JobConfig()) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.CreateJob(nil, auroraJob.JobConfig()) + }) if retryErr != nil { - return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") + return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler") } return resp, nil } @@ -655,6 +626,10 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe resp, err := r.StartJobUpdate(update, "") if err != nil { + if IsTimeout(err) { + return resp, nil, err + } + return resp, nil, errors.Wrap(err, "unable to create service") } @@ -668,12 +643,14 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) { r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig()) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.ScheduleCronJob(nil, auroraJob.JobConfig()) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler") } return resp, nil } @@ -682,12 +659,14 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, 
error) { - return r.client.DescheduleCronJob(nil, key) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.DescheduleCronJob(nil, key) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler") } return resp, nil @@ -698,12 +677,14 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartCronJob(nil, key) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.StartCronJob(nil, key) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler") } return resp, nil @@ -713,12 +694,14 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) { r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instances) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RestartShards(nil, key, instances) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return resp, nil } @@ -734,12 +717,14 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds) if len(instanceIds) > 0 { - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RestartShards(nil, key, instanceIds) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RestartShards(nil, key, instanceIds) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler") } return resp, nil @@ -753,12 +738,19 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.StartJobUpdate(nil, updateJob.req, message) - }) + resp, retryErr := r.thriftCallWithRetries( + true, + func() (*aurora.Response, error) { + return r.client.StartJobUpdate(nil, updateJob.req, message) + }) if retryErr != nil { - return resp, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") + // A timeout took place when attempting this call, attempt to recover + if IsTimeout(retryErr) { + return resp, retryErr + } else { + return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") + } } return resp, nil } @@ -770,12 +762,14 @@ func (r *realisClient) AbortJobUpdate(updateKey 
aurora.JobUpdateKey, message str r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AbortJobUpdate(nil, &updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.AbortJobUpdate(nil, &updateKey, message) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler") } // Make this call synchronous by blocking until it job has successfully transitioned to aborted @@ -785,49 +779,55 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str return resp, err } -//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PauseJobUpdate(nil, updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.PauseJobUpdate(nil, updateKey, message) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler") } return resp, nil } -//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI. func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) { r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.ResumeJobUpdate(nil, updateKey, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.ResumeJobUpdate(nil, updateKey, message) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler") } return resp, nil } -//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. +// Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI. 
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) { r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.PulseJobUpdate(nil, updateKey) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.PulseJobUpdate(nil, updateKey) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } return resp, nil @@ -839,12 +839,14 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.AddInstances(nil, &instKey, count) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.AddInstances(nil, &instKey, count) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler") } return resp, nil @@ -876,12 +878,14 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksStatus(nil, query) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status") + return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -892,12 +896,14 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetPendingReason(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetPendingReason(nil, query) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons") + return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } var pendingReasons []*aurora.PendingReason @@ -914,12 +920,14 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksWithoutConfigs(nil, query) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksWithoutConfigs(nil, query) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs") + return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs") } return response.ScheduleStatusResult(resp).GetTasks(), nil @@ -938,12 +946,14 @@ func (r 
*realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetTasksStatus(nil, taskQ) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetTasksStatus(nil, taskQ) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration") + return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration") } tasks := response.ScheduleStatusResult(resp).GetTasks() @@ -964,9 +974,11 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.GetJobUpdateDetails(nil, &updateQuery) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.GetJobUpdateDetails(nil, &updateQuery) + }) if retryErr != nil { return nil, errors.Wrap(retryErr, "Unable to get job update details") @@ -979,259 +991,14 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message) - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.client.RollbackJobUpdate(nil, &key, message) - }) + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.client.RollbackJobUpdate(nil, &key, message) + }) if retryErr != nil { - return nil, errors.Wrap(retryErr, "Unable to roll back job update") + return nil, errors.Wrap(retryErr, "unable to roll back job update") } return resp, nil } - -/* Admin functions */ -// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change, -// but would add safety. - -// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing -// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed -// to return to running unless there is enough capacity in the cluster to run them. -func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) { - - var result *aurora.DrainHostsResult_ - - if len(hosts) == 0 { - return nil, nil, errors.New("no hosts provided to drain") - } - - drainList := aurora.NewHosts() - drainList.HostNames = hosts - - r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) - - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.DrainHosts(nil, drainList) - }) - - if retryErr != nil { - return resp, result, errors.Wrap(retryErr, "Unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetDrainHostsResult_() - } - - return resp, result, nil -} - -// Start SLA Aware Drain. -// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. -// After timeoutSecs, tasks will be forcefully drained without checking SLA. 
-func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) { - var result *aurora.DrainHostsResult_ - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to drain") - } - - drainList := aurora.NewHosts() - drainList.HostNames = hosts - - r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) - - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) - }) - - if retryErr != nil { - return result, errors.Wrap(retryErr, "Unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetDrainHostsResult_() - } - - return result, nil -} - -func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) { - - var result *aurora.StartMaintenanceResult_ - - if len(hosts) == 0 { - return nil, nil, errors.New("no hosts provided to start maintenance on") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) - - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.StartMaintenance(nil, hostList) - }) - - if retryErr != nil { - return resp, result, errors.Wrap(retryErr, "Unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetStartMaintenanceResult_() - } - - return resp, result, nil -} - -func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) { - - var result *aurora.EndMaintenanceResult_ - - if len(hosts) == 0 { - return nil, nil, errors.New("no hosts provided to end maintenance on") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) - - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.EndMaintenance(nil, hostList) - }) - - if retryErr != nil { - return resp, result, errors.Wrap(retryErr, "Unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetEndMaintenanceResult_() - } - - return resp, result, nil -} - -func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) { - - var result *aurora.MaintenanceStatusResult_ - - if len(hosts) == 0 { - return nil, nil, errors.New("no hosts provided to get maintenance status from") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) - - // Make thrift call. If we encounter an error sending the call, attempt to reconnect - // and continue trying to resend command until we run out of retries. 
- resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.MaintenanceStatus(nil, hostList) - }) - - if retryErr != nil { - return resp, result, errors.Wrap(retryErr, "Unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetMaintenanceStatusResult_() - } - - return resp, result, nil -} - -// SetQuota sets a quota aggregate for the given role -// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` -func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { - ramRes := aurora.NewResource() - ramRes.RamMb = ramMb - cpuRes := aurora.NewResource() - cpuRes.NumCpus = cpu - diskRes := aurora.NewResource() - diskRes.DiskMb = diskMb - quota := aurora.NewResourceAggregate() - quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes} - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.SetQuota(nil, role, quota) - }) - - if retryErr != nil { - return resp, errors.Wrap(retryErr, "Unable to set role quota") - } - return resp, retryErr - -} - -// GetQuota returns the resource aggregate for the given role -func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { - - resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.GetQuota(nil, role) - }) - - if retryErr != nil { - return resp, errors.Wrap(retryErr, "Unable to get role quota") - } - return resp, retryErr -} - -// Force Aurora Scheduler to perform a snapshot and write to Mesos log -func (r *realisClient) Snapshot() error { - - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.Snapshot(nil) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "Unable to recover connection") - } - - return nil -} - -// Force Aurora Scheduler to write backup file to a file in the backup directory -func (r *realisClient) PerformBackup() error { - - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.PerformBackup(nil) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "Unable to recover connection") - } - - return nil -} - -func (r *realisClient) ForceImplicitTaskReconciliation() error { - - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerImplicitTaskReconciliation(nil) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "Unable to recover connection") - } - - return nil -} - -func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { - - if batchSize != nil && *batchSize < 1 { - return errors.New("Invalid batch size.") - } - settings := aurora.NewExplicitReconciliationSettings() - - settings.BatchSize = batchSize - - _, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { - return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "Unable to recover connection") - } - - return nil -} diff --git a/realis_admin.go b/realis_admin.go new file mode 100644 index 0000000..8ce3ea3 --- /dev/null +++ b/realis_admin.go @@ -0,0 +1,269 @@ +package realis + +import ( + "github.com/paypal/gorealis/gen-go/apache/aurora" + "github.com/pkg/errors" +) + +// TODO(rdelvalle): Consider moving these functions to another interface. 
It would be a backwards incompatible change, +// but would add safety. + +// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing +// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed +// to return to running unless there is enough capacity in the cluster to run them. +func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) { + + var result *aurora.DrainHostsResult_ + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = hosts + + r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.DrainHosts(nil, drainList) + }) + + if retryErr != nil { + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetDrainHostsResult_() + } + + return resp, result, nil +} + +// Start SLA Aware Drain. +// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. +// After timeoutSecs, tasks will be forcefully drained without checking SLA. +func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) { + var result *aurora.DrainHostsResult_ + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = hosts + + r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout) + }) + + if retryErr != nil { + return result, errors.Wrap(retryErr, "Unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetDrainHostsResult_() + } + + return result, nil +} + +func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) { + + var result *aurora.StartMaintenanceResult_ + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to start maintenance on") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.StartMaintenance(nil, hostList) + }) + + if retryErr != nil { + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetStartMaintenanceResult_() + } + + return resp, result, nil +} + +func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) { + + var result *aurora.EndMaintenanceResult_ + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to end maintenance on") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.EndMaintenance(nil, hostList) + }) + + if retryErr != nil { + return resp, result, errors.Wrap(retryErr, "Unable to recover 
connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetEndMaintenanceResult_() + } + + return resp, result, nil +} + +func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) { + + var result *aurora.MaintenanceStatusResult_ + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to get maintenance status from") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) + + // Make thrift call. If we encounter an error sending the call, attempt to reconnect + // and continue trying to resend command until we run out of retries. + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.MaintenanceStatus(nil, hostList) + }) + + if retryErr != nil { + return resp, result, errors.Wrap(retryErr, "Unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetMaintenanceStatusResult_() + } + + return resp, result, nil +} + +// SetQuota sets a quota aggregate for the given role +// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` +func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { + quota := &aurora.ResourceAggregate{ + Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}}, + } + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.SetQuota(nil, role, quota) + }) + + if retryErr != nil { + return resp, errors.Wrap(retryErr, "Unable to set role quota") + } + return resp, retryErr + +} + +// GetQuota returns the resource aggregate for the given role +func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { + + resp, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.GetQuota(nil, role) + }) + + if retryErr != nil { + return resp, errors.Wrap(retryErr, "Unable to get role quota") + } + return resp, retryErr +} + +// Force Aurora Scheduler to perform a snapshot and write to Mesos log +func (r *realisClient) Snapshot() error { + + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.Snapshot(nil) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "Unable to recover connection") + } + + return nil +} + +// Force Aurora Scheduler to write backup file to a file in the backup directory +func (r *realisClient) PerformBackup() error { + + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.PerformBackup(nil) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "Unable to recover connection") + } + + return nil +} + +func (r *realisClient) ForceImplicitTaskReconciliation() error { + + _, retryErr := r.thriftCallWithRetries( + false, + func() (*aurora.Response, error) { + return r.adminClient.TriggerImplicitTaskReconciliation(nil) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "Unable to recover connection") + } + + return nil +} + +func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { + + if batchSize != nil && *batchSize < 1 { + return errors.New("Invalid batch size.") + } + settings := aurora.NewExplicitReconciliationSettings() + + settings.BatchSize = 
batchSize + + _, retryErr := r.thriftCallWithRetries(false, + func() (*aurora.Response, error) { + return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "Unable to recover connection") + } + + return nil +} diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 9f011f0..cc09b7b 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -35,11 +35,13 @@ var r realis.Realis var monitor *realis.Monitor var thermosPayload []byte +const auroraURL = "http://192.168.33.7:8081" + func TestMain(m *testing.M) { var err error // New configuration to connect to docker container - r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), + r, err = realis.NewRealisClient(realis.SchedulerUrl(auroraURL), realis.BasicAuth("aurora", "secret"), realis.TimeoutMS(20000)) @@ -93,7 +95,7 @@ func TestNonExistentEndpoint(t *testing.T) { } func TestThriftBinary(t *testing.T) { - r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), + r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL), realis.BasicAuth("aurora", "secret"), realis.TimeoutMS(20000), realis.ThriftBinary()) @@ -115,7 +117,7 @@ func TestThriftBinary(t *testing.T) { } func TestThriftJSON(t *testing.T) { - r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), + r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL), realis.BasicAuth("aurora", "secret"), realis.TimeoutMS(20000), realis.ThriftJSON()) @@ -137,7 +139,7 @@ func TestThriftJSON(t *testing.T) { } func TestNoopLogger(t *testing.T) { - r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), + r, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL), realis.BasicAuth("aurora", "secret"), realis.SetLogger(realis.NoopLogger{})) @@ -161,6 +163,8 @@ func TestLeaderFromZK(t *testing.T) { url, err := realis.LeaderFromZK(*cluster) assert.NoError(t, err) + + // Address stored inside of ZK might be different than the one we connect to in our tests. assert.Equal(t, "http://192.168.33.7:8081", url) } @@ -271,6 +275,25 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { assert.True(t, success) assert.NoError(t, err) }) + + t.Run("Duplicate_Metadata", func(t *testing.T) { + job.Name("thermos_duplicate_metadata"). + AddLabel("hostname", "cookie"). + AddLabel("hostname", "candy"). + AddLabel("hostname", "popcorn"). + AddLabel("hostname", "chips"). + AddLabel("chips", "chips") + + _, err := r.CreateJob(job) + assert.NoError(t, err) + + success, err := monitor.Instances(job.JobKey(), 2, 1, 50) + assert.True(t, success) + assert.NoError(t, err) + + _, err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + }) } // Test configuring an executor that doesn't exist for CreateJob API @@ -461,6 +484,64 @@ func TestRealisClient_CreateService(t *testing.T) { // Kill task test task after confirming it came up fine _, err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err := monitor.Instances(job.JobKey(), 0, 1, 50) + assert.True(t, success) + + // Create a client which will timeout and close the connection before receiving an answer + timeoutClient, err := realis.NewRealisClient(realis.SchedulerUrl(auroraURL), + realis.BasicAuth("aurora", "secret"), + realis.TimeoutMS(10)) + assert.NoError(t, err) + defer timeoutClient.Close() + + // Test case where http connection timeouts out. 
+ t.Run("TimeoutError", func(t *testing.T) { + job.Name("createService_timeout") + + // Make sure a timedout error was returned + _, _, err = timeoutClient.CreateService(job, settings) + assert.Error(t, err) + assert.True(t, realis.IsTimeout(err)) + + updateReceivedQuery := aurora.JobUpdateQuery{ + Role: &job.JobKey().Role, + JobKey: job.JobKey(), + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES, + Limit: 1} + + updateSummaries, err := monitor.JobUpdateQuery(updateReceivedQuery, time.Second*1, time.Second*50) + assert.NoError(t, err) + + assert.Len(t, updateSummaries, 1) + + _, err = r.AbortJobUpdate(*updateSummaries[0].Key, "Cleaning up") + _, err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + + }) + + // Test case where http connection timeouts out. + t.Run("TimeoutError_BadPayload", func(t *testing.T) { + // Illegal payload + job.InstanceCount(-1) + job.Name("createService_timeout_bad_payload") + + // Make sure a timedout error was returned + _, _, err = timeoutClient.CreateService(job, settings) + assert.Error(t, err) + assert.True(t, realis.IsTimeout(err)) + + summary, err := r.GetJobUpdateSummaries( + &aurora.JobUpdateQuery{ + Role: &job.JobKey().Role, + JobKey: job.JobKey(), + UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES}) + assert.NoError(t, err) + + // Payload should have been rejected, no update should exist + assert.Len(t, summary.GetResult_().GetGetJobUpdateSummariesResult_().GetUpdateSummaries(), 0) + }) } // Test configuring an executor that doesn't exist for CreateJob API diff --git a/retry.go b/retry.go index 68b13cf..256e509 100644 --- a/retry.go +++ b/retry.go @@ -28,7 +28,7 @@ import ( type Backoff struct { Duration time.Duration // the base duration - Factor float64 // Duration is multipled by factor each iteration + Factor float64 // Duration is multiplied by a factor each iteration Jitter float64 // The amount of jitter applied each iteration Steps int // Exit with error after this many steps } @@ -77,7 +77,7 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) adjusted = Jitter(duration, backoff.Jitter) } - logger.Printf("A retriable error occurred during function call, backing off for %v before retrying\n", adjusted) + logger.Printf("A retryable error occurred during function call, backing off for %v before retrying\n", adjusted) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) } @@ -116,10 +116,11 @@ func ExponentialBackoff(backoff Backoff, logger Logger, condition ConditionFunc) type auroraThriftCall func() (resp *aurora.Response, err error) // Duplicates the functionality of ExponentialBackoff but is specifically targeted towards ThriftCalls. 
-func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*aurora.Response, error) { +func (r *realisClient) thriftCallWithRetries(returnOnTimeout bool, thriftCall auroraThriftCall) (*aurora.Response, error) { var resp *aurora.Response var clientErr error var curStep int + timeouts := 0 backoff := r.config.backoff duration := backoff.Duration @@ -133,7 +134,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro adjusted = Jitter(duration, backoff.Jitter) } - r.logger.Printf("A retriable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) + r.logger.Printf("A retryable error occurred during thrift call, backing off for %v before retry %v\n", adjusted, curStep) time.Sleep(adjusted) duration = time.Duration(float64(duration) * backoff.Factor) @@ -151,7 +152,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro r.logger.TracePrintf("Aurora Thrift Call ended resp: %v clientErr: %v\n", resp, clientErr) }() - // Check if our thrift call is returning an error. This is a retriable event as we don't know + // Check if our thrift call is returning an error. This is a retryable event as we don't know // if it was caused by network issues. if clientErr != nil { @@ -169,7 +170,20 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // when the server is overloaded and should be retried. All other errors that are permanent // will not be retried. if e.Err != io.EOF && !e.Temporary() { - return nil, errors.Wrap(clientErr, "Permanent connection error") + return nil, errors.Wrap(clientErr, "permanent connection error") + } + + // Corner case where thrift payload was received by Aurora but connection timedout before Aurora was + // able to reply. In this case we will return whatever response was received and a TimedOut behaving + // error. Users can take special action on a timeout by using IsTimedout and reacting accordingly. + if e.Timeout() { + timeouts++ + r.logger.DebugPrintf( + "Client closed connection (timedout) %d times before server responded, consider increasing connection timeout", + timeouts) + if returnOnTimeout { + return resp, newTimedoutError(errors.New("client connection closed before server answer")) + } } } } @@ -183,7 +197,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // If there was no client error, but the response is nil, something went wrong. // Ideally, we'll never encounter this but we're placing a safeguard here. if resp == nil { - return nil, errors.New("Response from aurora is nil") + return nil, errors.New("response from aurora is nil") } // Check Response Code from thrift and make a decision to continue retrying or not. @@ -210,7 +224,7 @@ func (r *realisClient) thriftCallWithRetries(thriftCall auroraThriftCall) (*auro // It is currently not used as a response in the scheduler so it is unknown how to handle it. default: r.logger.DebugPrintf("unhandled response code %v received from Aurora\n", responseCode) - return nil, errors.Errorf("unhandled response code from Aurora %v\n", responseCode.String()) + return nil, errors.Errorf("unhandled response code from Aurora %v", responseCode.String()) } } diff --git a/runTests.sh b/runTests.sh new file mode 100755 index 0000000..e213972 --- /dev/null +++ b/runTests.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +docker-compose up -d + +# If running docker-compose up gives any error, don't do anything. +if [ $? 
-ne 0 ]; then + exit +fi + +# Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues. +docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@ + +docker-compose down diff --git a/runTestsMac.sh b/runTestsMac.sh old mode 100755 new mode 100644 index 4c695de..d60d85f --- a/runTestsMac.sh +++ b/runTestsMac.sh @@ -1,4 +1,4 @@ #!/bin/bash # Since we run our docker compose setup in bridge mode to be able to run on MacOS, we have to launch a Docker container within the bridge network in order to avoid any routing issues. -docker run -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@ +docker run --rm -t -v $(pwd):/go/src/github.com/paypal/gorealis --network gorealis_aurora_cluster golang:1.10-stretch go test -v github.com/paypal/gorealis $@ diff --git a/zk.go b/zk.go index dd711e0..a9eb92d 100644 --- a/zk.go +++ b/zk.go @@ -103,7 +103,7 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) }) if err != nil { - return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper")) + return false, NewTemporaryError(errors.Wrap(err, "failed to connect to Zookeeper")) } defer c.Close() @@ -117,7 +117,7 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path) } - return false, NewTemporaryError(errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", config.path)) + return false, NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path)) } // Search for the leader through all the children in the given path @@ -134,12 +134,12 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath) } - return false, NewTemporaryError(errors.Wrap(err, "Error fetching contents of leader")) + return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader")) } err = json.Unmarshal([]byte(data), serviceInst) if err != nil { - return false, NewTemporaryError(errors.Wrap(err, "Unable to unmarshall contents of leader")) + return false, NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader")) } // Should only be one endpoint. @@ -162,11 +162,11 @@ func LeaderFromZKOpts(options ...ZKOpt) (string, error) { } // Leader data might not be available yet, try to fetch again. - return false, NewTemporaryError(errors.New("No leader found")) + return false, NewTemporaryError(errors.New("no leader found")) }) if retryErr != nil { - config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps) + config.logger.Printf("failed to determine leader after %v attempts", config.backoff.Steps) return "", retryErr }
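
The timeout-recovery flow this patch introduces (CreateService/StartJobUpdate returning a timeout-flavored error, checked with `realis.IsTimeout`, followed by a `Monitor.JobUpdateQuery` to find out whether the update actually reached the scheduler) can be sketched roughly as below. This is a minimal illustration pieced together from the test code in this diff, not part of the patch itself: the `client`, `monitor`, `job`, and `settings` values are assumed to already exist, and the helper name `createOrRecover` is hypothetical.

```go
package example

import (
	"time"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// createOrRecover is a hypothetical helper showing how a caller might react to a
// client-side timeout from CreateService: check realis.IsTimeout, then ask the
// scheduler whether the update was registered before the connection was closed.
func createOrRecover(
	client realis.Realis,
	monitor *realis.Monitor,
	job realis.Job,
	settings *aurora.JobUpdateSettings) error {

	_, _, err := client.CreateService(job, settings)
	if err == nil {
		return nil // update started normally
	}

	// Not a timeout: a genuine failure, nothing to recover from.
	if !realis.IsTimeout(err) {
		return err
	}

	// The HTTP client gave up before the scheduler answered. The update may or may
	// not have been accepted, so query for any active update on this job key.
	role := job.JobKey().Role
	query := aurora.JobUpdateQuery{
		Role:           &role,
		JobKey:         job.JobKey(),
		UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
		Limit:          1,
	}

	summaries, qErr := monitor.JobUpdateQuery(query, 1*time.Second, 50*time.Second)
	if qErr != nil {
		return qErr
	}

	if len(summaries) == 0 {
		// The payload never reached the scheduler; it should be safe to retry.
		return err
	}

	// The update did reach the scheduler despite the timeout. The caller can adopt
	// and monitor it, or (as the e2e test does) abort it via the returned update key.
	_, abortErr := client.AbortJobUpdate(*summaries[0].Key, "recovering from client timeout")
	return abortErr
}
```

Whether to adopt or abort an update discovered this way is a policy choice for the caller; the library only guarantees that a timeout is now distinguishable from other errors via `realis.IsTimeout`.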