Unit testing for def/ module.

Added unit tests to test code in def/ module.
This commit is contained in:
Pradyumna Kaushik 2019-10-12 06:48:45 +00:00
parent e24b8a08c9
commit bac60e872a
396 changed files with 83991 additions and 13209 deletions

View file

@ -409,13 +409,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
defer close(reauthReadyChan)
if c.logInfo {
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
}
for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
return
}
resChan, err := c.sendRequest(
@ -428,7 +426,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
nil)
if err != nil {
c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}
@ -437,14 +435,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials")
c.logger.Printf("recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
c.logger.Printf("should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
c.logger.Printf("credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
@ -486,14 +484,14 @@ func (c *Conn) loop() {
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.conn.Close()
case err == nil:
if c.logInfo {
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
}
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
@ -508,7 +506,7 @@ func (c *Conn) loop() {
}
err := c.sendLoop()
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
c.logger.Printf("send loop terminated: err=%v", err)
}
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
@ -523,7 +521,7 @@ func (c *Conn) loop() {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
c.logger.Printf("recv loop terminated: err=%v", err)
}
if err == nil {
panic("zk: recvLoop should never return nil error")
@ -697,20 +695,28 @@ func (c *Conn) authenticate() error {
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
return err
}
_, err = c.conn.Write(buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
// Receive and decode a connect response.
c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
if err := c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
return err
}
_, err = io.ReadFull(c.conn, buf[:4])
c.conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
@ -772,14 +778,18 @@ func (c *Conn) sendData(req *request) error {
c.requests[req.xid] = req
c.requestsLock.Unlock()
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
return err
}
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
c.conn.Close()
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
return nil
}
@ -802,13 +812,17 @@ func (c *Conn) sendLoop() error {
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
return err
}
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
c.conn.Close()
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
case <-c.closeChan:
return nil
}
@ -823,10 +837,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, sz)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil {
c.logger.Printf("failed to set connection deadline: %v", err)
}
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
return err
return fmt.Errorf("failed to read from connection: %v", err)
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
@ -838,10 +854,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}
_, err = io.ReadFull(conn, buf[:blen])
conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
if err := conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
res := responseHeader{}
_, err = decodePacket(buf[:16], &res)
@ -874,7 +892,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
c.watchersLock.Lock()
for _, t := range wTypes {
wpt := watchPathType{res.Path, t}
if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
if watchers, ok := c.watchers[wpt]; ok {
for _, ch := range watchers {
ch <- ev
close(ch)
@ -1220,6 +1238,38 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
return mr, err
}
// IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers
// by lists of members.
// Return the new configuration stats.
func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) {
// TODO: validate the shape of the member string to give early feedback.
request := &reconfigRequest{
JoiningServers: []byte(strings.Join(joining, ",")),
LeavingServers: []byte(strings.Join(leaving, ",")),
CurConfigId: version,
}
return c.internalReconfig(request)
}
// Reconfig is the non-incremental update functionality for Zookeeper where the list preovided
// is the entire new member list.
// the optional version allows for conditional reconfigurations, -1 ignores the condition.
func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) {
request := &reconfigRequest{
NewMembers: []byte(strings.Join(members, ",")),
CurConfigId: version,
}
return c.internalReconfig(request)
}
func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) {
response := &reconfigReponse{}
_, err := c.request(opReconfig, request, response, nil)
return &response.Stat, err
}
// Server returns the current or last-connected server name.
func (c *Conn) Server() string {
c.serverMu.Lock()

View file

@ -2,6 +2,7 @@ package zk
import (
"errors"
"fmt"
)
const (
@ -25,6 +26,7 @@ const (
opGetChildren2 = 12
opCheck = 13
opMulti = 14
opReconfig = 16
opClose = -11
opSetAuth = 100
opSetWatches = 101
@ -92,7 +94,7 @@ func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "Unknown"
return "unknown state"
}
type ErrCode int32
@ -113,8 +115,10 @@ var (
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responsees to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
ErrBadArguments = errors.New("invalid arguments")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")
errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
@ -126,11 +130,13 @@ var (
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errZReconfigDisabled: ErrReconfigDisabled,
errBadArguments: ErrBadArguments,
}
)
@ -138,7 +144,7 @@ func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
return ErrUnknown
return fmt.Errorf("unknown error: %v", e)
}
const (
@ -168,6 +174,8 @@ const (
errClosing ErrCode = -116
errNothing ErrCode = -117
errSessionMoved ErrCode = -118
// Attempts to perform a reconfiguration operation when reconfiguration feature is disabled
errZReconfigDisabled ErrCode = -123
)
// Constants for ACL permissions
@ -197,6 +205,7 @@ var (
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
opReconfig: "reconfig",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",

View file

@ -255,12 +255,16 @@ func fourLetterWord(server, command string, timeout time.Duration) ([]byte, erro
// once the command has been processed, but better safe than sorry
defer conn.Close()
conn.SetWriteDeadline(time.Now().Add(timeout))
if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}
conn.SetReadDeadline(time.Now().Add(timeout))
if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
return ioutil.ReadAll(conn)
}

View file

@ -1,216 +0,0 @@
package zk
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strings"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
type TestServer struct {
Port int
Path string
Srv *Server
}
type TestCluster struct {
Path string
Servers []TestServer
}
func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
tmpPath, err := ioutil.TempDir("", "gozk")
if err != nil {
return nil, err
}
success := false
startPort := int(rand.Int31n(6000) + 10000)
cluster := &TestCluster{Path: tmpPath}
defer func() {
if !success {
cluster.Stop()
}
}()
for serverN := 0; serverN < size; serverN++ {
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
if err := os.Mkdir(srvPath, 0700); err != nil {
return nil, err
}
port := startPort + serverN*3
cfg := ServerConfig{
ClientPort: port,
DataDir: srvPath,
}
for i := 0; i < size; i++ {
cfg.Servers = append(cfg.Servers, ServerConfigServer{
ID: i + 1,
Host: "127.0.0.1",
PeerPort: startPort + i*3 + 1,
LeaderElectionPort: startPort + i*3 + 2,
})
}
cfgPath := filepath.Join(srvPath, "zoo.cfg")
fi, err := os.Create(cfgPath)
if err != nil {
return nil, err
}
err = cfg.Marshall(fi)
fi.Close()
if err != nil {
return nil, err
}
fi, err = os.Create(filepath.Join(srvPath, "myid"))
if err != nil {
return nil, err
}
_, err = fmt.Fprintf(fi, "%d\n", serverN+1)
fi.Close()
if err != nil {
return nil, err
}
srv := &Server{
ConfigPath: cfgPath,
Stdout: stdout,
Stderr: stderr,
}
if err := srv.Start(); err != nil {
return nil, err
}
cluster.Servers = append(cluster.Servers, TestServer{
Path: srvPath,
Port: cfg.ClientPort,
Srv: srv,
})
}
if err := cluster.waitForStart(10, time.Second); err != nil {
return nil, err
}
success = true
return cluster, nil
}
func (tc *TestCluster) Connect(idx int) (*Conn, error) {
zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
return zk, err
}
func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
return tc.ConnectAllTimeout(time.Second * 15)
}
func (tc *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
return tc.ConnectWithOptions(sessionTimeout)
}
func (tc *TestCluster) ConnectWithOptions(sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
hosts := make([]string, len(tc.Servers))
for i, srv := range tc.Servers {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
}
zk, ch, err := Connect(hosts, sessionTimeout, options...)
return zk, ch, err
}
func (tc *TestCluster) Stop() error {
for _, srv := range tc.Servers {
srv.Srv.Stop()
}
defer os.RemoveAll(tc.Path)
return tc.waitForStop(5, time.Second)
}
// waitForStart blocks until the cluster is up
func (tc *TestCluster) waitForStart(maxRetry int, interval time.Duration) error {
// verify that the servers are up with SRVR
serverAddrs := make([]string, len(tc.Servers))
for i, s := range tc.Servers {
serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port)
}
for i := 0; i < maxRetry; i++ {
_, ok := FLWSrvr(serverAddrs, time.Second)
if ok {
return nil
}
time.Sleep(interval)
}
return fmt.Errorf("unable to verify health of servers")
}
// waitForStop blocks until the cluster is down
func (tc *TestCluster) waitForStop(maxRetry int, interval time.Duration) error {
// verify that the servers are up with RUOK
serverAddrs := make([]string, len(tc.Servers))
for i, s := range tc.Servers {
serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port)
}
var success bool
for i := 0; i < maxRetry && !success; i++ {
success = true
for _, ok := range FLWRuok(serverAddrs, time.Second) {
if ok {
success = false
}
}
if !success {
time.Sleep(interval)
}
}
if !success {
return fmt.Errorf("unable to verify servers are down")
}
return nil
}
func (tc *TestCluster) StartServer(server string) {
for _, s := range tc.Servers {
if strings.HasSuffix(server, fmt.Sprintf(":%d", s.Port)) {
s.Srv.Start()
return
}
}
panic(fmt.Sprintf("Unknown server: %s", server))
}
func (tc *TestCluster) StopServer(server string) {
for _, s := range tc.Servers {
if strings.HasSuffix(server, fmt.Sprintf(":%d", s.Port)) {
s.Srv.Stop()
return
}
}
panic(fmt.Sprintf("Unknown server: %s", server))
}
func (tc *TestCluster) StartAllServers() error {
for _, s := range tc.Servers {
if err := s.Srv.Start(); err != nil {
return fmt.Errorf(
"Failed to start server listening on port `%d` : %+v", s.Port, err)
}
}
return nil
}
func (tc *TestCluster) StopAllServers() error {
for _, s := range tc.Servers {
if err := s.Srv.Stop(); err != nil {
return fmt.Errorf(
"Failed to stop server listening on port `%d` : %+v", s.Port, err)
}
}
return nil
}

