Code cleanup and moving towards messages printed by library being configurabele by using a logger. Discussion should take place as to wether we want to move towards a flag enabling debug messages at compile time, like gocql, or embedding an optional logger.

This commit is contained in:
Renan DelValle 2017-11-28 08:46:54 -08:00
parent 72b746e431
commit d99082d1a1
4 changed files with 104 additions and 185 deletions

View file

@ -45,11 +45,10 @@ func init() {
flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url")
flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on")
flag.Parse()
}
func main() {
// Attempt to load leader from zookeeper
// Attempt to load leader from zookeeper using a
// cluster.json file used for the default aurora client if provided.
// This will override the provided url in the arguments
if clustersConfig != "" {
clusters, err := realis.LoadClusters(clustersConfig)
if err != nil {
@ -59,7 +58,7 @@ func main() {
cluster, ok := clusters[clusterName]
if !ok {
fmt.Printf("Cluster %s chosen doesn't exist\n", clusterName)
fmt.Printf("Cluster %s doesn't exist in the file provided\n", clusterName)
os.Exit(1)
}
@ -69,17 +68,25 @@ func main() {
os.Exit(1)
}
}
}
func main() {
var job realis.Job
var err error
var monitor *realis.Monitor
var r realis.Realis
var defaultBackoff = &realis.Backoff{
Steps: 2,
Duration: 10 * time.Second,
Factor: 2.0,
Jitter: 0.1,
clientOptions := []realis.ClientOption{
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(&realis.Backoff{
Steps: 2,
Duration: 10 * time.Second,
Factor: 2.0,
Jitter: 0.1,
}),
}
//check if zkUrl is available.
@ -93,32 +100,17 @@ func main() {
AgentRoot: "/var/lib/mesos",
}
fmt.Printf("cluster: %+v \n", cluster)
r, err = realis.NewRealisClient(realis.ZKUrl(zkUrl),
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
monitor = &realis.Monitor{r}
clientOptions = append(clientOptions, realis.ZKUrl(zkUrl))
} else {
r, err = realis.NewRealisClient(realis.SchedulerUrl(url),
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
monitor = &realis.Monitor{r}
clientOptions = append(clientOptions, realis.SchedulerUrl(url))
}
r, err = realis.NewRealisClient(clientOptions...)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
monitor = &realis.Monitor{r}
defer r.Close()
switch executor {

2
job.go
View file

@ -151,8 +151,6 @@ func (j *AuroraJob) RAM(ram int64) Job {
*j.resources["ram"].RamMb = ram
j.jobConfig.TaskConfig.RamMb = ram //Will be deprecated soon
return j
}

226
realis.go
View file

@ -25,10 +25,15 @@ import (
"math/rand"
"log"
"bytes"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
"github.com/pkg/errors"
"os"
)
const VERSION = "1.0.4"
@ -36,23 +41,23 @@ const VERSION = "1.0.4"
type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error)
KillJob(key *aurora.JobKey) (*aurora.Response, error)
KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
@ -68,69 +73,94 @@ type realisClient struct {
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
logger *log.Logger
}
type option func(*RealisConfig)
type RealisConfig struct {
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
logger *log.Logger
}
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multipled by factor each iteration
Jitter float64 // The amount of jitter applied each iteration
Steps int // Exit with error after this many steps
}
var defaultBackoff = Backoff{
Steps: 3,
Duration: 10 * time.Second,
Factor: 5.0,
Jitter: 0.1,
}
type ClientOption func(*RealisConfig)
//Config sets for options in RealisConfig.
func BasicAuth(username, password string) option {
func BasicAuth(username, password string) ClientOption {
return func(config *RealisConfig) {
config.username = username
config.password = password
}
}
func SchedulerUrl(url string) option {
func SchedulerUrl(url string) ClientOption {
return func(config *RealisConfig) {
config.url = url
}
}
func TimeoutMS(timeout int) option {
func TimeoutMS(timeout int) ClientOption {
return func(config *RealisConfig) {
config.timeoutms = timeout
}
}
func ZKCluster(cluster *Cluster) option {
func ZKCluster(cluster *Cluster) ClientOption {
return func(config *RealisConfig) {
config.cluster = cluster
}
}
func ZKUrl(url string) option {
func ZKUrl(url string) ClientOption {
return func(config *RealisConfig) {
config.cluster = GetDefaultClusterFromZKUrl(url)
}
}
func Retries(backoff *Backoff) option {
func Retries(backoff *Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = backoff
}
}
func ThriftJSON() option {
func ThriftJSON() ClientOption {
return func(config *RealisConfig) {
config.jsonTransport = true
}
}
func ThriftBinary() option {
func ThriftBinary() ClientOption {
return func(config *RealisConfig) {
config.binTransport = true
}
}
func BackOff(b *Backoff) option {
func BackOff(b *Backoff) ClientOption {
return func(config *RealisConfig) {
config.backoff = b
}
}
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
@ -155,12 +185,20 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
return trans, err
}
func NewRealisClient(options ...option) (Realis, error) {
func NewRealisClient(options ...ClientOption) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", len(options))
for _, opt := range options {
opt(config)
}
if config.logger == nil {
buf := new(bytes.Buffer)
config.logger = log.New(buf, "gorealis debug: ", 0)
config.logger.SetOutput(os.Stdout)
}
config.logger.Println("Number of options applied to config: ", len(options))
//Default timeout
if config.timeoutms == 0 {
config.timeoutms = 10000
@ -171,19 +209,19 @@ func NewRealisClient(options ...option) (Realis, error) {
}
var url string
var err error
//Cluster or URL?
// Determine how to get information to connect to the scheduler.
// Prioritize getting leader from ZK over using a direct URL.
if config.cluster != nil {
url, err = LeaderFromZK(*config.cluster)
// If ZK is configured, throw an error if the leader is unable to be determined
if err != nil {
return nil, errors.Wrap(err, "LeaderFromZK error")
}
fmt.Println("schedURLFromZK: ", url)
config.logger.Println("Scheduler URL from ZK: ", url)
} else if config.url != "" {
fmt.Println("Scheduler URL: ", config.url)
url = config.url
config.logger.Println("Scheduler URL: ", url)
} else {
return nil, errors.New("Incomplete Options -- url or cluster required")
}
@ -215,10 +253,10 @@ func NewRealisClient(options ...option) (Realis, error) {
config.backoff = &defaultBackoff
} else {
defaultBackoff = *config.backoff
fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
config.logger.Printf("Updating default backoff : %+v\n", *config.backoff)
}
fmt.Printf("gorealis config url: %+v\n", config.url)
config.logger.Printf("gorealis config url: %+v\n", config.url)
return &realisClient{
config: config,
@ -228,32 +266,6 @@ func NewRealisClient(options ...option) (Realis, error) {
}
// Wrapper object to provide future flexibility
type RealisConfig struct {
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
}
type Backoff struct {
Duration time.Duration // the base duration
Factor float64 // Duration is multipled by factor each iteration
Jitter float64 // The amount of jitter applied each iteration
Steps int // Exit with error after this many steps
}
var defaultBackoff = Backoff{
Steps: 3,
Duration: 10 * time.Second,
Factor: 5.0,
Jitter: 0.1,
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
@ -267,33 +279,6 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
return wait
}
// Create a new Client with Cluster information and other details.
func NewDefaultClientUsingCluster(cluster *Cluster, user, passwd string) (Realis, error) {
url, err := LeaderFromZK(*cluster)
if err != nil {
fmt.Println(err)
return nil, err
}
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.cluster = cluster
config.url = ""
// Configured for vagrant
AddBasicAuth(config, user, passwd)
r := newClient(config)
return r, nil
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
return &Cluster{Name: "defaultCluster",
AuthMechanism: "UNAUTHENTICATED",
@ -304,65 +289,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
}
}
//This api would create default cluster object..
func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) {
fmt.Printf(" zkUrl: %s\n", zkUrl)
cluster := GetDefaultClusterFromZKUrl(zkUrl)
url, err := LeaderFromZK(*cluster)
if err != nil {
fmt.Println(err)
return nil, err
}
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.cluster = cluster
config.url = ""
// Configured for vagrant
AddBasicAuth(config, user, passwd)
r := newClient(config)
return r, nil
}
func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) {
fmt.Printf(" url: %s\n", url)
//Create new configuration with default transport layer
config, err := newDefaultConfig(url, 10000)
if err != nil {
fmt.Println(err)
return nil, err
}
config.username = user
config.password = passwd
config.url = url
config.cluster = nil
// Configured for vagrant
AddBasicAuth(config, user, passwd)
config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}
r := newClient(config)
return r, nil
}
// Create a new Client with a default transport layer
func newClient(realisconfig *RealisConfig) Realis {
return &realisClient{
config: realisconfig,
client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)}
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) {
jar, err := cookiejar.New(nil)
@ -439,7 +365,7 @@ func basicAuth(username, password string) string {
func (r *realisClient) ReestablishConn() error {
//close existing connection..
fmt.Println("ReestablishConn begin ....")
r.config.logger.Println("ReestablishConn begin ....")
r.Close()
//First check cluster object for re-establish; if not available then try with scheduler url.
//var config *RealisConfig
@ -452,7 +378,7 @@ func (r *realisClient) ReestablishConn() error {
if err != nil {
fmt.Errorf("LeaderFromZK error: %+v\n ", err)
}
fmt.Println("ReestablishConn url: ", url)
r.config.logger.Println("ReestablishConn url: ", url)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
@ -469,7 +395,7 @@ func (r *realisClient) ReestablishConn() error {
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
if err != nil {
fmt.Println("error creating config: ", err)
r.config.logger.Println("error creating config: ", err)
}
// Configured for basic-auth
AddBasicAuth(r.config, r.config.username, r.config.password)
@ -478,7 +404,7 @@ func (r *realisClient) ReestablishConn() error {
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
r.config.logger.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
@ -499,14 +425,14 @@ func (r *realisClient) ReestablishConn() error {
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
fmt.Println(" r.config.username: ", r.config.username)
fmt.Println(" r.config.passwd: ", r.config.password)
fmt.Println(" r.config.url: ", r.config.url)
r.config.logger.Println(" Missing Data for ReestablishConn ")
r.config.logger.Println(" r.config.cluster: ", r.config.cluster)
r.config.logger.Println(" r.config.username: ", r.config.username)
r.config.logger.Println(" r.config.passwd: ", r.config.password)
r.config.logger.Println(" r.config.url: ", r.config.url)
return errors.New(" Missing Data for ReestablishConn ")
}
fmt.Printf(" config url before return: %+v\n", r.config.url)
r.config.logger.Printf(" config url before return: %+v\n", r.config.url)
return nil
}
@ -645,6 +571,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
}
// Sends a create job message to the scheduler with a specific job configuration.
// Although this API is able to create service jobs, it is better to use CreateService instead
// as that API uses the update thrift call which has a few extra features available.
// Use this API to create ad-hoc jobs.
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr error
@ -962,6 +891,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
}
// Get the task configuration from the aurora scheduler for a job
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
ids := make(map[int32]bool)
@ -1166,7 +1096,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// Experienced an connection error
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in re-establishing connection: ", err1)
r.config.logger.Println("error in re-establishing connection: ", err1)
}
return false, nil
}

View file

@ -60,7 +60,6 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
req.Settings.MaxPerInstanceFailures = 0
req.Settings.MaxFailedInstances = 0
req.Settings.RollbackOnFailure = true
req.Settings.WaitForBatchCompletion = false
//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior
return &UpdateJob{job, req}