add resiliency for LeaderFromZK and other fixes .

This commit is contained in:
Kumar Krishna 2017-04-06 23:15:44 -07:00
parent 48ca520eaa
commit e57dc98d65
3 changed files with 69 additions and 30 deletions

View file

@ -122,6 +122,32 @@ func BackOff(b *Backoff) option {
}
}
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return trans, err
}
func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return trans, err
}
func NewRealisClient(options ...option) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", options)
@ -153,23 +179,18 @@ func NewRealisClient(options ...option) (Realis, error) {
}
if config.jsonTransport {
trans, err := defaultTTransport(url, config.timeoutms)
trans, err := newTJSONTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
config.transport = trans
config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if config.binTransport {
trans, err := defaultTTransport(url, config.timeoutms)
trans, err := newTBinTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
config.transport = trans
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -182,6 +203,9 @@ func NewRealisClient(options ...option) (Realis, error) {
//Set defaultBackoff if required.
if config.backoff == nil {
config.backoff = &defaultBackoff
} else {
defaultBackoff = *config.backoff
fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
}
fmt.Printf("gorealis config: %+v\n", config)
@ -414,23 +438,17 @@ func (r *realisClient) ReestablishConn() error {
}
fmt.Println("ReestablishConn url: ", url)
if r.config.jsonTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -445,23 +463,17 @@ func (r *realisClient) ReestablishConn() error {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -496,7 +508,6 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var err error
fmt.Printf(" config: %+v\n", r.config)
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
@ -615,7 +626,6 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
fmt.Println(" STEPS: ", i)
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {