Removing cookiejar from transport layer. Changing all error messages to start with a lower case letter. Changing some messages around to be more descriptive.

This commit is contained in:
Renan DelValle 2019-04-30 11:33:25 -07:00
parent c90db4773c
commit ee1a95831c
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
3 changed files with 44 additions and 42 deletions

2
job.go
View file

@ -123,7 +123,7 @@ func (j *AuroraJob) Environment(env string) Job {
func (j *AuroraJob) Role(role string) Job { func (j *AuroraJob) Role(role string) Job {
j.jobConfig.Key.Role = role j.jobConfig.Key.Role = role
//Will be deprecated // Will be deprecated
identity := &aurora.Identity{User: role} identity := &aurora.Identity{User: role}
j.jobConfig.Owner = identity j.jobConfig.Owner = identity
j.jobConfig.TaskConfig.Owner = identity j.jobConfig.TaskConfig.Owner = identity

View file

@ -52,7 +52,10 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
switch status { switch status {
case aurora.JobUpdateStatus_ROLLED_FORWARD: case aurora.JobUpdateStatus_ROLLED_FORWARD:
return true, nil return true, nil
case aurora.JobUpdateStatus_ROLLED_BACK, aurora.JobUpdateStatus_ABORTED, aurora.JobUpdateStatus_ERROR, aurora.JobUpdateStatus_FAILED: case aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED:
return false, errors.Errorf("bad terminal state for update: %v", status) return false, errors.Errorf("bad terminal state for update: %v", status)
default: default:
return false, errors.Errorf("unexpected update state: %v", status) return false, errors.Errorf("unexpected update state: %v", status)
@ -87,7 +90,7 @@ func (m *Monitor) JobUpdateStatus(updateKey aurora.JobUpdateKey,
if len(updateDetail) == 0 { if len(updateDetail) == 0 {
m.Client.RealisConfig().logger.Println("No update found") m.Client.RealisConfig().logger.Println("No update found")
return aurora.JobUpdateStatus(-1), errors.New("No update found for " + updateKey.String()) return aurora.JobUpdateStatus(-1), errors.New("no update found for " + updateKey.String())
} }
status := updateDetail[0].Update.Summary.State.Status status := updateDetail[0].Update.Summary.State.Status
@ -109,7 +112,12 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeo
// Monitor a Job until all instances enter a desired status. // Monitor a Job until all instances enter a desired status.
// Defaults sets of desired statuses provided by the thrift API include: // Defaults sets of desired statuses provided by the thrift API include:
// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES // ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses map[aurora.ScheduleStatus]bool, interval, timeout int) (bool, error) { func (m *Monitor) ScheduleStatus(
key *aurora.JobKey,
instanceCount int32,
desiredStatuses map[aurora.ScheduleStatus]bool,
interval int,
timeout int) (bool, error) {
ticker := time.NewTicker(time.Second * time.Duration(interval)) ticker := time.NewTicker(time.Second * time.Duration(interval))
defer ticker.Stop() defer ticker.Stop()

View file

@ -23,7 +23,6 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"net/http/cookiejar"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -242,24 +241,24 @@ func Trace() ClientOption {
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config) trans, err := defaultTTransport(url, timeout, config)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error creating realis") return nil, errors.Wrap(err, "unable to create transport")
} }
httpTrans := (trans).(*thrift.THttpClient) httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift") httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
return trans, err return trans, err
} }
func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout, config) trans, err := defaultTTransport(url, timeout, config)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error creating realis") return nil, errors.Wrap(err, "unable to create transport")
} }
httpTrans := (trans).(*thrift.THttpClient) httpTrans := (trans).(*thrift.THttpClient)
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION) httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
return trans, err return trans, err
} }
@ -320,7 +319,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
if config.zkOptions != nil { if config.zkOptions != nil {
url, err = LeaderFromZKOpts(config.zkOptions...) url, err = LeaderFromZKOpts(config.zkOptions...)
if err != nil { if err != nil {
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader"))
} }
config.logger.Println("Scheduler URL from ZK: ", url) config.logger.Println("Scheduler URL from ZK: ", url)
} else if config.cluster != nil { } else if config.cluster != nil {
@ -329,20 +328,20 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
url, err = LeaderFromZK(*config.cluster) url, err = LeaderFromZK(*config.cluster)
// If ZK is configured, throw an error if the leader is unable to be determined // If ZK is configured, throw an error if the leader is unable to be determined
if err != nil { if err != nil {
return nil, NewTemporaryError(errors.Wrap(err, "LeaderFromZK error")) return nil, NewTemporaryError(errors.Wrap(err, "unable to use zk to get leader "))
} }
config.logger.Println("Scheduler URL from ZK: ", url) config.logger.Println("Scheduler URL from ZK: ", url)
} else if config.url != "" { } else if config.url != "" {
url = config.url url = config.url
config.logger.Println("Scheduler URL: ", url) config.logger.Println("Scheduler URL: ", url)
} else { } else {
return nil, errors.New("Incomplete Options -- url, cluster.json, or Zookeeper address required") return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
} }
if config.jsonTransport { if config.jsonTransport {
trans, err := newTJSONTransport(url, config.timeoutms, config) trans, err := newTJSONTransport(url, config.timeoutms, config)
if err != nil { if err != nil {
return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) return nil, NewTemporaryError(err)
} }
config.transport = trans config.transport = trans
config.protoFactory = thrift.NewTJSONProtocolFactory() config.protoFactory = thrift.NewTJSONProtocolFactory()
@ -350,7 +349,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
} else if config.binTransport { } else if config.binTransport {
trans, err := newTBinTransport(url, config.timeoutms, config) trans, err := newTBinTransport(url, config.timeoutms, config)
if err != nil { if err != nil {
return nil, NewTemporaryError(errors.Wrap(err, "Error creating realis")) return nil, NewTemporaryError(err)
} }
config.transport = trans config.transport = trans
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
@ -404,10 +403,6 @@ func GetCerts(certPath string) (*x509.CertPool, error) {
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) { func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.TTransport, error) {
jar, err := cookiejar.New(nil)
if err != nil {
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
}
var transport http.Transport var transport http.Transport
if config != nil { if config != nil {
tlsConfig := &tls.Config{} tlsConfig := &tls.Config{}
@ -444,8 +439,7 @@ func defaultTTransport(url string, timeoutMs int, config *RealisConfig) (thrift.
thrift.THttpClientOptions{ thrift.THttpClientOptions{
Client: &http.Client{ Client: &http.Client{
Timeout: time.Millisecond * time.Duration(timeoutMs), Timeout: time.Millisecond * time.Duration(timeoutMs),
Transport: &transport, Transport: &transport}})
Jar: jar}})
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error creating transport") return nil, errors.Wrap(err, "Error creating transport")
@ -517,7 +511,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.Schedu
// If we encountered an error we couldn't recover from by retrying, return an error to the user // If we encountered an error we couldn't recover from by retrying, return an error to the user
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for active IDs") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for active IDs")
} }
// Construct instance id map to stay in line with thrift's representation of sets // Construct instance id map to stay in line with thrift's representation of sets
@ -539,7 +533,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error getting job update summaries from Aurora Scheduler") return nil, errors.Wrap(retryErr, "error getting job update summaries from Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -554,7 +548,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
}) })
if retryErr != nil { if retryErr != nil {
return nil, result, errors.Wrap(retryErr, "Error getting Jobs from Aurora Scheduler") return nil, result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
} }
if resp.GetResult_() != nil { if resp.GetResult_() != nil {
@ -573,7 +567,7 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -593,7 +587,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Kill command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Kill command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -611,7 +605,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
}) })
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Error sending Create command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending Create command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -642,7 +636,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Cron Job Schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job Schedule message to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -656,7 +650,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Cron Job De-schedule message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Cron Job De-schedule message to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -672,7 +666,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Start Cron Job message to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Start Cron Job message to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -687,7 +681,7 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -708,7 +702,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending Restart command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending Restart command to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -727,7 +721,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
}) })
if retryErr != nil { if retryErr != nil {
return resp, errors.Wrap(retryErr, "Error sending StartJobUpdate command to Aurora Scheduler") return resp, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
} }
return resp, nil return resp, nil
} }
@ -744,7 +738,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending AbortJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AbortJobUpdate command to Aurora Scheduler")
} }
// Make this call synchronous by blocking until it job has successfully transitioned to aborted // Make this call synchronous by blocking until it job has successfully transitioned to aborted
@ -764,7 +758,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending PauseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PauseJobUpdate command to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -780,7 +774,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending ResumeJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending ResumeJobUpdate command to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -796,7 +790,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -813,7 +807,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error sending AddInstances command to Aurora Scheduler") return nil, errors.Wrap(retryErr, "error sending AddInstances command to Aurora Scheduler")
} }
return resp, nil return resp, nil
@ -850,7 +844,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.Schedul
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
} }
return response.ScheduleStatusResult(resp).GetTasks(), nil return response.ScheduleStatusResult(resp).GetTasks(), nil
@ -866,7 +860,7 @@ func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.Pend
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
} }
var pendingReasons []*aurora.PendingReason var pendingReasons []*aurora.PendingReason
@ -888,7 +882,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*auror
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task status without configs") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status without configs")
} }
return response.ScheduleStatusResult(resp).GetTasks(), nil return response.ScheduleStatusResult(resp).GetTasks(), nil
@ -912,7 +906,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for task configuration") return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task configuration")
} }
tasks := response.ScheduleStatusResult(resp).GetTasks() tasks := response.ScheduleStatusResult(resp).GetTasks()
@ -953,7 +947,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
}) })
if retryErr != nil { if retryErr != nil {
return nil, errors.Wrap(retryErr, "Unable to roll back job update") return nil, errors.Wrap(retryErr, "unable to roll back job update")
} }
return resp, nil return resp, nil
} }