diff --git a/realis.go b/realis.go index a006d20..a1b3b6c 100644 --- a/realis.go +++ b/realis.go @@ -17,12 +17,14 @@ package realis import ( "crypto/tls" + "crypto/x509" "encoding/base64" "fmt" + "io/ioutil" "math/rand" "net/http" "net/http/cookiejar" - "strings" + "path/filepath" "time" "git.apache.org/thrift.git/lib/go/thrift" @@ -82,6 +84,8 @@ type RealisConfig struct { transport thrift.TTransport protoFactory thrift.TProtocolFactory logger Logger + Insecure bool + certspath string } type Backoff struct { @@ -164,10 +168,8 @@ func SetLogger(l Logger) ClientOption { } } - -func newTJSONTransport(url string, timeout int, secure bool) (thrift.TTransport, error) { - - trans, err := defaultTTransport(url, timeout, secure) +func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { + trans, err := defaultTTransport(url, timeout, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } @@ -177,8 +179,8 @@ func newTJSONTransport(url string, timeout int, secure bool) (thrift.TTransport, return trans, err } -func newTBinTransport(url string, timeout int, secure bool) (thrift.TTransport, error) { - trans, err := defaultTTransport(url, timeout, secure) +func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) { + trans, err := defaultTTransport(url, timeout, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } @@ -231,7 +233,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { } if config.jsonTransport { - trans, err := newTJSONTransport(url, config.timeoutms, config.secure) + trans, err := newTJSONTransport(url, config.timeoutms, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } @@ -239,7 +241,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { config.protoFactory = thrift.NewTJSONProtocolFactory() } else if config.binTransport { - trans, err := newTBinTransport(url, config.timeoutms, config.secure) + trans, err := newTBinTransport(url, config.timeoutms, config) if err != nil { return nil, errors.Wrap(err, "Error creating realis") } @@ -263,6 +265,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) { } + // Jitter returns a time.Duration between duration and duration + maxFactor * // duration. // @@ -276,6 +279,7 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { return wait } + func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { return &Cluster{Name: "defaultCluster", AuthMechanism: "UNAUTHENTICATED", @@ -286,17 +290,44 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { } } +func getcerts(certpath string) (*x509.CertPool, error) { + globalRootCAs := x509.NewCertPool() + caFiles, err := ioutil.ReadDir(certpath) + if err != nil { + return nil, err + } + for _, cert := range caFiles { + capathfile := filepath.Join(certpath, cert.Name()) + caCert, err := ioutil.ReadFile(capathfile) + if err != nil { + return nil, err + } + globalRootCAs.AppendCertsFromPEM(caCert) + } + return globalRootCAs, nil +} + // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client -func defaultTTransport(urlstr string, timeoutms int, secure bool) (thrift.TTransport, error) { +func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thrift.TTransport, error) { jar, err := cookiejar.New(nil) if err != nil { return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar") } var transport http.Transport - if secure { - transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: false} - } else { - transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + if config != nil { + var tlsConfig *tls.Config + if config.Insecure { + tlsConfig = &tls.Config{InsecureSkipVerify: true} + } + if config.certspath != "" { + rootCAs, err := getcerts(config.certspath) + if err != nil { + fmt.Println("error occured couldn't fetch certs") + return nil, err + } + tlsConfig.RootCAs = rootCAs + } + transport.TLSClientConfig = tlsConfig } trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api", @@ -313,15 +344,17 @@ func defaultTTransport(urlstr string, timeoutms int, secure bool) (thrift.TTrans return trans, nil } + + // Create a default configuration of the transport layer, requires a URL to test connection with. // Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default. -func newDefaultConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) { - return newTJSONConfig(url, timeoutms, secure) +func newDefaultConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { + return newTJSONConfig(url, timeoutms, config) } // Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora. -func newTJSONConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) { - trans, err := defaultTTransport(url, timeoutms, secure) +func newTJSONConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { + trans, err := defaultTTransport(url, timeoutms, config) if err != nil { return &RealisConfig{}, errors.Wrap(err, "Error creating realis config") } @@ -334,8 +367,8 @@ func newTJSONConfig(url string, timeoutms int, secure bool) (*RealisConfig, erro } // Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora. -func newTBinaryConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) { - trans, err := defaultTTransport(url, timeoutms, secure) +func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) { + trans, err := defaultTTransport(url, timeoutms, config) if err != nil { return &RealisConfig{}, errors.Wrap(err, "Error creating realis config") } @@ -360,8 +393,12 @@ func AddBasicAuth(config *RealisConfig, username string, password string) { } // -func Secure(config *RealisConfig, secure bool) { - config.secure = secure +func Secure(config *RealisConfig, insecure bool) { + config.Insecure = insecure +} + +func Certpath(config *RealisConfig, certspath string) { + config.certspath = certspath } func basicAuth(username, password string) string { @@ -386,14 +423,14 @@ func (r *realisClient) ReestablishConn() error { } r.logger.Println("ReestablishConn url: ", url) if r.config.jsonTransport { - trans, err := newTJSONTransport(url, r.config.timeoutms, r.config.secure) + trans, err := newTJSONTransport(url, r.config.timeoutms, r.config) if err != nil { return errors.Wrap(err, "Error creating realis") } r.config.transport = trans r.config.protoFactory = thrift.NewTJSONProtocolFactory() } else if r.config.binTransport { - trans, err := newTBinTransport(url, r.config.timeoutms, r.config.secure) + trans, err := newTBinTransport(url, r.config.timeoutms, r.config) if err != nil { return errors.Wrap(err, "Error creating realis") } @@ -412,14 +449,14 @@ func (r *realisClient) ReestablishConn() error { //Re-establish using scheduler url. r.logger.Println("ReestablishConn url: ", r.config.url) if r.config.jsonTransport { - trans, err := newTJSONTransport(url, r.config.timeoutms, r.config.secure) + trans, err := newTJSONTransport(url, r.config.timeoutms, r.config) if err != nil { return errors.Wrap(err, "Error creating realis") } r.config.transport = trans r.config.protoFactory = thrift.NewTJSONProtocolFactory() } else if r.config.binTransport { - trans, err := newTBinTransport(url, r.config.timeoutms, r.config.secure) + trans, err := newTBinTransport(url, r.config.timeoutms, r.config) if err != nil { return errors.Wrap(err, "Error creating realis") } @@ -863,6 +900,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) { resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) { + fmt.Println(clientErr) return r.client.GetTasksStatus(query) }) if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {