add insecure and certspath to configs
This commit is contained in:
parent
6ddbf833cf
commit
7980d7cad7
1 changed files with 64 additions and 26 deletions
90
realis.go
90
realis.go
|
@ -17,12 +17,14 @@ package realis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/cookiejar"
|
"net/http/cookiejar"
|
||||||
"strings"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.apache.org/thrift.git/lib/go/thrift"
|
"git.apache.org/thrift.git/lib/go/thrift"
|
||||||
|
@ -82,6 +84,8 @@ type RealisConfig struct {
|
||||||
transport thrift.TTransport
|
transport thrift.TTransport
|
||||||
protoFactory thrift.TProtocolFactory
|
protoFactory thrift.TProtocolFactory
|
||||||
logger Logger
|
logger Logger
|
||||||
|
Insecure bool
|
||||||
|
certspath string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Backoff struct {
|
type Backoff struct {
|
||||||
|
@ -164,10 +168,8 @@ func SetLogger(l Logger) ClientOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
|
||||||
func newTJSONTransport(url string, timeout int, secure bool) (thrift.TTransport, error) {
|
trans, err := defaultTTransport(url, timeout, config)
|
||||||
|
|
||||||
trans, err := defaultTTransport(url, timeout, secure)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error creating realis")
|
return nil, errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -177,8 +179,8 @@ func newTJSONTransport(url string, timeout int, secure bool) (thrift.TTransport,
|
||||||
return trans, err
|
return trans, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTBinTransport(url string, timeout int, secure bool) (thrift.TTransport, error) {
|
func newTBinTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
|
||||||
trans, err := defaultTTransport(url, timeout, secure)
|
trans, err := defaultTTransport(url, timeout, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error creating realis")
|
return nil, errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -231,7 +233,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.jsonTransport {
|
if config.jsonTransport {
|
||||||
trans, err := newTJSONTransport(url, config.timeoutms, config.secure)
|
trans, err := newTJSONTransport(url, config.timeoutms, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error creating realis")
|
return nil, errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -239,7 +241,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
||||||
config.protoFactory = thrift.NewTJSONProtocolFactory()
|
config.protoFactory = thrift.NewTJSONProtocolFactory()
|
||||||
|
|
||||||
} else if config.binTransport {
|
} else if config.binTransport {
|
||||||
trans, err := newTBinTransport(url, config.timeoutms, config.secure)
|
trans, err := newTBinTransport(url, config.timeoutms, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error creating realis")
|
return nil, errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -263,6 +265,7 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Jitter returns a time.Duration between duration and duration + maxFactor *
|
// Jitter returns a time.Duration between duration and duration + maxFactor *
|
||||||
// duration.
|
// duration.
|
||||||
//
|
//
|
||||||
|
@ -276,6 +279,7 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
|
||||||
return wait
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
||||||
return &Cluster{Name: "defaultCluster",
|
return &Cluster{Name: "defaultCluster",
|
||||||
AuthMechanism: "UNAUTHENTICATED",
|
AuthMechanism: "UNAUTHENTICATED",
|
||||||
|
@ -286,17 +290,44 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getcerts(certpath string) (*x509.CertPool, error) {
|
||||||
|
globalRootCAs := x509.NewCertPool()
|
||||||
|
caFiles, err := ioutil.ReadDir(certpath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, cert := range caFiles {
|
||||||
|
capathfile := filepath.Join(certpath, cert.Name())
|
||||||
|
caCert, err := ioutil.ReadFile(capathfile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
globalRootCAs.AppendCertsFromPEM(caCert)
|
||||||
|
}
|
||||||
|
return globalRootCAs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
|
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
|
||||||
func defaultTTransport(urlstr string, timeoutms int, secure bool) (thrift.TTransport, error) {
|
func defaultTTransport(urlstr string, timeoutms int, config *RealisConfig) (thrift.TTransport, error) {
|
||||||
jar, err := cookiejar.New(nil)
|
jar, err := cookiejar.New(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
|
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
|
||||||
}
|
}
|
||||||
var transport http.Transport
|
var transport http.Transport
|
||||||
if secure {
|
if config != nil {
|
||||||
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: false}
|
var tlsConfig *tls.Config
|
||||||
} else {
|
if config.Insecure {
|
||||||
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
tlsConfig = &tls.Config{InsecureSkipVerify: true}
|
||||||
|
}
|
||||||
|
if config.certspath != "" {
|
||||||
|
rootCAs, err := getcerts(config.certspath)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("error occured couldn't fetch certs")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tlsConfig.RootCAs = rootCAs
|
||||||
|
}
|
||||||
|
transport.TLSClientConfig = tlsConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
|
trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
|
||||||
|
@ -313,15 +344,17 @@ func defaultTTransport(urlstr string, timeoutms int, secure bool) (thrift.TTrans
|
||||||
return trans, nil
|
return trans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Create a default configuration of the transport layer, requires a URL to test connection with.
|
// Create a default configuration of the transport layer, requires a URL to test connection with.
|
||||||
// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default.
|
// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default.
|
||||||
func newDefaultConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) {
|
func newDefaultConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||||
return newTJSONConfig(url, timeoutms, secure)
|
return newTJSONConfig(url, timeoutms, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora.
|
// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora.
|
||||||
func newTJSONConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) {
|
func newTJSONConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||||
trans, err := defaultTTransport(url, timeoutms, secure)
|
trans, err := defaultTTransport(url, timeoutms, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||||
}
|
}
|
||||||
|
@ -334,8 +367,8 @@ func newTJSONConfig(url string, timeoutms int, secure bool) (*RealisConfig, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora.
|
// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora.
|
||||||
func newTBinaryConfig(url string, timeoutms int, secure bool) (*RealisConfig, error) {
|
func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisConfig, error) {
|
||||||
trans, err := defaultTTransport(url, timeoutms, secure)
|
trans, err := defaultTTransport(url, timeoutms, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
return &RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||||
}
|
}
|
||||||
|
@ -360,8 +393,12 @@ func AddBasicAuth(config *RealisConfig, username string, password string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
func Secure(config *RealisConfig, secure bool) {
|
func Secure(config *RealisConfig, insecure bool) {
|
||||||
config.secure = secure
|
config.Insecure = insecure
|
||||||
|
}
|
||||||
|
|
||||||
|
func Certpath(config *RealisConfig, certspath string) {
|
||||||
|
config.certspath = certspath
|
||||||
}
|
}
|
||||||
|
|
||||||
func basicAuth(username, password string) string {
|
func basicAuth(username, password string) string {
|
||||||
|
@ -386,14 +423,14 @@ func (r *realisClient) ReestablishConn() error {
|
||||||
}
|
}
|
||||||
r.logger.Println("ReestablishConn url: ", url)
|
r.logger.Println("ReestablishConn url: ", url)
|
||||||
if r.config.jsonTransport {
|
if r.config.jsonTransport {
|
||||||
trans, err := newTJSONTransport(url, r.config.timeoutms, r.config.secure)
|
trans, err := newTJSONTransport(url, r.config.timeoutms, r.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Error creating realis")
|
return errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
r.config.transport = trans
|
r.config.transport = trans
|
||||||
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
|
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
|
||||||
} else if r.config.binTransport {
|
} else if r.config.binTransport {
|
||||||
trans, err := newTBinTransport(url, r.config.timeoutms, r.config.secure)
|
trans, err := newTBinTransport(url, r.config.timeoutms, r.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Error creating realis")
|
return errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -412,14 +449,14 @@ func (r *realisClient) ReestablishConn() error {
|
||||||
//Re-establish using scheduler url.
|
//Re-establish using scheduler url.
|
||||||
r.logger.Println("ReestablishConn url: ", r.config.url)
|
r.logger.Println("ReestablishConn url: ", r.config.url)
|
||||||
if r.config.jsonTransport {
|
if r.config.jsonTransport {
|
||||||
trans, err := newTJSONTransport(url, r.config.timeoutms, r.config.secure)
|
trans, err := newTJSONTransport(url, r.config.timeoutms, r.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Error creating realis")
|
return errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
r.config.transport = trans
|
r.config.transport = trans
|
||||||
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
|
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
|
||||||
} else if r.config.binTransport {
|
} else if r.config.binTransport {
|
||||||
trans, err := newTBinTransport(url, r.config.timeoutms, r.config.secure)
|
trans, err := newTBinTransport(url, r.config.timeoutms, r.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Error creating realis")
|
return errors.Wrap(err, "Error creating realis")
|
||||||
}
|
}
|
||||||
|
@ -863,6 +900,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
|
||||||
|
|
||||||
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
|
retryErr := ExponentialBackoff(*r.config.backoff, func() (bool, error) {
|
||||||
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
|
resp, clientErr = CheckAndRetryConn(r, func() (*aurora.Response, error) {
|
||||||
|
fmt.Println(clientErr)
|
||||||
return r.client.GetTasksStatus(query)
|
return r.client.GetTasksStatus(query)
|
||||||
})
|
})
|
||||||
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
|
if clientErr != nil && clientErr.Error() == RetryConnErr.Error() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue