From b10df0603e8e9c696d04de0e0554d8911c253f1b Mon Sep 17 00:00:00 2001 From: Kumar Krishna Date: Thu, 30 Mar 2017 18:17:21 -0700 Subject: [PATCH] gorealis config refactoring --- examples/client.go | 19 +++- monitors.go | 9 +- realis.go | 241 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 241 insertions(+), 28 deletions(-) diff --git a/examples/client.go b/examples/client.go index 00f4159..357886b 100644 --- a/examples/client.go +++ b/examples/client.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "os" + "time" + "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" @@ -63,6 +65,13 @@ func main() { var monitor *realis.Monitor var r realis.Realis + var defaultBackoff = &realis.Backoff{ + Steps: 5, + Duration: 5 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } + //check if zkUrl is available. if *zkUrl != "" { fmt.Println("zkUrl: ", *zkUrl) @@ -75,19 +84,25 @@ func main() { } fmt.Printf("cluster: %+v \n", cluster) - r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) if err != nil { fmt.Println(err) os.Exit(1) } + //r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + //if err != nil { + // fmt.Println(err) + // os.Exit(1) + //} monitor = &realis.Monitor{r} } else { - r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password) + r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) } + monitor = &realis.Monitor{r} } defer r.Close() diff --git a/monitors.go b/monitors.go index d7f863f..d6c1c7e 100644 --- a/monitors.go +++ b/monitors.go @@ -36,12 +36,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout Limit: 1, } - duration := defaultBackoff.Duration + defaultBackoff := m.Client.RealisConfig().backoff + duration := defaultBackoff.Duration //defaultBackoff.Duration var err error var respDetail *aurora.Response for i := 0; i*interval <= timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -94,12 +95,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) { + + defaultBackoff := m.Client.RealisConfig().backoff duration := defaultBackoff.Duration var err error var live map[int32]bool for i := 0; i*interval < timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { diff --git a/realis.go b/realis.go index 339f07e..120b026 100644 --- a/realis.go +++ b/realis.go @@ -51,6 +51,7 @@ type Realis interface { StartCronJob(key *aurora.JobKey) (*aurora.Response, error) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) ReestablishConn() error + RealisConfig() *RealisConfig Close() } @@ -60,13 +61,140 @@ type realisClient struct { readonlyClient *aurora.ReadOnlySchedulerClient } +type option func(*RealisConfig) + +//Config sets for options in RealisConfig. +func BasicAuth(username, password string) option { + + return func(config *RealisConfig) { + config.username = username + config.password = password + } +} + +func SchedulerUrl(url string) option { + return func(config *RealisConfig) { + config.url = url + } +} + +func TimeoutMS(timeout int) option { + return func(config *RealisConfig) { + config.timeoutms = timeout + } +} + +func ZKCluster(cluster *Cluster) option { + return func(config *RealisConfig) { + config.cluster = cluster + } +} + +func Retries(backoff *Backoff) option { + return func(config *RealisConfig) { + config.backoff = backoff + } +} + +func ThriftJSON() option { + return func(config *RealisConfig) { + config.jsonTransport = true + } +} + +func ThriftBinary() option { + return func(config *RealisConfig) { + config.binTransport = true + } +} + +func BackOff(b *Backoff) option { + return func(config *RealisConfig) { + config.backoff = b + } +} + +func NewRealisClient(options ...option) (Realis, error) { + config := &RealisConfig{} + fmt.Println(" options length: ", options) + for _, opt := range options { + opt(config) + } + //Default timeout + if config.timeoutms == 0 { + config.timeoutms = 10000 + } + //Set default Transport to JSON if needed. + if !config.jsonTransport && !config.binTransport { + config.jsonTransport = true + } + var url string + var err error + //Cluster or URL? + if config.cluster != nil { + url, err = LeaderFromZK(*config.cluster) + if err != nil { + fmt.Errorf("LeaderFromZK error: %+v\n ", err) + } + fmt.Println("schedURLFromZK: ", url) + } else if config.url != "" { + fmt.Println("Scheduler URL: ", config.url) + url = config.url + } else { + return nil, errors.New("Incomplete Options -- url or cluster required") + } + + if config.jsonTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + config.transport = trans + config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if config.binTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + config.transport = trans + config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } + + //Basic Authentication. + if config.username != "" && config.password != "" { + AddBasicAuth(config, config.username, config.password) + } + + //Set defaultBackoff if required. + if config.backoff == nil { + config.backoff = &defaultBackoff + } + + fmt.Printf("gorealis config: %+v\n", config) + + return &realisClient{ + config: config, + client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil + +} + // Wrapper object to provide future flexibility type RealisConfig struct { - username, password string - url string - cluster *Cluster - transport thrift.TTransport - protoFactory thrift.TProtocolFactory + username, password string + url string + timeoutms int + binTransport, jsonTransport bool + cluster *Cluster + backoff *Backoff + transport thrift.TTransport + protoFactory thrift.TProtocolFactory } type Backoff struct { @@ -263,34 +391,74 @@ func (r *realisClient) ReestablishConn() error { fmt.Println("ReestablishConn begin ....") r.Close() //First check cluster object for re-establish; if not available then try with scheduler url. + //var config *RealisConfig + var err error + var url string + if r.config.cluster != nil && r.config.username != "" && r.config.password != "" { //Re-establish using cluster object. - url, err := LeaderFromZK(*r.config.cluster) + url, err = LeaderFromZK(*r.config.cluster) if err != nil { fmt.Errorf("LeaderFromZK error: %+v\n ", err) } fmt.Println("ReestablishConn url: ", url) - config, err := newDefaultConfig(url, 10000) + if r.config.jsonTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } if err != nil { - fmt.Println(err) + fmt.Println("error creating config: ", err) } // Configured for basic-auth - AddBasicAuth(config, r.config.username, r.config.password) - config.cluster = r.config.cluster - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + //config.cluster = r.config.cluster + //r.config = config + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. - //Create new configuration with default transport layer - config, err := newDefaultConfig(r.config.url, 10000) - if err != nil { - fmt.Println(err) + fmt.Println("ReestablishConn url: ", r.config.url) + if r.config.jsonTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - AddBasicAuth(config, r.config.username, r.config.password) - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else { fmt.Println(" Missing Data for ReestablishConn ") fmt.Println(" r.config.cluster: ", r.config.cluster) @@ -299,6 +467,7 @@ func (r *realisClient) ReestablishConn() error { fmt.Println(" r.config.url: ", r.config.url) return errors.New(" Missing Data for ReestablishConn ") } + fmt.Printf(" config before return: %+v\n", r.config) return nil } @@ -318,6 +487,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche var resp *aurora.Response var err error + fmt.Printf(" config: %+v\n", r.config) + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -356,6 +527,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -385,6 +557,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a instanceIds := make(map[int32]bool) var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for _, instId := range instances { @@ -413,6 +587,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } +func (r *realisClient) RealisConfig() *RealisConfig { + return r.config +} + // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -425,9 +603,10 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { + fmt.Println(" STEPS: ", i) if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -459,9 +638,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { + fmt.Println(" STEPS: ", i) adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -470,10 +651,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { time.Sleep(adjusted) duration = time.Duration(float64(duration) * defaultBackoff.Factor) } - + fmt.Println(" calling CreateJob") if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil { return response.ResponseCodeCheck(resp) } + fmt.Println("CreateJob err: %+v\n", err) err1 := r.ReestablishConn() if err1 != nil { fmt.Println("error in ReestablishConn: ", err1) @@ -486,6 +668,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -514,6 +697,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -542,6 +726,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -575,6 +760,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) } var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -610,6 +797,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) var resp *aurora.Response var err error if len(instanceIds) > 0 { + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -644,6 +832,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -673,6 +862,7 @@ func (r *realisClient) AbortJobUpdate( var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -702,6 +892,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -750,6 +941,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -795,6 +987,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -844,6 +1037,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -870,6 +1064,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 {