Updating zookeeper dependency since logging problem has been solved in main repository. Go fmt run on project to tidy it up.

This commit is contained in:
Renan DelValle 2016-11-02 20:41:43 -04:00
parent b3e55be98b
commit 3bf2e8a831
13 changed files with 296 additions and 152 deletions

View file

@ -15,9 +15,7 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
@ -46,9 +44,9 @@ const (
type watchType int
const (
watchTypeData = iota
watchTypeExist = iota
watchTypeChild = iota
watchTypeData = iota
watchTypeExist
watchTypeChild
)
type watchPathType struct {
@ -63,13 +61,9 @@ type Logger interface {
Printf(string, ...interface{})
}
// NoOp logger -- http://stackoverflow.com/questions/10571182/go-disable-a-log-logger
type NopLogger struct {
*log.Logger
}
func (l NopLogger) Printf(string, ...interface{}) {
// noop
type authCreds struct {
scheme string
auth []byte
}
type Conn struct {
@ -86,21 +80,28 @@ type Conn struct {
server string // remember the address/port of the current server
conn net.Conn
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
creds []authCreds
credsMu sync.Mutex // protects server
sendChan chan *request
requests map[int32]*request // Xid -> pending request
requestsLock sync.Mutex
watchers map[watchPathType][]chan Event
watchersLock sync.Mutex
closeChan chan struct{} // channel to tell send loop stop
// Debug (used by unit tests)
reconnectDelay time.Duration
logger Logger
buf []byte
}
// connOption represents a connection option.
@ -196,6 +197,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
buf: make([]byte, bufferSize),
// Debug
reconnectDelay: 0,
@ -235,17 +237,15 @@ func WithHostProvider(hostProvider HostProvider) connOption {
}
}
// WithLogger returns a connection option specifying a non-default logger <PR# #101>
func WithLogger(logger Logger) connOption {
return func(c *Conn) {
c.logger = logger
}
}
// EventCallback is a function that is called when an Event occurs.
type EventCallback func(Event)
// WithLogger returns a connection option specifying a non-default logger <PR# #101>
func WithoutLogger() connOption {
// WithEventCallback returns a connection option that specifies an event
// callback.
// The callback must not block - doing so would delay the ZK go routines.
func WithEventCallback(cb EventCallback) connOption {
return func(c *Conn) {
c.logger = NopLogger{log.New(os.Stderr, "", log.LstdFlags)}
c.eventCallback = cb
}
}
@ -263,7 +263,7 @@ func (c *Conn) State() State {
return State(atomic.LoadInt32((*int32)(&c.state)))
}
// SessionId returns the current session id of the connection.
// SessionID returns the current session id of the connection.
func (c *Conn) SessionID() int64 {
return atomic.LoadInt64(&c.sessionID)
}
@ -283,8 +283,16 @@ func (c *Conn) setTimeouts(sessionTimeoutMs int32) {
func (c *Conn) setState(state State) {
atomic.StoreInt32((*int32)(&c.state), int32(state))
c.sendEvent(Event{Type: EventSession, State: state, Server: c.Server()})
}
func (c *Conn) sendEvent(evt Event) {
if c.eventCallback != nil {
c.eventCallback(evt)
}
select {
case c.eventChan <- Event{Type: EventSession, State: state, Server: c.Server()}:
case c.eventChan <- evt:
default:
// panic("zk: event channel full - it must be monitored and never allowed to be full")
}
@ -321,6 +329,65 @@ func (c *Conn) connect() error {
}
}
func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
c.credsMu.Lock()
defer c.credsMu.Unlock()
defer close(reauthReadyChan)
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
for _, cred := range c.creds {
resChan, err := c.sendRequest(
opSetAuth,
&setAuthRequest{Type: 0,
Scheme: cred.scheme,
Auth: cred.auth,
},
&setAuthResponse{},
nil)
if err != nil {
c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}
res := <-resChan
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
}
}
func (c *Conn) sendRequest(
opcode int32,
req interface{},
res interface{},
recvFunc func(*request, *responseHeader, error),
) (
<-chan response,
error,
) {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
if err := c.sendData(rq); err != nil {
return nil, err
}
return rq.recvChan, nil
}
func (c *Conn) loop() {
for {
if err := c.connect(); err != nil {
@ -338,13 +405,15 @@ func (c *Conn) loop() {
c.conn.Close()
case err == nil:
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
c.hostProvider.Connected() // mark success
closeChan := make(chan struct{}) // channel to tell send loop stop
var wg sync.WaitGroup
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted
var wg sync.WaitGroup
wg.Add(1)
go func() {
err := c.sendLoop(c.conn, closeChan)
<-reauthChan
err := c.sendLoop()
c.logger.Printf("Send loop terminated: err=%v", err)
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
@ -357,10 +426,12 @@ func (c *Conn) loop() {
if err == nil {
panic("zk: recvLoop should never return nil error")
}
close(closeChan) // tell send loop to exit
close(c.closeChan) // tell send loop to exit
wg.Done()
}()
c.resendZkAuth(reauthChan)
c.sendSetWatches()
wg.Wait()
}
@ -532,66 +603,73 @@ func (c *Conn) authenticate() error {
return nil
}
func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
func (c *Conn) sendData(req *request) error {
header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(c.buf[4:], header)
if err != nil {
req.recvChan <- response{-1, err}
return nil
}
n2, err := encodePacket(c.buf[4+n:], req.pkt)
if err != nil {
req.recvChan <- response{-1, err}
return nil
}
n += n2
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
c.requestsLock.Lock()
select {
case <-c.closeChan:
req.recvChan <- response{-1, ErrConnectionClosed}
c.requestsLock.Unlock()
return ErrConnectionClosed
default:
}
c.requests[req.xid] = req
c.requestsLock.Unlock()
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
c.conn.Close()
return err
}
return nil
}
func (c *Conn) sendLoop() error {
pingTicker := time.NewTicker(c.pingInterval)
defer pingTicker.Stop()
buf := make([]byte, bufferSize)
for {
select {
case req := <-c.sendChan:
header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(buf[4:], header)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n2, err := encodePacket(buf[4+n:], req.pkt)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n += n2
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.requestsLock.Lock()
select {
case <-closeChan:
req.recvChan <- response{-1, ErrConnectionClosed}
c.requestsLock.Unlock()
return ErrConnectionClosed
default:
}
c.requests[req.xid] = req
c.requestsLock.Unlock()
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
conn.Close()
if err := c.sendData(req); err != nil {
return err
}
case <-pingTicker.C:
n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
n, err := encodePacket(c.buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
if err != nil {
panic("zk: opPing should never fail to serialize")
}
binary.BigEndian.PutUint32(buf[:4], uint32(n))
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
conn.Close()
c.conn.Close()
return err
}
case <-closeChan:
case <-c.closeChan:
return nil
}
}
@ -636,10 +714,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
Path: res.Path,
Err: nil,
}
select {
case c.eventChan <- ev:
default:
}
c.sendEvent(ev)
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
@ -731,7 +806,28 @@ func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc
func (c *Conn) AddAuth(scheme string, auth []byte) error {
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
return err
if err != nil {
return err
}
// Remember authdata so that it can be re-submitted on reconnect
//
// FIXME(prozlach): For now we treat "userfoo:passbar" and "userfoo:passbar2"
// as two different entries, which will be re-submitted on reconnet. Some
// research is needed on how ZK treats these cases and
// then maybe switch to something like "map[username] = password" to allow
// only single password for given user with users being unique.
obj := authCreds{
scheme: scheme,
auth: auth,
}
c.credsMu.Lock()
c.creds = append(c.creds, obj)
c.credsMu.Unlock()
return nil
}
func (c *Conn) Children(path string) ([]string, *Stat, error) {
@ -892,6 +988,7 @@ func (c *Conn) Sync(path string) (string, error) {
type MultiResponse struct {
Stat *Stat
String string
Error error
}
// Multi executes multiple ZooKeeper operations or none of them. The provided
@ -922,7 +1019,7 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
_, err := c.request(opMulti, req, res, nil)
mr := make([]MultiResponse, len(res.Ops))
for i, op := range res.Ops {
mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
mr[i] = MultiResponse{Stat: op.Stat, String: op.String, Error: op.Err.toError()}
}
return mr, err
}