View file

@ -1,136 +0,0 @@
package zk
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
)
type ErrMissingServerConfigField string
func (e ErrMissingServerConfigField) Error() string {
return fmt.Sprintf("zk: missing server config field '%s'", string(e))
}
const (
DefaultServerTickTime = 2000
DefaultServerInitLimit = 10
DefaultServerSyncLimit = 5
DefaultServerAutoPurgeSnapRetainCount = 3
DefaultPeerPort = 2888
DefaultLeaderElectionPort = 3888
)
type ServerConfigServer struct {
ID int
Host string
PeerPort int
LeaderElectionPort int
}
type ServerConfig struct {
TickTime int // Number of milliseconds of each tick
InitLimit int // Number of ticks that the initial synchronization phase can take
SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
DataDir string // Direcrory where the snapshot is stored
ClientPort int // Port at which clients will connect
AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
AutoPurgePurgeInterval int // Purge task internal in hours (0 to disable auto purge)
Servers []ServerConfigServer
}
func (sc ServerConfig) Marshall(w io.Writer) error {
if sc.DataDir == "" {
return ErrMissingServerConfigField("dataDir")
}
fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
if sc.TickTime <= 0 {
sc.TickTime = DefaultServerTickTime
}
fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
if sc.InitLimit <= 0 {
sc.InitLimit = DefaultServerInitLimit
}
fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
if sc.SyncLimit <= 0 {
sc.SyncLimit = DefaultServerSyncLimit
}
fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
if sc.ClientPort <= 0 {
sc.ClientPort = DefaultPort
}
fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
if sc.AutoPurgePurgeInterval > 0 {
if sc.AutoPurgeSnapRetainCount <= 0 {
sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
}
fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
}
if len(sc.Servers) > 0 {
for _, srv := range sc.Servers {
if srv.PeerPort <= 0 {
srv.PeerPort = DefaultPeerPort
}
if srv.LeaderElectionPort <= 0 {
srv.LeaderElectionPort = DefaultLeaderElectionPort
}
fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
}
}
return nil
}
var jarSearchPaths = []string{
"zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/share/java/zookeeper-*.jar",
"/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
}
func findZookeeperFatJar() string {
var paths []string
zkPath := os.Getenv("ZOOKEEPER_PATH")
if zkPath == "" {
paths = jarSearchPaths
} else {
paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")}
}
for _, path := range paths {
matches, _ := filepath.Glob(path)
// TODO: could sort by version and pick latest
if len(matches) > 0 {
return matches[0]
}
}
return ""
}
type Server struct {
JarPath string
ConfigPath string
Stdout, Stderr io.Writer
cmd *exec.Cmd
}
func (srv *Server) Start() error {
if srv.JarPath == "" {
srv.JarPath = findZookeeperFatJar()
if srv.JarPath == "" {
return fmt.Errorf("zk: unable to find server jar")
}
}
srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
srv.cmd.Stdout = srv.Stdout
srv.cmd.Stderr = srv.Stderr
return srv.cmd.Start()
}
func (srv *Server) Stop() error {
srv.cmd.Process.Signal(os.Kill)
return srv.cmd.Wait()
}

View file

@ -6,6 +6,7 @@ import (
"log"
"reflect"
"runtime"
"strings"
"time"
)
@ -277,6 +278,18 @@ type multiResponse struct {
DoneHeader multiHeader
}
// zk version 3.5 reconfig API
type reconfigRequest struct {
JoiningServers []byte
LeavingServers []byte
NewMembers []byte
// curConfigId version of the current configuration
// optional - causes reconfiguration to return an error if configuration is no longer current
CurConfigId int64
}
type reconfigReponse getDataResponse
func (r *multiRequest) Encode(buf []byte) (int, error) {
total := 0
for _, op := range r.Ops {
@ -392,7 +405,7 @@ type encoder interface {
func decodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
err = ErrShortBuffer
} else {
panic(r)
@ -483,7 +496,7 @@ func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
func encodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
err = ErrShortBuffer
} else {
panic(r)
@ -604,6 +617,8 @@ func requestStructForOp(op int32) interface{} {
return &CheckVersionRequest{}
case opMulti:
return &multiRequest{}
case opReconfig:
return &reconfigRequest{}
}
return nil
}