Moving from govendor to dep, updated dependencies (#48)

* Moving from govendor to dep.

* Making the pull request template more friendly.

* Fixing awkward space in PR template.

* goimports run on the whole project using `goimports -w $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./gen-go/*")`

source of command: https://gist.github.com/bgentry/fd1ffef7dbde01857f66
Renan DelValle 2018-01-07 13:13:47 -08:00 committed by GitHub
parent 9631aa3aab
commit 8d445c1c77
GPG key ID: 4AEE18F83AFDEB23
2186 changed files with 400410 additions and 352 deletions

1 vendor/github.com/samuel/go-zookeeper/.gitignore generated vendored Normal file

@@ -0,0 +1 @@
.DS_Store

31 vendor/github.com/samuel/go-zookeeper/.travis.yml generated vendored Normal file

@@ -0,0 +1,31 @@
language: go
go:
- 1.9
jdk:
- oraclejdk9
sudo: false
branches:
only:
- master
before_install:
- wget http://apache.cs.utah.edu/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
- tar -zxvf zookeeper*tar.gz
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- jdk_switcher use oraclejdk9
- go build ./...
- go fmt ./...
- go vet ./...
- go test -i -race ./...
- go test -race -covermode atomic -coverprofile=profile.cov ./zk
- goveralls -coverprofile=profile.cov -service=travis-ci
env:
global:
secure: Coha3DDcXmsekrHCZlKvRAc+pMBaQU1QS/3++3YCCUXVDBWgVsC1ZIc9df4RLdZ/ncGd86eoRq/S+zyn1XbnqK5+ePqwJoUnJ59BE8ZyHLWI9ajVn3fND1MTduu/ksGsS79+IYbdVI5wgjSgjD3Ktp6Y5uPl+BPosjYBGdNcHS4=

11 vendor/github.com/samuel/go-zookeeper/README.md generated vendored Normal file

@@ -0,0 +1,11 @@
Native Go ZooKeeper Client Library
===================================
[![GoDoc](https://godoc.org/github.com/samuel/go-zookeeper?status.svg)](https://godoc.org/github.com/samuel/go-zookeeper)
[![Build Status](https://travis-ci.org/samuel/go-zookeeper.png)](https://travis-ci.org/samuel/go-zookeeper)
[![Coverage Status](https://coveralls.io/repos/github/samuel/go-zookeeper/badge.svg?branch=master)](https://coveralls.io/github/samuel/go-zookeeper?branch=master)
License
-------
3-clause BSD. See LICENSE file.

@@ -0,0 +1,22 @@
package main
import (
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
)
func main() {
c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second*10)
if err != nil {
panic(err)
}
children, stat, ch, err := c.ChildrenW("/")
if err != nil {
panic(err)
}
fmt.Printf("%+v %+v\n", children, stat)
e := <-ch
fmt.Printf("%+v\n", e)
}

@@ -0,0 +1,314 @@
package zk
import (
"sync"
"testing"
"time"
)
type logWriter struct {
t *testing.T
p string
}
func (lw logWriter) Write(b []byte) (int, error) {
lw.t.Logf("%s%s", lw.p, string(b))
return len(b), nil
}
func TestBasicCluster(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk1, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk1.Close()
zk2, err := ts.Connect(1)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk2.Close()
time.Sleep(time.Second * 5)
if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
if by, _, err := zk2.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}
// If the current leader dies, then the session is reestablished with the new one.
func TestClientClusterFailover(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer tc.Stop()
zk, evCh, err := tc.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
sl := NewStateLogger(evCh)
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}
if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
// Kill the current leader
tc.StopServer(hasSessionEvent1.Server)
// Wait for the session to be reconnected with the new leader.
if hasSessionWatcher2.Wait(8*time.Second) == nil {
t.Fatalf("Failover failed")
}
if by, _, err := zk.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}
// If a ZooKeeper cluster loses quorum then a session is reconnected as soon
// as the quorum is restored.
func TestNoQuorum(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer tc.Stop()
zk, evCh, err := tc.ConnectAllTimeout(4 * time.Second)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
sl := NewStateLogger(evCh)
// Wait for initial session to be established
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}
initialSessionID := zk.sessionID
DefaultLogger.Printf(" Session established: id=%d, timeout=%d", zk.sessionID, zk.sessionTimeoutMs)
// Kill the ZooKeeper leader and wait for the session to reconnect.
DefaultLogger.Printf(" Kill the leader")
disconnectWatcher1 := sl.NewWatcher(sessionStateMatcher(StateDisconnected))
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StopServer(hasSessionEvent1.Server)
disconnectedEvent1 := disconnectWatcher1.Wait(8 * time.Second)
if disconnectedEvent1 == nil {
t.Fatalf("Failover failed, missed StateDisconnected event")
}
if disconnectedEvent1.Server != hasSessionEvent1.Server {
t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s",
hasSessionEvent1.Server, disconnectedEvent1.Server)
}
hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionEvent2 == nil {
t.Fatalf("Failover failed, missed StateHasSession event")
}
// Kill the ZooKeeper leader leaving the cluster without quorum.
DefaultLogger.Printf(" Kill the leader")
disconnectWatcher2 := sl.NewWatcher(sessionStateMatcher(StateDisconnected))
tc.StopServer(hasSessionEvent2.Server)
disconnectedEvent2 := disconnectWatcher2.Wait(8 * time.Second)
if disconnectedEvent2 == nil {
t.Fatalf("Failover failed, missed StateDisconnected event")
}
if disconnectedEvent2.Server != hasSessionEvent2.Server {
t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s",
hasSessionEvent2.Server, disconnectedEvent2.Server)
}
// Make sure that we keep retrying connecting to the only remaining
// ZooKeeper server, but the attempts are being dropped because there is
// no quorum.
DefaultLogger.Printf(" Retrying no luck...")
var firstDisconnect *Event
begin := time.Now()
for time.Since(begin) < 6*time.Second {
disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second)
if disconnectedEvent == nil {
t.Fatalf("Disconnected event expected")
}
if firstDisconnect == nil {
firstDisconnect = disconnectedEvent
continue
}
if disconnectedEvent.Server != firstDisconnect.Server {
t.Fatalf("Disconnect from wrong server: expected=%s, actual=%s",
firstDisconnect.Server, disconnectedEvent.Server)
}
}
// Start a ZooKeeper node to restore quorum.
hasSessionWatcher3 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StartServer(hasSessionEvent1.Server)
// Make sure that session is reconnected with the same ID.
hasSessionEvent3 := hasSessionWatcher3.Wait(8 * time.Second)
if hasSessionEvent3 == nil {
t.Fatalf("Session has not been reconnected")
}
if zk.sessionID != initialSessionID {
t.Fatalf("Wrong session ID: expected=%d, actual=%d", initialSessionID, zk.sessionID)
}
// Make sure that the session is not dropped soon after reconnect
e := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(6 * time.Second)
if e != nil {
t.Fatalf("Unexpected disconnect")
}
}
func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
timeout := time.After(30 * time.Second)
CONNECTED:
for {
select {
case ev := <-zk.eventChan:
if ev.State == StateConnected {
break CONNECTED
}
case <-timeout:
zk.Close()
t.Fatal("Timeout")
}
}
zk.Close()
for {
select {
case _, ok := <-zk.eventChan:
if !ok {
return
}
case <-timeout:
t.Fatal("Timeout")
}
}
}
func TestBadSession(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
zk.conn.Close()
time.Sleep(time.Millisecond * 100)
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
}
type EventLogger struct {
events []Event
watchers []*EventWatcher
lock sync.Mutex
wg sync.WaitGroup
}
func NewStateLogger(eventCh <-chan Event) *EventLogger {
el := &EventLogger{}
el.wg.Add(1)
go func() {
defer el.wg.Done()
for event := range eventCh {
el.lock.Lock()
for _, sw := range el.watchers {
if !sw.triggered && sw.matcher(event) {
sw.triggered = true
sw.matchCh <- event
}
}
DefaultLogger.Printf(" event received: %v\n", event)
el.events = append(el.events, event)
el.lock.Unlock()
}
}()
return el
}
func (el *EventLogger) NewWatcher(matcher func(Event) bool) *EventWatcher {
ew := &EventWatcher{matcher: matcher, matchCh: make(chan Event, 1)}
el.lock.Lock()
el.watchers = append(el.watchers, ew)
el.lock.Unlock()
return ew
}
func (el *EventLogger) Events() []Event {
el.lock.Lock()
transitions := make([]Event, len(el.events))
copy(transitions, el.events)
el.lock.Unlock()
return transitions
}
func (el *EventLogger) Wait4Stop() {
el.wg.Wait()
}
type EventWatcher struct {
matcher func(Event) bool
matchCh chan Event
triggered bool
}
func (ew *EventWatcher) Wait(timeout time.Duration) *Event {
select {
case event := <-ew.matchCh:
return &event
case <-time.After(timeout):
return nil
}
}
func sessionStateMatcher(s State) func(Event) bool {
return func(e Event) bool {
return e.Type == EventSession && e.State == s
}
}

@@ -85,6 +85,7 @@ type Conn struct {
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
maxBufferSize int
creds []authCreds
credsMu sync.Mutex // protects creds
@@ -97,9 +98,12 @@ type Conn struct {
closeChan chan struct{} // channel to tell send loop stop
// Debug (used by unit tests)
reconnectDelay time.Duration
reconnectLatch chan struct{}
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)
logger Logger
logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged
buf []byte
}
@@ -197,10 +201,8 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatibility
buf: make([]byte, bufferSize),
// Debug
reconnectDelay: 0,
}
// Set provided options.
@@ -237,6 +239,21 @@ func WithHostProvider(hostProvider HostProvider) connOption {
}
}
// WithLogger returns a connection option specifying a non-default Logger
func WithLogger(logger Logger) connOption {
return func(c *Conn) {
c.logger = logger
}
}
// WithLogInfo returns a connection option specifying whether or not information messages
// should be logged.
func WithLogInfo(logInfo bool) connOption {
return func(c *Conn) {
c.logInfo = logInfo
}
}
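
A minimal consumer-side sketch of wiring these two new options into Connect (the address, timeout, and logger choice here are illustrative, not part of this diff). Any type with a Printf method satisfies the Logger interface, so a standard *log.Logger drops in directly:

package main

import (
	"log"
	"os"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	// *log.Logger satisfies zk.Logger, which only requires Printf.
	logger := log.New(os.Stderr, "zk: ", log.LstdFlags)
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second,
		zk.WithLogger(logger),
		zk.WithLogInfo(false), // log errors only; info messages are suppressed
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}
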
// EventCallback is a function that is called when an Event occurs.
type EventCallback func(Event)
@@ -249,6 +266,46 @@ func WithEventCallback(cb EventCallback) connOption {
}
}
// WithMaxBufferSize sets the maximum buffer size used to read and decode
// packets received from the Zookeeper server. The standard Zookeeper client for
// Java defaults to a limit of 1mb. For backwards compatibility, this Go client
// defaults to unbounded unless overridden via this option. A value that is zero
// or negative indicates that no limit is enforced.
//
// This is meant to prevent resource exhaustion in the face of potentially
// malicious data in ZK. It should generally match the server setting (which
// also defaults to 1mb) so that clients and servers agree on the limits for
// things like the size of data in an individual znode and the total size of a
// transaction.
//
// For production systems, this should be set to a reasonable value (ideally
// that matches the server configuration). For ops tooling, it is handy to use a
// much larger limit, in order to do things like clean-up problematic state in
// the ZK tree. For example, if a single znode has a huge number of children, it
// is possible for the response to a "list children" operation to exceed this
// buffer size and cause errors in clients. The only way to subsequently clean
// up the tree (by removing superfluous children) is to use a client configured
// with a larger buffer size that can successfully query for all of the child
// names and then remove them. (Note there are other tools that can list all of
// the child names without an increased buffer size in the client, but they work
// by inspecting the servers' transaction logs to enumerate children instead of
// sending an online request to a server.)
func WithMaxBufferSize(maxBufferSize int) connOption {
return func(c *Conn) {
c.maxBufferSize = maxBufferSize
}
}
// WithMaxConnBufferSize sets the maximum buffer size used to send and encode
// packets to the ZooKeeper server. The standard ZooKeeper client for Java defaults
// to a limit of 1mb. This option should be used for a non-standard server setup
// where znodes are bigger than the default 1mb.
func WithMaxConnBufferSize(maxBufferSize int) connOption {
return func(c *Conn) {
c.buf = make([]byte, maxBufferSize)
}
}
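
A hedged sketch of combining the two buffer options; the 1mb figure mirrors the server-side default discussed in the doc comment above, and the address and timeout are illustrative:

package main

import (
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	const oneMB = 1024 * 1024
	// Cap decoded responses at 1mb to agree with a typical server-side limit,
	// and size the send buffer the same way for large znodes.
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second,
		zk.WithMaxBufferSize(oneMB),
		zk.WithMaxConnBufferSize(oneMB),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}
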
func (c *Conn) Close() {
close(c.shouldQuit)
@@ -321,7 +378,9 @@ func (c *Conn) connect() error {
if err == nil {
c.conn = zkConn
c.setState(StateConnected)
c.logger.Printf("Connected to %s", c.Server())
if c.logInfo {
c.logger.Printf("Connected to %s", c.Server())
}
return nil
}
@@ -335,8 +394,10 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
defer close(reauthReadyChan)
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
if c.logInfo {
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
}
for _, cred := range c.creds {
resChan, err := c.sendRequest(
@@ -404,7 +465,9 @@ func (c *Conn) loop() {
c.logger.Printf("Authentication failed: %s", err)
c.conn.Close()
case err == nil:
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
if c.logInfo {
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
}
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted
@@ -414,7 +477,9 @@ func (c *Conn) loop() {
go func() {
<-reauthChan
err := c.sendLoop()
c.logger.Printf("Send loop terminated: err=%v", err)
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
}
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
}()
@@ -422,7 +487,9 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
err := c.recvLoop(c.conn)
c.logger.Printf("Recv loop terminated: err=%v", err)
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
}
if err == nil {
panic("zk: recvLoop should never return nil error")
}
@@ -450,11 +517,11 @@ func (c *Conn) loop() {
}
c.flushRequests(err)
if c.reconnectDelay > 0 {
if c.reconnectLatch != nil {
select {
case <-c.shouldQuit:
return
case <-time.After(c.reconnectDelay):
case <-c.reconnectLatch:
}
}
}
@@ -506,17 +573,41 @@ func (c *Conn) sendSetWatches() {
return
}
req := &setWatchesRequest{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
// NB: A ZK server, by default, rejects packets >1mb. So, if we have too
// many watches to reset, we need to break this up into multiple packets
// to avoid hitting that limit. Mirroring the Java client behavior: we are
// conservative in that we limit requests to 128kb (since the server limit
// is actually configurable and could conceivably be configured smaller
// than the default of 1mb).
limit := 128 * 1024
if c.setWatchLimit > 0 {
limit = c.setWatchLimit
}
var reqs []*setWatchesRequest
var req *setWatchesRequest
var sizeSoFar int
n := 0
for pathType, watchers := range c.watchers {
if len(watchers) == 0 {
continue
}
addlLen := 4 + len(pathType.path)
if req == nil || sizeSoFar+addlLen > limit {
if req != nil {
// add to set of requests that we'll send
reqs = append(reqs, req)
}
sizeSoFar = 28 // fixed overhead of a set-watches packet
req = &setWatchesRequest{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
}
}
sizeSoFar += addlLen
switch pathType.wType {
case watchTypeData:
req.DataWatches = append(req.DataWatches, pathType.path)
@@ -530,12 +621,26 @@ func (c *Conn) sendSetWatches() {
if n == 0 {
return
}
if req != nil { // don't forget any trailing packet we were building
reqs = append(reqs, req)
}
if c.setWatchCallback != nil {
c.setWatchCallback(reqs)
}
go func() {
res := &setWatchesResponse{}
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
c.logger.Printf("Failed to set previous watches: %s", err.Error())
// TODO: Pipeline these so we queue all of them up before waiting on any
// response. That will require some investigation to make sure there
// aren't failure modes where a blocking write to the channel of requests
// could hang indefinitely and cause this goroutine to leak...
for _, req := range reqs {
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
c.logger.Printf("Failed to set previous watches: %s", err.Error())
break
}
}
}()
}
@@ -676,7 +781,11 @@ func (c *Conn) sendLoop() error {
}
func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, bufferSize)
sz := bufferSize
if c.maxBufferSize > 0 && sz > c.maxBufferSize {
sz = c.maxBufferSize
}
buf := make([]byte, sz)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
@@ -687,6 +796,9 @@ func (c *Conn) recvLoop(conn net.Conn) error {
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
if c.maxBufferSize > 0 && blen > c.maxBufferSize {
return fmt.Errorf("received packet from server with length %d, which exceeds max buffer size %d", blen, c.maxBufferSize)
}
buf = make([]byte, blen)
}
@@ -831,12 +943,20 @@ func (c *Conn) AddAuth(scheme string, auth []byte) error {
}
func (c *Conn) Children(path string) ([]string, *Stat, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, err
}
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
return res.Children, &res.Stat, err
}
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, nil, err
}
var ech <-chan Event
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
@@ -851,6 +971,10 @@ func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
}
func (c *Conn) Get(path string) ([]byte, *Stat, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, err
}
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
return res.Data, &res.Stat, err
@@ -858,6 +982,10 @@ func (c *Conn) Get(path string) ([]byte, *Stat, error) {
// GetW returns the contents of a znode and sets a watch
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, nil, err
}
var ech <-chan Event
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
@@ -872,15 +1000,20 @@ func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
}
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
if path == "" {
return nil, ErrInvalidPath
if err := validatePath(path, false); err != nil {
return nil, err
}
res := &setDataResponse{}
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
return "", err
}
res := &createResponse{}
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
return res.Path, err
@@ -891,6 +1024,10 @@ func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string,
// ephemeral node still exists. Therefore, on reconnect we need to check if a node
// with a GUID generated on create exists.
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
if err := validatePath(path, true); err != nil {
return "", err
}
var guid [16]byte
_, err := io.ReadFull(rand.Reader, guid[:16])
if err != nil {
@@ -932,11 +1069,19 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
}
func (c *Conn) Delete(path string, version int32) error {
if err := validatePath(path, false); err != nil {
return err
}
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
return err
}
func (c *Conn) Exists(path string) (bool, *Stat, error) {
if err := validatePath(path, false); err != nil {
return false, nil, err
}
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
exists := true
@@ -948,6 +1093,10 @@ func (c *Conn) Exists(path string) (bool, *Stat, error) {
}
func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
if err := validatePath(path, false); err != nil {
return false, nil, nil, err
}
var ech <-chan Event
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
@@ -969,17 +1118,29 @@ func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
}
func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, err
}
res := &getAclResponse{}
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
return res.Acl, &res.Stat, err
}
func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
if err := validatePath(path, false); err != nil {
return nil, err
}
res := &setAclResponse{}
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Sync(path string) (string, error) {
if err := validatePath(path, false); err != nil {
return "", err
}
res := &syncResponse{}
_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
return res.Path, err

@@ -0,0 +1,24 @@
package zk
import (
"fmt"
"testing"
)
func TestModeString(t *testing.T) {
if fmt.Sprintf("%v", ModeUnknown) != "unknown" {
t.Errorf("unknown value should be 'unknown'")
}
if fmt.Sprintf("%v", ModeLeader) != "leader" {
t.Errorf("leader value should be 'leader'")
}
if fmt.Sprintf("%v", ModeFollower) != "follower" {
t.Errorf("follower value should be 'follower'")
}
if fmt.Sprintf("%v", ModeStandalone) != "standalone" {
t.Errorf("standlone value should be 'standalone'")
}
}

@@ -0,0 +1,224 @@
package zk
import (
"fmt"
"log"
"testing"
"time"
)
// localhostLookupHost is a test replacement for net.LookupHost that
// always returns 127.0.0.1
func localhostLookupHost(host string) ([]string, error) {
return []string{"127.0.0.1"}, nil
}
// TestDNSHostProviderCreate is just like TestCreate, but with an
// overridden HostProvider that ignores the provided hostname.
func TestDNSHostProviderCreate(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
port := ts.Servers[0].Port
server := fmt.Sprintf("foo.example.com:%d", port)
hostProvider := &DNSHostProvider{lookupHost: localhostLookupHost}
zk, _, err := Connect([]string{server}, time.Second*15, WithHostProvider(hostProvider))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}
// localHostPortsFacade wraps a HostProvider, remapping the
// address/port combinations it returns to "localhost:$PORT" where
// $PORT is chosen from the provided ports.
type localHostPortsFacade struct {
inner HostProvider // The wrapped HostProvider
ports []int // The provided list of ports
nextPort int // The next port to use
mapped map[string]string // Already-mapped address/port combinations
}
func newLocalHostPortsFacade(inner HostProvider, ports []int) *localHostPortsFacade {
return &localHostPortsFacade{
inner: inner,
ports: ports,
mapped: make(map[string]string),
}
}
func (lhpf *localHostPortsFacade) Len() int { return lhpf.inner.Len() }
func (lhpf *localHostPortsFacade) Connected() { lhpf.inner.Connected() }
func (lhpf *localHostPortsFacade) Init(servers []string) error { return lhpf.inner.Init(servers) }
func (lhpf *localHostPortsFacade) Next() (string, bool) {
server, retryStart := lhpf.inner.Next()
// If we've already set up a mapping for that server, just return it.
if localMapping := lhpf.mapped[server]; localMapping != "" {
return localMapping, retryStart
}
if lhpf.nextPort == len(lhpf.ports) {
log.Fatalf("localHostPortsFacade out of ports to assign to %q; current config: %q", server, lhpf.mapped)
}
localMapping := fmt.Sprintf("localhost:%d", lhpf.ports[lhpf.nextPort])
lhpf.mapped[server] = localMapping
lhpf.nextPort++
return localMapping, retryStart
}
var _ HostProvider = &localHostPortsFacade{}
// TestDNSHostProviderReconnect tests that the zk.Conn correctly
// reconnects when the Zookeeper instance it's connected to
// restarts. It wraps the DNSHostProvider in a lightweight facade that
// remaps addresses to localhost:$PORT combinations corresponding to
// the test ZooKeeper instances.
func TestDNSHostProviderReconnect(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
innerHp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) {
return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil
}}
ports := make([]int, 0, len(ts.Servers))
for _, server := range ts.Servers {
ports = append(ports, server.Port)
}
hp := newLocalHostPortsFacade(innerHp, ports)
zk, _, err := Connect([]string{"foo.example.com:12345"}, time.Second, WithHostProvider(hp))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
// Initial operation to force connection.
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
// Figure out which server we're connected to.
currentServer := zk.Server()
t.Logf("Connected to %q. Finding test server index…", currentServer)
serverIndex := -1
for i, server := range ts.Servers {
server := fmt.Sprintf("localhost:%d", server.Port)
t.Logf("…trying %q", server)
if currentServer == server {
serverIndex = i
t.Logf("…found at index %d", i)
break
}
}
if serverIndex == -1 {
t.Fatalf("Cannot determine test server index.")
}
// Restart the connected server.
ts.Servers[serverIndex].Srv.Stop()
ts.Servers[serverIndex].Srv.Start()
// Continue with the basic TestCreate tests.
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
if zk.Server() == currentServer {
t.Errorf("Still connected to %q after restart.", currentServer)
}
}
// TestDNSHostProviderRetryStart tests the `retryStart` functionality
// of DNSHostProvider.
// It's also probably the clearest visual explanation of exactly how
// it works.
func TestDNSHostProviderRetryStart(t *testing.T) {
t.Parallel()
hp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) {
return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil
}}
if err := hp.Init([]string{"foo.example.com:12345"}); err != nil {
t.Fatal(err)
}
testdata := []struct {
retryStartWant bool
callConnected bool
}{
// Repeated failures.
{false, false},
{false, false},
{false, false},
{true, false},
{false, false},
{false, false},
{true, true},
// One success offsets things.
{false, false},
{false, true},
{false, true},
// Repeated successes.
{false, true},
{false, true},
{false, true},
{false, true},
{false, true},
// And some more failures.
{false, false},
{false, false},
{true, false}, // Looped back to last known good server: all alternates failed.
{false, false},
}
for i, td := range testdata {
_, retryStartGot := hp.Next()
if retryStartGot != td.retryStartWant {
t.Errorf("%d: retryStart=%v; want %v", i, retryStartGot, td.retryStartWant)
}
if td.callConnected {
hp.Connected()
}
}
}

330 vendor/github.com/samuel/go-zookeeper/zk/flw_test.go generated vendored Normal file

@@ -0,0 +1,330 @@
package zk
import (
"net"
"testing"
"time"
)
var (
zkSrvrOut = `Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/1/10
Received: 4207
Sent: 4220
Connections: 81
Outstanding: 1
Zxid: 0x110a7a8f37
Mode: leader
Node count: 306
`
zkConsOut = ` /10.42.45.231:45361[1](queued=0,recved=9435,sent=9457,sid=0x94c2989e04716b5,lop=PING,est=1427238717217,to=20001,lcxid=0x55120915,lzxid=0xffffffffffffffff,lresp=1427259255908,llat=0,minlat=0,avglat=1,maxlat=17)
/10.55.33.98:34342[1](queued=0,recved=9338,sent=9350,sid=0x94c2989e0471731,lop=PING,est=1427238849319,to=20001,lcxid=0x55120944,lzxid=0xffffffffffffffff,lresp=1427259252294,llat=0,minlat=0,avglat=1,maxlat=18)
/10.44.145.114:46556[1](queued=0,recved=109253,sent=109617,sid=0x94c2989e0471709,lop=DELE,est=1427238791305,to=20001,lcxid=0x55139618,lzxid=0x110a7b187d,lresp=1427259257423,llat=2,minlat=0,avglat=1,maxlat=23)
`
)
func TestFLWRuok(t *testing.T) {
t.Parallel()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
go tcpServer(l, "")
oks := FLWRuok([]string{l.Addr().String()}, time.Second*10)
if len(oks) == 0 {
t.Errorf("no values returned")
}
if !oks[0] {
t.Errorf("instance should be marked as OK")
}
//
// Confirm that it also returns false for dead instances
//
l, err = net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
go tcpServer(l, "dead")
oks = FLWRuok([]string{l.Addr().String()}, time.Second*10)
if len(oks) == 0 {
t.Errorf("no values returned")
}
if oks[0] {
t.Errorf("instance should be marked as not OK")
}
}
func TestFLWSrvr(t *testing.T) {
t.Parallel()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
go tcpServer(l, "")
statsSlice, ok := FLWSrvr([]string{l.Addr().String()}, time.Second*10)
if !ok {
t.Errorf("failure indicated on 'srvr' parsing")
}
if len(statsSlice) == 0 {
t.Errorf("no *ServerStats instances returned")
}
stats := statsSlice[0]
if stats.Error != nil {
t.Fatalf("error seen in stats: %v", err.Error())
}
if stats.Sent != 4220 {
t.Errorf("Sent != 4220")
}
if stats.Received != 4207 {
t.Errorf("Received != 4207")
}
if stats.NodeCount != 306 {
t.Errorf("NodeCount != 306")
}
if stats.MinLatency != 0 {
t.Errorf("MinLatency != 0")
}
if stats.AvgLatency != 1 {
t.Errorf("AvgLatency != 1")
}
if stats.MaxLatency != 10 {
t.Errorf("MaxLatency != 10")
}
if stats.Connections != 81 {
t.Errorf("Connection != 81")
}
if stats.Outstanding != 1 {
t.Errorf("Outstanding != 1")
}
if stats.Epoch != 17 {
t.Errorf("Epoch != 17")
}
if stats.Counter != 175804215 {
t.Errorf("Counter != 175804215")
}
if stats.Mode != ModeLeader {
t.Errorf("Mode != ModeLeader")
}
if stats.Version != "3.4.6-1569965" {
t.Errorf("Version expected: 3.4.6-1569965")
}
}
func TestFLWCons(t *testing.T) {
t.Parallel()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
go tcpServer(l, "")
clients, ok := FLWCons([]string{l.Addr().String()}, time.Second*10)
if !ok {
t.Errorf("failure indicated on 'cons' parsing")
}
if len(clients) == 0 {
t.Errorf("no *ServerClients instances returned")
}
results := []*ServerClient{
{
Queued: 0,
Received: 9435,
Sent: 9457,
SessionID: 669956116721374901,
LastOperation: "PING",
Established: time.Unix(1427238717217, 0),
Timeout: 20001,
Lcxid: 1427245333,
Lzxid: -1,
LastResponse: time.Unix(1427259255908, 0),
LastLatency: 0,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 17,
Addr: "10.42.45.231:45361",
},
{
Queued: 0,
Received: 9338,
Sent: 9350,
SessionID: 669956116721375025,
LastOperation: "PING",
Established: time.Unix(1427238849319, 0),
Timeout: 20001,
Lcxid: 1427245380,
Lzxid: -1,
LastResponse: time.Unix(1427259252294, 0),
LastLatency: 0,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 18,
Addr: "10.55.33.98:34342",
},
{
Queued: 0,
Received: 109253,
Sent: 109617,
SessionID: 669956116721374985,
LastOperation: "DELE",
Established: time.Unix(1427238791305, 0),
Timeout: 20001,
Lcxid: 1427346968,
Lzxid: 73190283389,
LastResponse: time.Unix(1427259257423, 0),
LastLatency: 2,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 23,
Addr: "10.44.145.114:46556",
},
}
for _, z := range clients {
if z.Error != nil {
t.Errorf("error seen: %v", err.Error())
}
for i, v := range z.Clients {
c := results[i]
if v.Error != nil {
t.Errorf("client error seen: %v", err.Error())
}
if v.Queued != c.Queued {
t.Errorf("Queued value mismatch (%d/%d)", v.Queued, c.Queued)
}
if v.Received != c.Received {
t.Errorf("Received value mismatch (%d/%d)", v.Received, c.Received)
}
if v.Sent != c.Sent {
t.Errorf("Sent value mismatch (%d/%d)", v.Sent, c.Sent)
}
if v.SessionID != c.SessionID {
t.Errorf("SessionID value mismatch (%d/%d)", v.SessionID, c.SessionID)
}
if v.LastOperation != c.LastOperation {
t.Errorf("LastOperation value mismatch ('%v'/'%v')", v.LastOperation, c.LastOperation)
}
if v.Timeout != c.Timeout {
t.Errorf("Timeout value mismatch (%d/%d)", v.Timeout, c.Timeout)
}
if v.Lcxid != c.Lcxid {
t.Errorf("Lcxid value mismatch (%d/%d)", v.Lcxid, c.Lcxid)
}
if v.Lzxid != c.Lzxid {
t.Errorf("Lzxid value mismatch (%d/%d)", v.Lzxid, c.Lzxid)
}
if v.LastLatency != c.LastLatency {
t.Errorf("LastLatency value mismatch (%d/%d)", v.LastLatency, c.LastLatency)
}
if v.MinLatency != c.MinLatency {
t.Errorf("MinLatency value mismatch (%d/%d)", v.MinLatency, c.MinLatency)
}
if v.AvgLatency != c.AvgLatency {
t.Errorf("AvgLatency value mismatch (%d/%d)", v.AvgLatency, c.AvgLatency)
}
if v.MaxLatency != c.MaxLatency {
t.Errorf("MaxLatency value mismatch (%d/%d)", v.MaxLatency, c.MaxLatency)
}
if v.Addr != c.Addr {
t.Errorf("Addr value mismatch ('%v'/'%v')", v.Addr, c.Addr)
}
if !c.Established.Equal(v.Established) {
t.Errorf("Established value mismatch (%v/%v)", c.Established, v.Established)
}
if !c.LastResponse.Equal(v.LastResponse) {
t.Errorf("Established value mismatch (%v/%v)", c.LastResponse, v.LastResponse)
}
}
}
}
func tcpServer(listener net.Listener, thing string) {
for {
conn, err := listener.Accept()
if err != nil {
return
}
go connHandler(conn, thing)
}
}
func connHandler(conn net.Conn, thing string) {
defer conn.Close()
data := make([]byte, 4)
_, err := conn.Read(data)
if err != nil {
return
}
switch string(data) {
case "ruok":
switch thing {
case "dead":
return
default:
conn.Write([]byte("imok"))
}
case "srvr":
switch thing {
case "dead":
return
default:
conn.Write([]byte(zkSrvrOut))
}
case "cons":
switch thing {
case "dead":
return
default:
conn.Write([]byte(zkConsOut))
}
default:
conn.Write([]byte("This ZooKeeper instance is not currently serving requests."))
}
}

94 vendor/github.com/samuel/go-zookeeper/zk/lock_test.go generated vendored Normal file

@@ -0,0 +1,94 @@
package zk
import (
"testing"
"time"
)
func TestLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acls := WorldACL(PermAll)
l := NewLock(zk, "/test", acls)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
val := make(chan int, 3)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
l2 := NewLock(zk, "/test", acls)
go func() {
if err := l2.Lock(); err != nil {
t.Error(err) // t.Fatal must not be called from a goroutine other than the test's
return
}
val <- 2
if err := l2.Unlock(); err != nil {
t.Error(err)
return
}
val <- 3
}()
time.Sleep(time.Millisecond * 100)
val <- 1
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
if x := <-val; x != 1 {
t.Fatalf("Expected 1 instead of %d", x)
}
if x := <-val; x != 2 {
t.Fatalf("Expected 2 instead of %d", x)
}
if x := <-val; x != 3 {
t.Fatalf("Expected 3 instead of %d", x)
}
}
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
func TestMultiLevelLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acls := WorldACL(PermAll)
path := "/test-multi-level"
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
l := NewLock(zk, "/test-multi-level/lock", acls)
defer zk.Delete("/test-multi-level", -1) // Clean up what we've created for this test
defer zk.Delete("/test-multi-level/lock", -1)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
}

@@ -0,0 +1,83 @@
package zk
import (
"reflect"
"testing"
)
func TestEncodeDecodePacket(t *testing.T) {
t.Parallel()
encodeDecodeTest(t, &requestHeader{-2, 5})
encodeDecodeTest(t, &connectResponse{1, 2, 3, nil})
encodeDecodeTest(t, &connectResponse{1, 2, 3, []byte{4, 5, 6}})
encodeDecodeTest(t, &getAclResponse{[]ACL{{12, "s", "anyone"}}, Stat{}})
encodeDecodeTest(t, &getChildrenResponse{[]string{"foo", "bar"}})
encodeDecodeTest(t, &pathWatchRequest{"path", true})
encodeDecodeTest(t, &pathWatchRequest{"path", false})
encodeDecodeTest(t, &CheckVersionRequest{"/", -1})
encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}})
}
func TestRequestStructForOp(t *testing.T) {
for op, name := range opNames {
if op != opNotify && op != opWatcherEvent {
if s := requestStructForOp(op); s == nil {
t.Errorf("No struct for op %s", name)
}
}
}
}
func encodeDecodeTest(t *testing.T, r interface{}) {
buf := make([]byte, 1024)
n, err := encodePacket(buf, r)
if err != nil {
t.Errorf("encodePacket returned non-nil error %+v\n", err)
return
}
t.Logf("%+v %x", r, buf[:n])
r2 := reflect.New(reflect.ValueOf(r).Elem().Type()).Interface()
n2, err := decodePacket(buf[:n], r2)
if err != nil {
t.Errorf("decodePacket returned non-nil error %+v\n", err)
return
}
if n != n2 {
t.Errorf("sizes don't match: %d != %d", n, n2)
return
}
if !reflect.DeepEqual(r, r2) {
t.Errorf("results don't match: %+v != %+v", r, r2)
return
}
}
func TestEncodeShortBuffer(t *testing.T) {
t.Parallel()
_, err := encodePacket([]byte{}, &requestHeader{1, 2})
if err != ErrShortBuffer {
t.Errorf("encodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err)
return
}
}
func TestDecodeShortBuffer(t *testing.T) {
t.Parallel()
_, err := decodePacket([]byte{}, &responseHeader{})
if err != ErrShortBuffer {
t.Errorf("decodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err)
return
}
}
func BenchmarkEncode(b *testing.B) {
buf := make([]byte, 4096)
st := &connectRequest{Passwd: []byte("1234567890")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := encodePacket(buf, st); err != nil {
b.Fatal(err)
}
}
}

@@ -0,0 +1,136 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Vendored from go4.org/net/throttle
package zk
import (
"fmt"
"net"
"sync"
"time"
)
const unitSize = 1400 // read/write chunk size. ~MTU size.
type Rate struct {
KBps int // or 0, to not rate-limit bandwidth
Latency time.Duration
}
// byteTime returns the time required for n bytes.
func (r Rate) byteTime(n int) time.Duration {
if r.KBps == 0 {
return 0
}
// Compute fractional seconds before converting to a Duration so that
// sub-second chunk times are not truncated to zero.
return time.Duration(float64(n) / 1024 / float64(r.KBps) * float64(time.Second))
}
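
A quick worked check of byteTime (under the fractional-second computation above; the numbers are illustrative): a single 1400-byte chunk at 100 KBps costs roughly 13.7ms:

package main

import (
	"fmt"
	"time"
)

func main() {
	// 1400 bytes is ~1.367 KB; at 100 KBps that takes ~0.0137s per chunk.
	d := time.Duration(float64(1400) / 1024 / 100 * float64(time.Second))
	fmt.Println(d) // 13.671875ms
}
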
type Listener struct {
net.Listener
Down Rate // server Writes to Client
Up Rate // server Reads from client
}
func (ln *Listener) Accept() (net.Conn, error) {
c, err := ln.Listener.Accept()
time.Sleep(ln.Up.Latency)
if err != nil {
return nil, err
}
tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up}
tc.start()
return tc, nil
}
type nErr struct {
n int
err error
}
type writeReq struct {
writeAt time.Time
p []byte
resc chan nErr
}
type conn struct {
net.Conn
Down Rate // for reads
Up Rate // for writes
wchan chan writeReq
closeOnce sync.Once
closeErr error
}
func (c *conn) start() {
c.wchan = make(chan writeReq, 1024)
go c.writeLoop()
}
func (c *conn) writeLoop() {
for req := range c.wchan {
time.Sleep(time.Until(req.writeAt))
var res nErr
for len(req.p) > 0 && res.err == nil {
writep := req.p
if len(writep) > unitSize {
writep = writep[:unitSize]
}
n, err := c.Conn.Write(writep)
time.Sleep(c.Up.byteTime(len(writep)))
res.n += n
res.err = err
req.p = req.p[n:]
}
req.resc <- res
}
}
func (c *conn) Close() error {
c.closeOnce.Do(func() {
err := c.Conn.Close()
close(c.wchan)
c.closeErr = err
})
return c.closeErr
}
func (c *conn) Write(p []byte) (n int, err error) {
defer func() {
if e := recover(); e != nil {
n = 0
err = fmt.Errorf("%v", e) // report the recovered panic value, not the nil named error
return
}
}()
resc := make(chan nErr, 1)
c.wchan <- writeReq{time.Now().Add(c.Up.Latency), p, resc}
res := <-resc
return res.n, res.err
}
func (c *conn) Read(p []byte) (n int, err error) {
const max = 1024
if len(p) > max {
p = p[:max]
}
n, err = c.Conn.Read(p)
time.Sleep(c.Down.byteTime(n))
return
}

@@ -7,6 +7,7 @@ import (
"math/rand"
"strconv"
"strings"
"unicode/utf8"
)
// AuthACL produces an ACL list containing a single ACL which uses the
@@ -52,3 +53,64 @@ func stringShuffle(s []string) {
s[i], s[j] = s[j], s[i]
}
}
// validatePath will make sure a path is valid before sending the request
func validatePath(path string, isSequential bool) error {
if path == "" {
return ErrInvalidPath
}
if path[0] != '/' {
return ErrInvalidPath
}
n := len(path)
if n == 1 {
// path is just the root
return nil
}
if !isSequential && path[n-1] == '/' {
return ErrInvalidPath
}
// Start at rune 1 since we already know that the first character is
// a '/'.
for i, w := 1, 0; i < n; i += w {
r, width := utf8.DecodeRuneInString(path[i:])
switch {
case r == '\u0000':
return ErrInvalidPath
case r == '/':
last, _ := utf8.DecodeLastRuneInString(path[:i])
if last == '/' {
return ErrInvalidPath
}
case r == '.':
last, lastWidth := utf8.DecodeLastRuneInString(path[:i])
// Check for double dot
if last == '.' {
last, _ = utf8.DecodeLastRuneInString(path[:i-lastWidth])
}
if last == '/' {
if i+1 == n {
return ErrInvalidPath
}
next, _ := utf8.DecodeRuneInString(path[i+w:])
if next == '/' {
return ErrInvalidPath
}
}
case r >= '\u0000' && r <= '\u001f',
r >= '\u007f' && r <= '\u009f',
r >= '\uf000' && r <= '\uf8ff',
r >= '\ufff0' && r < '\uffff':
return ErrInvalidPath
}
w = width
}
return nil
}

53 vendor/github.com/samuel/go-zookeeper/zk/util_test.go generated vendored Normal file

@@ -0,0 +1,53 @@
package zk
import "testing"
func TestFormatServers(t *testing.T) {
t.Parallel()
servers := []string{"127.0.0.1:2181", "127.0.0.42", "127.0.42.1:8811"}
r := []string{"127.0.0.1:2181", "127.0.0.42:2181", "127.0.42.1:8811"}
for i, s := range FormatServers(servers) {
if s != r[i] {
t.Errorf("%v should equal %v", s, r[i])
}
}
}
func TestValidatePath(t *testing.T) {
tt := []struct {
path string
seq bool
valid bool
}{
{"/this is / a valid/path", false, true},
{"/", false, true},
{"", false, false},
{"not/valid", false, false},
{"/ends/with/slash/", false, false},
{"/sequential/", true, true},
{"/test\u0000", false, false},
{"/double//slash", false, false},
{"/single/./period", false, false},
{"/double/../period", false, false},
{"/double/..ok/period", false, true},
{"/double/alsook../period", false, true},
{"/double/period/at/end/..", false, false},
{"/name/with.period", false, true},
{"/test\u0001", false, false},
{"/test\u001f", false, false},
{"/test\u0020", false, true}, // first allowable
{"/test\u007e", false, true}, // last valid ascii
{"/test\u007f", false, false},
{"/test\u009f", false, false},
{"/test\uf8ff", false, false},
{"/test\uffef", false, true},
{"/test\ufff0", false, false},
}
for _, tc := range tt {
err := validatePath(tc.path, tc.seq)
if (err != nil) == tc.valid {
t.Errorf("failed to validate path %q", tc.path)
}
}
}

939 vendor/github.com/samuel/go-zookeeper/zk/zk_test.go generated vendored Normal file

@@ -0,0 +1,939 @@
package zk
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net"
"reflect"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestStateChanges(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
callbackChan := make(chan Event)
f := func(event Event) {
callbackChan <- event
}
zk, eventChan, err := ts.ConnectWithOptions(15*time.Second, WithEventCallback(f))
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
verifyEventOrder := func(c <-chan Event, expectedStates []State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
if event.Type != EventSession {
continue
}
if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}
states := []State{StateConnecting, StateConnected, StateHasSession}
verifyEventOrder(callbackChan, states, "callback")
verifyEventOrder(eventChan, states, "event channel")
zk.Close()
verifyEventOrder(callbackChan, []State{StateDisconnected}, "callback")
verifyEventOrder(eventChan, []State{StateDisconnected}, "event channel")
}
func TestCreate(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}
func TestMulti(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
ops := []interface{}{
&CreateRequest{Path: path, Data: []byte{1, 2, 3, 4}, Acl: WorldACL(PermAll)},
&SetDataRequest{Path: path, Data: []byte{1, 2, 3, 4}, Version: -1},
}
if res, err := zk.Multi(ops...); err != nil {
t.Fatalf("Multi returned error: %+v", err)
} else if len(res) != 2 {
t.Fatalf("Expected 2 responses got %d", len(res))
} else {
t.Logf("%+v", res)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}
func TestIfAuthdataSurvivesReconnect(t *testing.T) {
// This test case ensures authentication data is being resubmitted after
// reconnect.
testNode := "/auth-testnode"
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acl := DigestACL(PermAll, "userfoo", "passbar")
_, err = zk.Create(testNode, []byte("Some very secret content"), 0, acl)
if err != nil && err != ErrNodeExists {
t.Fatalf("Failed to create test node : %+v", err)
}
_, _, err = zk.Get(testNode)
if err == nil || err != ErrNoAuth {
var msg string
if err == nil {
msg = "Fetching data without auth should have resulted in an error"
} else {
msg = fmt.Sprintf("Expecting ErrNoAuth, got `%+v` instead", err)
}
t.Fatal(msg)
}
zk.AddAuth("digest", []byte("userfoo:passbar"))
_, _, err = zk.Get(testNode)
if err != nil {
t.Fatalf("Fetching data with auth failed: %+v", err)
}
ts.StopAllServers()
ts.StartAllServers()
_, _, err = zk.Get(testNode)
if err != nil {
t.Fatalf("Fetching data after reconnect failed: %+v", err)
}
}
func TestMultiFailures(t *testing.T) {
// This test case ensures that we return the errors associated with each
// operation in the event a call to Multi() fails.
const firstPath = "/gozk-test-first"
const secondPath = "/gozk-test-second"
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
// Ensure firstPath doesn't exist and secondPath does. This will cause the
// 2nd operation in the Multi() to fail.
if err := zk.Delete(firstPath, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if _, err := zk.Create(secondPath, nil /* data */, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
}
ops := []interface{}{
&CreateRequest{Path: firstPath, Data: []byte{1, 2}, Acl: WorldACL(PermAll)},
&CreateRequest{Path: secondPath, Data: []byte{3, 4}, Acl: WorldACL(PermAll)},
}
res, err := zk.Multi(ops...)
if err != ErrNodeExists {
t.Fatalf("Multi() didn't return correct error: %+v", err)
}
if len(res) != 2 {
t.Fatalf("Expected 2 responses received %d", len(res))
}
if res[0].Error != nil {
t.Fatalf("First operation returned an unexpected error %+v", res[0].Error)
}
if res[1].Error != ErrNodeExists {
t.Fatalf("Second operation returned incorrect error %+v", res[1].Error)
}
if _, _, err := zk.Get(firstPath); err != ErrNoNode {
t.Fatalf("Node %s was incorrectly created: %+v", firstPath, err)
}
}
func TestGetSetACL(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.AddAuth("digest", []byte("blah")); err != nil {
t.Fatalf("AddAuth returned error %+v", err)
}
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if path, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
expected := WorldACL(PermAll)
if acl, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(acl) != 1 || expected[0] != acl[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl)
}
expected = []ACL{{PermAll, "ip", "127.0.0.1"}}
if stat, err := zk.SetACL(path, expected, -1); err != nil {
t.Fatalf("SetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("SetACL returned nil Stat")
}
if acl, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(acl) != 1 || expected[0] != acl[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl)
}
}
func TestAuth(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-digest-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
acl := DigestACL(PermAll, "user", "password")
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, acl); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if a, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(a) != 1 || acl[0] != a[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", acl, a)
}
if _, _, err := zk.Get(path); err != ErrNoAuth {
t.Fatalf("Get returned error %+v instead of ErrNoAuth", err)
}
if err := zk.AddAuth("digest", []byte("user:password")); err != nil {
t.Fatalf("AddAuth returned error %+v", err)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error %+v", err)
} else if stat == nil {
t.Fatalf("Get returned nil Stat")
} else if len(data) != 4 {
t.Fatalf("Get returned wrong data length")
}
}
func TestChildren(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
deleteNode := func(node string) {
if err := zk.Delete(node, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
}
deleteNode("/gozk-test-big")
if path, err := zk.Create("/gozk-test-big", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test-big" {
t.Fatalf("Create returned different path '%s' != '/gozk-test-big'", path)
}
rb := make([]byte, 1000)
hb := make([]byte, 2000)
prefix := []byte("/gozk-test-big/")
for i := 0; i < 10000; i++ {
_, err := rand.Read(rb)
if err != nil {
t.Fatal("Cannot create random znode name")
}
hex.Encode(hb, rb)
expect := string(append(prefix, hb...))
if path, err := zk.Create(expect, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != expect {
t.Fatalf("Create returned different path '%s' != '%s'", path, expect)
}
defer deleteNode(expect)
}
children, _, err := zk.Children("/gozk-test-big")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if len(children) != 10000 {
t.Fatal("Children returned wrong number of nodes")
}
}
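
// TestChildWatch verifies that a child watch fires when a child is created
// under the watched node, and again when the watched node itself is deleted.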
func TestChildWatch(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
if path, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
// Delete of the watched node should trigger the watch
children, stat, childCh, err = zk.ChildrenW("/gozk-test")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) != 0 {
t.Fatal("Children should return 0 children")
}
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/gozk-test" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
}
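
// TestSetWatchers registers a large number of watches, forces a reconnect, and
// verifies that every watch is re-armed on the new connection while each
// set-watches request stays within the configured packet limit.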
func TestSetWatchers(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
zk.reconnectLatch = make(chan struct{})
zk.setWatchLimit = 1024 // split the set-watches step into requests of at most 1 KiB each
var setWatchReqs atomic.Value
zk.setWatchCallback = func(reqs []*setWatchesRequest) {
setWatchReqs.Store(reqs)
}
zk2, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk2.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
testPaths := map[string]<-chan Event{}
defer func() {
// clean up all of the test paths we create
for p := range testPaths {
zk2.Delete(p, -1)
}
}()
// we create lots of paths to watch, to make sure a "set watches" request
// on reconnect will be too big and need to span multiple packets
for i := 0; i < 1000; i++ {
testPath, err := zk.Create(fmt.Sprintf("/gozk-test-%d", i), []byte{}, 0, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned: %+v", err)
}
testPaths[testPath] = nil
_, _, testEvCh, err := zk.GetW(testPath)
if err != nil {
t.Fatalf("GetW returned: %+v", err)
}
testPaths[testPath] = testEvCh
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
// Simulate network error by brutally closing the network connection.
zk.conn.Close()
for p := range testPaths {
if err := zk2.Delete(p, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
}
if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
time.Sleep(100 * time.Millisecond)
// zk should still be waiting to reconnect, so none of the watches should have been triggered
for p, ch := range testPaths {
select {
case <-ch:
t.Fatalf("GetW watcher for %q should not have triggered yet", p)
default:
}
}
select {
case <-childCh:
t.Fatalf("ChildrenW watcher should not have triggered yet")
default:
}
// now we let the reconnect occur and make sure it resets watches
close(zk.reconnectLatch)
for p, ch := range testPaths {
select {
case ev := <-ch:
if ev.Err != nil {
t.Fatalf("GetW watcher error %+v", ev.Err)
}
if ev.Path != p {
t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, p)
}
case <-time.After(2 * time.Second):
t.Fatal("GetW watcher timed out")
}
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(2 * time.Second):
t.Fatal("Child watcher timed out")
}
// Yay! All watches fired correctly. Now we also inspect the actual set-watch request objects
// to ensure they didn't exceed the expected packet size.
buf := make([]byte, bufferSize)
totalWatches := 0
actualReqs := setWatchReqs.Load().([]*setWatchesRequest)
if len(actualReqs) < 12 {
// sanity check: we should have generated *at least* 12 requests to reset watches
t.Fatalf("too few setWatchesRequest messages: %d", len(actualReqs))
}
for _, r := range actualReqs {
totalWatches += len(r.ChildWatches) + len(r.DataWatches) + len(r.ExistWatches)
n, err := encodePacket(buf, r)
if err != nil {
t.Fatalf("encodePacket failed: %v! request:\n%+v", err, r)
} else if n > 1024 {
t.Fatalf("setWatchesRequest exceeded allowed size (%d > 1024)! request:\n%+v", n, r)
}
}
if totalWatches != len(testPaths)+1 {
t.Fatalf("setWatchesRequests did not include all expected watches; expecting %d, got %d", len(testPaths)+1, totalWatches)
}
}
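
// TestExpiringWatch corrupts the session ID before forcing a reconnect so the
// session expires, and verifies that the outstanding watch receives
// ErrSessionExpired.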
func TestExpiringWatch(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
zk.sessionID = 99999
zk.conn.Close()
select {
case ev := <-childCh:
if ev.Err != ErrSessionExpired {
t.Fatalf("Child watcher error %+v instead of expected ErrSessionExpired", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(2 * time.Second):
t.Fatal("Child watcher timed out")
}
}
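
// TestRequestFail ensures a request made while no server is reachable errors
// out instead of hanging.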
func TestRequestFail(t *testing.T) {
// If connecting fails to all servers in the list then pending requests
// should be errored out so they don't hang forever.
zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15)
if err != nil {
t.Fatal(err)
}
defer zk.Close()
ch := make(chan error)
go func() {
_, _, err := zk.Get("/blah")
ch <- err
}()
select {
case err := <-ch:
if err == nil {
t.Fatal("Expected non-nil error on failed request due to connection failure")
}
case <-time.After(time.Second * 2):
t.Fatal("Get hung when connection could not be made")
}
}
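
// TestSlowServer connects through a proxy that alternates between high latency
// and none, verifying that a request timeout triggers a reconnect while the
// session and its watches survive.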
func TestSlowServer(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port)
proxyAddr, stopCh, err := startSlowProxy(t,
Rate{}, Rate{},
realAddr, func(ln *Listener) {
if ln.Up.Latency == 0 {
ln.Up.Latency = time.Millisecond * 2000
ln.Down.Latency = time.Millisecond * 2000
} else {
ln.Up.Latency = 0
ln.Down.Latency = 0
}
})
if err != nil {
t.Fatal(err)
}
defer close(stopCh)
zk, _, err := Connect([]string{proxyAddr}, time.Millisecond*500)
if err != nil {
t.Fatal(err)
}
defer zk.Close()
_, _, wch, err := zk.ChildrenW("/")
if err != nil {
t.Fatal(err)
}
// Force a reconnect to get a throttled connection
zk.conn.Close()
time.Sleep(time.Millisecond * 100)
if err := zk.Delete("/gozk-test", -1); err == nil {
t.Fatal("Delete should have failed")
}
// The previous request should have timed out, causing the connection to be dropped and reestablished
if _, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatal(err)
}
// Make sure event is still returned because the session should not have been affected
select {
case ev := <-wch:
t.Logf("Received event: %+v", ev)
case <-time.After(time.Second):
t.Fatal("Expected to receive a watch event")
}
}
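
// startSlowProxy starts a TCP proxy in front of upstream whose rates and
// latencies can be adjusted by the adj callback on each accepted connection.
// It returns the proxy's address and a channel that tears the proxy down when
// closed.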
func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return "", nil, err
}
tln := &Listener{
Listener: ln,
Up: up,
Down: down,
}
stopCh := make(chan bool)
go func() {
<-stopCh
tln.Close()
}()
go func() {
for {
cn, err := tln.Accept()
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
t.Fatalf("Accept failed: %s", err.Error())
}
return
}
if adj != nil {
adj(tln)
}
go func(cn net.Conn) {
defer cn.Close()
upcn, err := net.Dial("tcp", upstream)
if err != nil {
t.Log(err)
return
}
// This will leave hanging goroutines until stopCh is closed,
// but that doesn't matter in the context of running tests.
go func() {
<-stopCh
upcn.Close()
}()
go func() {
if _, err := io.Copy(upcn, cn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
// log.Printf("Upstream write failed: %s", err.Error())
}
}
}()
if _, err := io.Copy(cn, upcn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
// log.Printf("Upstream read failed: %s", err.Error())
}
}
}(cn)
}
}()
return ln.Addr().String(), stopCh, nil
}
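
// TestMaxBufferSize verifies that a client configured with WithMaxBufferSize
// closes its connection (surfacing ErrConnectionClosed) when a response
// exceeds the limit, while an unrestricted client can read the same data.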
func TestMaxBufferSize(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
// client with no max buffer size
zk, _, err := ts.ConnectWithOptions(15 * time.Second)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
var l testLogger
// 1k buffer size, logs to custom test logger
zkLimited, _, err := ts.ConnectWithOptions(15*time.Second, WithMaxBufferSize(1024), func(conn *Conn) {
conn.SetLogger(&l)
})
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zkLimited.Close()
// With a small node and a small number of children
data := []byte{101, 102, 103, 103}
_, err = zk.Create("/foo", data, 0, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned error: %+v", err)
}
var children []string
for i := 0; i < 4; i++ {
childName, err := zk.Create("/foo/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned error: %+v", err)
}
children = append(children, childName[len("/foo/"):]) // strip parent prefix from name
}
sort.Strings(children)
// Limited client works fine
resultData, _, err := zkLimited.Get("/foo")
if err != nil {
t.Fatalf("Get returned error: %+v", err)
}
if !reflect.DeepEqual(resultData, data) {
t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData)
}
resultChildren, _, err := zkLimited.Children("/foo")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
}
sort.Strings(resultChildren)
if !reflect.DeepEqual(resultChildren, children) {
t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren)
}
// With large node though...
data = make([]byte, 1024)
for i := 0; i < 1024; i++ {
data[i] = byte(i)
}
_, err = zk.Create("/bar", data, 0, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned error: %+v", err)
}
_, _, err = zkLimited.Get("/bar")
// NB: Sadly, without actually de-serializing the too-large response packet, we can't send the
// right error to the corresponding outstanding request. So the request just sees ErrConnectionClosed
// while the log will see the actual reason the connection was closed.
expectErr(t, err, ErrConnectionClosed)
expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024")
// Or with large number of children...
totalLen := 0
children = nil
for totalLen < 1024 {
childName, err := zk.Create("/bar/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned error: %+v", err)
}
n := childName[len("/bar/"):] // strip parent prefix from name
children = append(children, n)
totalLen += len(n)
}
sort.Strings(children)
_, _, err = zkLimited.Children("/bar")
expectErr(t, err, ErrConnectionClosed)
expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024")
// Other client (without buffer size limit) can successfully query the node and its children, of course
resultData, _, err = zk.Get("/bar")
if err != nil {
t.Fatalf("Get returned error: %+v", err)
}
if !reflect.DeepEqual(resultData, data) {
t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData)
}
resultChildren, _, err = zk.Children("/bar")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
}
sort.Strings(resultChildren)
if !reflect.DeepEqual(resultChildren, children) {
t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren)
}
}
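
// expectErr asserts that err is non-nil and equal to expected.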
func expectErr(t *testing.T, err error, expected error) {
if err == nil {
t.Fatalf("expected error %+v, got nil", expected)
}
if err != expected {
t.Fatalf("expected error %+v, got %+v", expected, err)
}
}
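
// expectLogMessage asserts that the logger captured exactly one message
// matching the given regexp pattern.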
func expectLogMessage(t *testing.T, logger *testLogger, pattern string) {
re := regexp.MustCompile(pattern)
events := logger.Reset()
if len(events) == 0 {
t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern)
}
var found []string
for _, e := range events {
if re.Match([]byte(e)) {
found = append(found, e)
}
}
if len(found) == 0 {
t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern)
} else if len(found) > 1 {
t.Fatalf("Logged error redundantly %d times:\n%+v", len(found), found)
}
}
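
// testLogger implements the Logger interface, capturing messages so tests can
// assert on log output.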
type testLogger struct {
mu sync.Mutex
events []string
}
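
// Printf records the formatted message and echoes it to stdout.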
func (l *testLogger) Printf(msgFormat string, args ...interface{}) {
msg := fmt.Sprintf(msgFormat, args...)
fmt.Println(msg)
l.mu.Lock()
defer l.mu.Unlock()
l.events = append(l.events, msg)
}
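
// Reset returns the messages captured so far and clears the log.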
func (l *testLogger) Reset() []string {
l.mu.Lock()
defer l.mu.Unlock()
ret := l.events
l.events = nil
return ret
}