Merge pull request #25 from kkrishna/master

realisconfig refactor
This commit is contained in:
Renan DelValle 2017-03-31 16:05:41 -04:00 committed by GitHub
commit ff8f10a004
3 changed files with 241 additions and 28 deletions

View file

@ -20,6 +20,8 @@ import (
"io/ioutil"
"os"
"time"
"github.com/rdelval/gorealis"
"github.com/rdelval/gorealis/gen-go/apache/aurora"
"github.com/rdelval/gorealis/response"
@ -63,6 +65,13 @@ func main() {
var monitor *realis.Monitor
var r realis.Realis
var defaultBackoff = &realis.Backoff{
Steps: 2,
Duration: 10 * time.Second,
Factor: 2.0,
Jitter: 0.1,
}
//check if zkUrl is available.
if *zkUrl != "" {
fmt.Println("zkUrl: ", *zkUrl)
@ -75,7 +84,8 @@ func main() {
}
fmt.Printf("cluster: %+v \n", cluster)
r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password)
//r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000))
r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -83,11 +93,12 @@ func main() {
monitor = &realis.Monitor{r}
} else {
r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password)
r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
monitor = &realis.Monitor{r}
}
defer r.Close()

View file

@ -36,12 +36,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
Limit: 1,
}
duration := defaultBackoff.Duration
defaultBackoff := m.Client.RealisConfig().backoff
duration := defaultBackoff.Duration //defaultBackoff.Duration
var err error
var respDetail *aurora.Response
for i := 0; i*interval <= timeout; i++ {
for i := 0; i < defaultBackoff.Steps; i++ {
for step := 0; step < defaultBackoff.Steps; step++ {
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
@ -94,12 +95,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
}
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) {
defaultBackoff := m.Client.RealisConfig().backoff
duration := defaultBackoff.Duration
var err error
var live map[int32]bool
for i := 0; i*interval < timeout; i++ {
for i := 0; i < defaultBackoff.Steps; i++ {
for step := 0; step < defaultBackoff.Steps; step++ {
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {

245
realis.go
View file

@ -53,6 +53,7 @@ type Realis interface {
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
}
@ -62,13 +63,146 @@ type realisClient struct {
readonlyClient *aurora.ReadOnlySchedulerClient
}
type option func(*RealisConfig)
//Config sets for options in RealisConfig.
func BasicAuth(username, password string) option {
return func(config *RealisConfig) {
config.username = username
config.password = password
}
}
func SchedulerUrl(url string) option {
return func(config *RealisConfig) {
config.url = url
}
}
func TimeoutMS(timeout int) option {
return func(config *RealisConfig) {
config.timeoutms = timeout
}
}
func ZKCluster(cluster *Cluster) option {
return func(config *RealisConfig) {
config.cluster = cluster
}
}
func ZKUrl(url string) option {
return func(config *RealisConfig) {
config.cluster = GetDefaultClusterFromZKUrl(url)
}
}
func Retries(backoff *Backoff) option {
return func(config *RealisConfig) {
config.backoff = backoff
}
}
func ThriftJSON() option {
return func(config *RealisConfig) {
config.jsonTransport = true
}
}
func ThriftBinary() option {
return func(config *RealisConfig) {
config.binTransport = true
}
}
func BackOff(b *Backoff) option {
return func(config *RealisConfig) {
config.backoff = b
}
}
func NewRealisClient(options ...option) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", options)
for _, opt := range options {
opt(config)
}
//Default timeout
if config.timeoutms == 0 {
config.timeoutms = 10000
}
//Set default Transport to JSON if needed.
if !config.jsonTransport && !config.binTransport {
config.jsonTransport = true
}
var url string
var err error
//Cluster or URL?
if config.cluster != nil {
url, err = LeaderFromZK(*config.cluster)
if err != nil {
fmt.Errorf("LeaderFromZK error: %+v\n ", err)
}
fmt.Println("schedURLFromZK: ", url)
} else if config.url != "" {
fmt.Println("Scheduler URL: ", config.url)
url = config.url
} else {
return nil, errors.New("Incomplete Options -- url or cluster required")
}
if config.jsonTransport {
trans, err := defaultTTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
config.transport = trans
config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if config.binTransport {
trans, err := defaultTTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
config.transport = trans
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
//Basic Authentication.
if config.username != "" && config.password != "" {
AddBasicAuth(config, config.username, config.password)
}
//Set defaultBackoff if required.
if config.backoff == nil {
config.backoff = &defaultBackoff
}
fmt.Printf("gorealis config: %+v\n", config)
return &realisClient{
config: config,
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil
}
// Wrapper object to provide future flexibility
type RealisConfig struct {
username, password string
url string
cluster *Cluster
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
}
type Backoff struct {
@ -268,34 +402,72 @@ func (r *realisClient) ReestablishConn() error {
fmt.Println("ReestablishConn begin ....")
r.Close()
//First check cluster object for re-establish; if not available then try with scheduler url.
//var config *RealisConfig
var err error
var url string
if r.config.cluster != nil && r.config.username != "" && r.config.password != "" {
//Re-establish using cluster object.
url, err := LeaderFromZK(*r.config.cluster)
url, err = LeaderFromZK(*r.config.cluster)
if err != nil {
fmt.Errorf("LeaderFromZK error: %+v\n ", err)
}
fmt.Println("ReestablishConn url: ", url)
config, err := newDefaultConfig(url, 10000)
if r.config.jsonTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
if err != nil {
fmt.Println(err)
fmt.Println("error creating config: ", err)
}
// Configured for basic-auth
AddBasicAuth(config, r.config.username, r.config.password)
config.cluster = r.config.cluster
r.config = config
r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
//Create new configuration with default transport layer
config, err := newDefaultConfig(r.config.url, 10000)
if err != nil {
fmt.Println(err)
fmt.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
AddBasicAuth(config, r.config.username, r.config.password)
r.config = config
r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
@ -304,6 +476,7 @@ func (r *realisClient) ReestablishConn() error {
fmt.Println(" r.config.url: ", r.config.url)
return errors.New(" Missing Data for ReestablishConn ")
}
fmt.Printf(" config before return: %+v\n", r.config)
return nil
}
@ -323,6 +496,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var err error
fmt.Printf(" config: %+v\n", r.config)
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -361,6 +536,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -390,6 +566,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
instanceIds := make(map[int32]bool)
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for _, instId := range instances {
@ -418,6 +596,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler")
}
func (r *realisClient) RealisConfig() *RealisConfig {
return r.config
}
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
@ -430,9 +612,10 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
}
if len(instanceIds) > 0 {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
fmt.Println(" STEPS: ", i)
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
@ -464,9 +647,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
fmt.Println(" STEPS: ", i)
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)
@ -475,10 +660,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * defaultBackoff.Factor)
}
fmt.Println(" calling CreateJob")
if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil {
return response.ResponseCodeCheck(resp)
}
fmt.Println("CreateJob err: %+v\n", err)
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in ReestablishConn: ", err1)
@ -491,6 +677,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -519,6 +706,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -547,6 +735,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -580,6 +769,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
}
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -615,6 +806,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
var resp *aurora.Response
var err error
if len(instanceIds) > 0 {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -649,6 +841,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -678,6 +871,7 @@ func (r *realisClient) AbortJobUpdate(
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -707,6 +901,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -755,6 +950,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -800,6 +996,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -849,6 +1046,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -875,6 +1073,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {