diff --git a/examples/client.go b/examples/client.go index 00f4159..078ff8c 100644 --- a/examples/client.go +++ b/examples/client.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "os" + "time" + "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" @@ -63,6 +65,13 @@ func main() { var monitor *realis.Monitor var r realis.Realis + var defaultBackoff = &realis.Backoff{ + Steps: 2, + Duration: 10 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } + //check if zkUrl is available. if *zkUrl != "" { fmt.Println("zkUrl: ", *zkUrl) @@ -75,7 +84,8 @@ func main() { } fmt.Printf("cluster: %+v \n", cluster) - r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password) + //r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) + r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) @@ -83,11 +93,12 @@ func main() { monitor = &realis.Monitor{r} } else { - r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password) + r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) } + monitor = &realis.Monitor{r} } defer r.Close() diff --git a/monitors.go b/monitors.go index d7f863f..d6c1c7e 100644 --- a/monitors.go +++ b/monitors.go @@ -36,12 +36,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout Limit: 1, } - duration := defaultBackoff.Duration + defaultBackoff := m.Client.RealisConfig().backoff + duration := defaultBackoff.Duration //defaultBackoff.Duration var err error var respDetail *aurora.Response for i := 0; i*interval <= timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -94,12 +95,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout } func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) { + + defaultBackoff := m.Client.RealisConfig().backoff duration := defaultBackoff.Duration var err error var live map[int32]bool for i := 0; i*interval < timeout; i++ { - for i := 0; i < defaultBackoff.Steps; i++ { + for step := 0; step < defaultBackoff.Steps; step++ { if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { diff --git a/realis.go b/realis.go index 87b904c..ac78408 100644 --- a/realis.go +++ b/realis.go @@ -53,6 +53,7 @@ type Realis interface { StartCronJob(key *aurora.JobKey) (*aurora.Response, error) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) ReestablishConn() error + RealisConfig() *RealisConfig Close() } @@ -62,13 +63,146 @@ type realisClient struct { readonlyClient *aurora.ReadOnlySchedulerClient } +type option func(*RealisConfig) + +//Config sets for options in RealisConfig. +func BasicAuth(username, password string) option { + + return func(config *RealisConfig) { + config.username = username + config.password = password + } +} + +func SchedulerUrl(url string) option { + return func(config *RealisConfig) { + config.url = url + } +} + +func TimeoutMS(timeout int) option { + return func(config *RealisConfig) { + config.timeoutms = timeout + } +} + +func ZKCluster(cluster *Cluster) option { + return func(config *RealisConfig) { + config.cluster = cluster + } +} + +func ZKUrl(url string) option { + return func(config *RealisConfig) { + config.cluster = GetDefaultClusterFromZKUrl(url) + } +} + +func Retries(backoff *Backoff) option { + return func(config *RealisConfig) { + config.backoff = backoff + } +} + +func ThriftJSON() option { + return func(config *RealisConfig) { + config.jsonTransport = true + } +} + +func ThriftBinary() option { + return func(config *RealisConfig) { + config.binTransport = true + } +} + +func BackOff(b *Backoff) option { + return func(config *RealisConfig) { + config.backoff = b + } +} + +func NewRealisClient(options ...option) (Realis, error) { + config := &RealisConfig{} + fmt.Println(" options length: ", options) + for _, opt := range options { + opt(config) + } + //Default timeout + if config.timeoutms == 0 { + config.timeoutms = 10000 + } + //Set default Transport to JSON if needed. + if !config.jsonTransport && !config.binTransport { + config.jsonTransport = true + } + var url string + var err error + //Cluster or URL? + if config.cluster != nil { + url, err = LeaderFromZK(*config.cluster) + if err != nil { + fmt.Errorf("LeaderFromZK error: %+v\n ", err) + } + fmt.Println("schedURLFromZK: ", url) + } else if config.url != "" { + fmt.Println("Scheduler URL: ", config.url) + url = config.url + } else { + return nil, errors.New("Incomplete Options -- url or cluster required") + } + + if config.jsonTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + config.transport = trans + config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if config.binTransport { + trans, err := defaultTTransport(url, config.timeoutms) + if err != nil { + return nil, errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + config.transport = trans + config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } + + //Basic Authentication. + if config.username != "" && config.password != "" { + AddBasicAuth(config, config.username, config.password) + } + + //Set defaultBackoff if required. + if config.backoff == nil { + config.backoff = &defaultBackoff + } + + fmt.Printf("gorealis config: %+v\n", config) + + return &realisClient{ + config: config, + client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil + +} + // Wrapper object to provide future flexibility type RealisConfig struct { - username, password string - url string - cluster *Cluster - transport thrift.TTransport - protoFactory thrift.TProtocolFactory + username, password string + url string + timeoutms int + binTransport, jsonTransport bool + cluster *Cluster + backoff *Backoff + transport thrift.TTransport + protoFactory thrift.TProtocolFactory } type Backoff struct { @@ -268,34 +402,72 @@ func (r *realisClient) ReestablishConn() error { fmt.Println("ReestablishConn begin ....") r.Close() //First check cluster object for re-establish; if not available then try with scheduler url. + //var config *RealisConfig + var err error + var url string + if r.config.cluster != nil && r.config.username != "" && r.config.password != "" { //Re-establish using cluster object. - url, err := LeaderFromZK(*r.config.cluster) + url, err = LeaderFromZK(*r.config.cluster) if err != nil { fmt.Errorf("LeaderFromZK error: %+v\n ", err) } fmt.Println("ReestablishConn url: ", url) - config, err := newDefaultConfig(url, 10000) + if r.config.jsonTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() + } if err != nil { - fmt.Println(err) + fmt.Println("error creating config: ", err) } // Configured for basic-auth - AddBasicAuth(config, r.config.username, r.config.password) - config.cluster = r.config.cluster - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. - //Create new configuration with default transport layer - config, err := newDefaultConfig(r.config.url, 10000) - if err != nil { - fmt.Println(err) + fmt.Println("ReestablishConn url: ", r.config.url) + if r.config.jsonTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + r.config.transport = trans + r.config.protoFactory = thrift.NewTJSONProtocolFactory() + } else if r.config.binTransport { + trans, err := defaultTTransport(r.config.url, r.config.timeoutms) + if err != nil { + return errors.Wrap(err, "Error creating realis") + } + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") + httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4") + r.config.transport = trans + r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } - AddBasicAuth(config, r.config.username, r.config.password) - r.config = config - r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory) - r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory) + AddBasicAuth(r.config, r.config.username, r.config.password) + r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) + r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) } else { fmt.Println(" Missing Data for ReestablishConn ") fmt.Println(" r.config.cluster: ", r.config.cluster) @@ -304,6 +476,7 @@ func (r *realisClient) ReestablishConn() error { fmt.Println(" r.config.url: ", r.config.url) return errors.New(" Missing Data for ReestablishConn ") } + fmt.Printf(" config before return: %+v\n", r.config) return nil } @@ -323,6 +496,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche var resp *aurora.Response var err error + fmt.Printf(" config: %+v\n", r.config) + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -361,6 +536,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -390,6 +566,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a instanceIds := make(map[int32]bool) var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for _, instId := range instances { @@ -418,6 +596,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler") } +func (r *realisClient) RealisConfig() *RealisConfig { + return r.config +} + // Sends a kill message to the scheduler for all active tasks under a job. func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { @@ -430,9 +612,10 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } if len(instanceIds) > 0 { - + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { + fmt.Println(" STEPS: ", i) if i != 0 { adjusted := duration if defaultBackoff.Jitter > 0.0 { @@ -464,9 +647,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { + fmt.Println(" STEPS: ", i) adjusted := duration if defaultBackoff.Jitter > 0.0 { adjusted = Jitter(duration, defaultBackoff.Jitter) @@ -475,10 +660,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { time.Sleep(adjusted) duration = time.Duration(float64(duration) * defaultBackoff.Factor) } - + fmt.Println(" calling CreateJob") if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil { return response.ResponseCodeCheck(resp) } + fmt.Println("CreateJob err: %+v\n", err) err1 := r.ReestablishConn() if err1 != nil { fmt.Println("error in ReestablishConn: ", err1) @@ -491,6 +677,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -519,6 +706,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -547,6 +735,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -580,6 +769,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) } var resp *aurora.Response var err error + + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -615,6 +806,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error) var resp *aurora.Response var err error if len(instanceIds) > 0 { + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -649,6 +841,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -678,6 +871,7 @@ func (r *realisClient) AbortJobUpdate( var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -707,6 +901,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -755,6 +950,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -800,6 +996,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -849,6 +1046,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 { @@ -875,6 +1073,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string var resp *aurora.Response var err error + defaultBackoff := r.config.backoff duration := defaultBackoff.Duration for i := 0; i < defaultBackoff.Steps; i++ { if i != 0 {