Code cleanup, added ability to attach logger, added CreateService api

* Code cleanup: Deleted multiple functions which have become stale. Removed cluster example as we replaced the need to create the Cluster object.

* Cleaned up ZK connection code by using the backoff function. Added a test to the end to end to test that we're getting the host correctly from ZK. Changed clusters test to be an outside package.

* Added LeaderFromZKURL test to end to end tests.

* Added logger to realisConfig so that users can attach their own Loggers to the client. Logger is an interface that shadows most popular logging libraries. Only Print, Println, and Printf are needed to be a realis.Logger type. Example in the client uses the std library log.

* Moved most fmt.Print* calls to be redirected to user provided logger. Logger by default is a no-op logger.

* Adding CreateService to realis interface. Uses the StartJobUpdate API to create services instead of the createJobs API.

* Bumping up version number inside client in anticipation of new release.
This commit is contained in:
Renan DelValle 2017-11-30 12:02:50 -08:00 committed by GitHub
parent 72b746e431
commit e614e04f27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 292 additions and 298 deletions

268
realis.go
View file

@ -31,28 +31,29 @@ import (
"github.com/pkg/errors"
)
const VERSION = "1.0.4"
const VERSION = "1.1.0"
type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error)
CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error)
KillJob(key *aurora.JobKey) (*aurora.Response, error)
KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
@ -68,69 +69,101 @@ type realisClient struct {
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
logger Logger
}
type option func(*RealisConfig)
type RealisConfig struct {
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
logger Logger
}
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multipled by factor each iteration
Jitter float64 // The amount of jitter applied each iteration
Steps int // Exit with error after this many steps
}
var defaultBackoff = Backoff{
Steps: 3,
Duration: 10 * time.Second,
Factor: 5.0,
Jitter: 0.1,
}
type ClientOption func(*RealisConfig)
//Config sets for options in RealisConfig.
func BasicAuth(username, password string) option {
func BasicAuth(username, password string) ClientOption {
return func(config *RealisConfig) {
config.username = username
config.password = password
}
}
func SchedulerUrl(url string) option {
func SchedulerUrl(url string) ClientOption {
return func(config *RealisConfig) {
config.url = url
}
}
func TimeoutMS(timeout int) option {
func TimeoutMS(timeout int) ClientOption {
return func(config *RealisConfig) {
config.timeoutms = timeout
}
}
func ZKCluster(cluster *Cluster) option {
func ZKCluster(cluster *Cluster) ClientOption {
return func(config *RealisConfig) {
config.cluster = cluster
}
}
func ZKUrl(url string) option {
func ZKUrl(url string) ClientOption {
return func(config *RealisConfig) {
config.cluster = GetDefaultClusterFromZKUrl(url)
}
}
func Retries(backoff *Backoff) option {
func Retries(backoff *Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = backoff
}
}
func ThriftJSON() option {
func ThriftJSON() ClientOption {
return func(config *RealisConfig) {
config.jsonTransport = true
}
}
func ThriftBinary() option {
func ThriftBinary() ClientOption {
return func(config *RealisConfig) {
config.binTransport = true
}
}
func BackOff(b *Backoff) option {
func BackOff(b *Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = b
}
}
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
// Using the word set to avoid name collision with Interface
func SetLogger(l Logger) ClientOption {
return func(config *RealisConfig) {
config.logger = l
}
}
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
@ -155,35 +188,41 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
return trans, err
}
func NewRealisClient(options ...option) (Realis, error) {
func NewRealisClient(options ...ClientOption) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", len(options))
// Default configs
config.timeoutms = 10000
config.backoff = &defaultBackoff
config.logger = NoopLogger{}
// Override default configs where necessary
for _, opt := range options {
opt(config)
}
//Default timeout
if config.timeoutms == 0 {
config.timeoutms = 10000
}
config.logger.Println("Number of options applied to config: ", len(options))
//Set default Transport to JSON if needed.
if !config.jsonTransport && !config.binTransport {
config.jsonTransport = true
}
var url string
var err error
//Cluster or URL?
// Determine how to get information to connect to the scheduler.
// Prioritize getting leader from ZK over using a direct URL.
if config.cluster != nil {
url, err = LeaderFromZK(*config.cluster)
// If ZK is configured, throw an error if the leader is unable to be determined
if err != nil {
return nil, errors.Wrap(err, "LeaderFromZK error")
}
fmt.Println("schedURLFromZK: ", url)
config.logger.Println("Scheduler URL from ZK: ", url)
} else if config.url != "" {
fmt.Println("Scheduler URL: ", config.url)
url = config.url
config.logger.Println("Scheduler URL: ", url)
} else {
return nil, errors.New("Incomplete Options -- url or cluster required")
}
@ -193,9 +232,9 @@ func NewRealisClient(options ...option) (Realis, error) {
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
config.transport = trans
config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if config.binTransport {
trans, err := newTBinTransport(url, config.timeoutms)
if err != nil {
@ -205,55 +244,22 @@ func NewRealisClient(options ...option) (Realis, error) {
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
config.logger.Printf("gorealis config url: %+v\n", config.url)
//Basic Authentication.
if config.username != "" && config.password != "" {
AddBasicAuth(config, config.username, config.password)
}
//Set defaultBackoff if required.
if config.backoff == nil {
config.backoff = &defaultBackoff
} else {
defaultBackoff = *config.backoff
fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
}
fmt.Printf("gorealis config url: %+v\n", config.url)
return &realisClient{
config: config,
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory)}, nil
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: config.logger}, nil
}
// Wrapper object to provide future flexibility
type RealisConfig struct {
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
}
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multipled by factor each iteration
Jitter float64 // The amount of jitter applied each iteration
Steps int // Exit with error after this many steps
}
var defaultBackoff = Backoff{
Steps: 3,
Duration: 10 * time.Second,
Factor: 5.0,
Jitter: 0.1,
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
@ -267,33 +273,6 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
return wait
}
// Create a new Client with Cluster information and other details.
func NewDefaultClientUsingCluster(cluster *Cluster, user, passwd string) (Realis, error) {
url, err := LeaderFromZK(*cluster)
if err != nil {
fmt.Println(err)
return nil, err
}
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.cluster = cluster
config.url = ""
// Configured for vagrant
AddBasicAuth(config, user, passwd)
r := newClient(config)
return r, nil
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{Name: "defaultCluster",
AuthMechanism: "UNAUTHENTICATED",
@ -304,65 +283,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
//This api would create default cluster object..
func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) {
fmt.Printf(" zkUrl: %s\n", zkUrl)
cluster := GetDefaultClusterFromZKUrl(zkUrl)
url, err := LeaderFromZK(*cluster)
if err != nil {
fmt.Println(err)
return nil, err
}
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.cluster = cluster
config.url = ""
// Configured for vagrant
AddBasicAuth(config, user, passwd)
r := newClient(config)
return r, nil
}
func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) {
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.url = url
config.cluster = nil
// Configured for vagrant
AddBasicAuth(config, user, passwd)
config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}
r := newClient(config)
return r, nil
}
// Create a new Client with a default transport layer
func newClient(realisconfig *RealisConfig) Realis {
return &realisClient{
config: realisconfig,
client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)}
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) {
jar, err := cookiejar.New(nil)
@ -439,7 +359,7 @@ func basicAuth(username, password string) string {
func (r *realisClient) ReestablishConn() error {
//close existing connection..
fmt.Println("ReestablishConn begin ....")
r.logger.Println("ReestablishConn begin ....")
r.Close()
//First check cluster object for re-establish; if not available then try with scheduler url.
//var config *RealisConfig
@ -452,7 +372,7 @@ func (r *realisClient) ReestablishConn() error {
if err != nil {
fmt.Errorf("LeaderFromZK error: %+v\n ", err)
}
fmt.Println("ReestablishConn url: ", url)
r.logger.Println("ReestablishConn url: ", url)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
@ -469,7 +389,7 @@ func (r *realisClient) ReestablishConn() error {
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
if err != nil {
fmt.Println("error creating config: ", err)
r.logger.Println("error creating config: ", err)
}
// Configured for basic-auth
AddBasicAuth(r.config, r.config.username, r.config.password)
@ -478,7 +398,7 @@ func (r *realisClient) ReestablishConn() error {
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
r.logger.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
@ -499,14 +419,14 @@ func (r *realisClient) ReestablishConn() error {
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
fmt.Println(" r.config.username: ", r.config.username)
fmt.Println(" r.config.passwd: ", r.config.password)
fmt.Println(" r.config.url: ", r.config.url)
r.logger.Println(" Missing Data for ReestablishConn ")
r.logger.Println(" r.config.cluster: ", r.config.cluster)
r.logger.Println(" r.config.username: ", r.config.username)
r.logger.Println(" r.config.passwd: ", r.config.password)
r.logger.Println(" r.config.url: ", r.config.url)
return errors.New(" Missing Data for ReestablishConn ")
}
fmt.Printf(" config url before return: %+v\n", r.config.url)
r.logger.Printf(" config url before return: %+v\n", r.config.url)
return nil
}
@ -645,6 +565,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
}
// Sends a create job message to the scheduler with a specific job configuration.
// Although this API is able to create service jobs, it is better to use CreateService instead
// as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
@ -669,6 +592,24 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
}
// This API uses an update thrift call to create the services giving a few more robust features.
func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
// Create a new job update object and ship it to the StartJobUpdate api
update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings)
update.InstanceCount(auroraJob.GetInstanceCount())
resp, err := r.StartJobUpdate(update, "")
if err != nil {
return resp, nil, errors.Wrap(err, "unable to create service")
}
if resp != nil && resp.GetResult_() != nil {
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
}
return resp, nil, errors.New("results object is nil")
}
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
@ -962,6 +903,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
}
// Get the task configuration from the aurora scheduler for a job
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
ids := make(map[int32]bool)
@ -1166,7 +1108,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// Experienced an connection error
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in re-establishing connection: ", err1)
r.logger.Println("error in re-establishing connection: ", err1)
}
return false, nil
}