Merge pull request #26 from kkrishna/master

Add resiliency for LeaderFromZK and minor changes..
This commit is contained in:
kkrishna 2017-04-12 22:29:18 -07:00 committed by GitHub
commit d08c0c637e
3 changed files with 69 additions and 30 deletions

View file

@ -43,7 +43,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
for i := 0; i*interval <= timeout; i++ {
for step := 0; step < defaultBackoff.Steps; step++ {
if i != 0 {
if step != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)
@ -103,7 +103,7 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
for i := 0; i*interval < timeout; i++ {
for step := 0; step < defaultBackoff.Steps; step++ {
if i != 0 {
if step != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)

View file

@ -122,6 +122,32 @@ func BackOff(b *Backoff) option {
}
}
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return trans, err
}
func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
trans, err := defaultTTransport(url, timeout)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return trans, err
}
func NewRealisClient(options ...option) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", options)
@ -153,23 +179,18 @@ func NewRealisClient(options ...option) (Realis, error) {
}
if config.jsonTransport {
trans, err := defaultTTransport(url, config.timeoutms)
trans, err := newTJSONTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
config.transport = trans
config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if config.binTransport {
trans, err := defaultTTransport(url, config.timeoutms)
trans, err := newTBinTransport(url, config.timeoutms)
if err != nil {
return nil, errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
config.transport = trans
config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -182,6 +203,9 @@ func NewRealisClient(options ...option) (Realis, error) {
//Set defaultBackoff if required.
if config.backoff == nil {
config.backoff = &defaultBackoff
} else {
defaultBackoff = *config.backoff
fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
}
fmt.Printf("gorealis config: %+v\n", config)
@ -414,23 +438,17 @@ func (r *realisClient) ReestablishConn() error {
}
fmt.Println("ReestablishConn url: ", url)
if r.config.jsonTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(url, r.config.timeoutms)
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -445,23 +463,17 @@ func (r *realisClient) ReestablishConn() error {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := defaultTTransport(r.config.url, r.config.timeoutms)
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
@ -496,7 +508,6 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var err error
fmt.Printf(" config: %+v\n", r.config)
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
@ -615,7 +626,6 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
fmt.Println(" STEPS: ", i)
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {

33
zk.go
View file

@ -17,11 +17,12 @@ package realis
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
type Endpoint struct {
@ -43,6 +44,33 @@ func (NoopLogger) Printf(format string, a ...interface{}) {
// Loads leader from ZK endpoint.
func LeaderFromZK(cluster Cluster) (string, error) {
var err error
var zkurl string
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)
}
fmt.Println(" sleeping for: ", adjusted)
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * defaultBackoff.Factor)
}
if zkurl, err = leaderFromZK(cluster); err == nil {
return zkurl, err
}
if err != nil {
fmt.Println("error in LeaderFromZK: ", err)
}
}
return "", err
}
func leaderFromZK(cluster Cluster) (string, error) {
endpoints := strings.Split(cluster.ZK, ",")
//TODO (rdelvalle): When enabling debugging, change logger here
@ -92,4 +120,5 @@ func LeaderFromZK(cluster Cluster) (string, error) {
}
return "", errors.New("No leader found")
}