diff --git a/container.go b/container.go
index 77c7c39..6216708 100644
--- a/container.go
+++ b/container.go
@@ -48,4 +48,3 @@ func (c DockerContainer) AddParameter(name, value string) DockerContainer {
     c.container.Parameters = append(c.container.Parameters, &aurora.DockerParameter{name, value})
     return c
 }
-
diff --git a/examples/client.go b/examples/client.go
index ae5fea8..d8e0efe 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -110,15 +110,15 @@ func main() {
         break
     case "none":
         job = realis.NewJob().
-                Environment("prod").
-                Role("vagrant").
-                Name("docker_as_task").
-                CPU(1).
-                RAM(64).
-                Disk(100).
-                IsService(true).
-                InstanceCount(1).
-                AddPorts(1)
+            Environment("prod").
+            Role("vagrant").
+            Name("docker_as_task").
+            CPU(1).
+            RAM(64).
+            Disk(100).
+            IsService(true).
+            InstanceCount(1).
+            AddPorts(1)
         break
     default:
         fmt.Println("Only thermos, compose, and none are supported for now")
diff --git a/job.go b/job.go
index 1996b3e..256066b 100644
--- a/job.go
+++ b/job.go
@@ -116,7 +116,7 @@ func (j AuroraJob) Name(name string) Job {
 
 // Set name of the executor that will the task will be configured to.
 func (j AuroraJob) ExecutorName(name string) Job {
-        if j.jobConfig.TaskConfig.ExecutorConfig == nil {
+    if j.jobConfig.TaskConfig.ExecutorConfig == nil {
         j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig()
     }
 
@@ -127,7 +127,7 @@ func (j AuroraJob) ExecutorName(name string) Job {
 
 // Will be included as part of entire task inside the scheduler that will be serialized.
 func (j AuroraJob) ExecutorData(data string) Job {
-        if j.jobConfig.TaskConfig.ExecutorConfig == nil {
+    if j.jobConfig.TaskConfig.ExecutorConfig == nil {
         j.jobConfig.TaskConfig.ExecutorConfig = aurora.NewExecutorConfig()
     }
 
diff --git a/monitors.go b/monitors.go
index c7e6895..f1acb52 100644
--- a/monitors.go
+++ b/monitors.go
@@ -17,11 +17,11 @@ package realis
 
 import (
     "fmt"
+    "github.com/pkg/errors"
     "github.com/rdelval/gorealis/gen-go/apache/aurora"
     "github.com/rdelval/gorealis/response"
     "os"
     "time"
-    "github.com/pkg/errors"
 )
 
 type Monitor struct {
diff --git a/realis.go b/realis.go
index e9c2b7a..7f08943 100644
--- a/realis.go
+++ b/realis.go
@@ -21,11 +21,11 @@ import (
     "git.apache.org/thrift.git/lib/go/thrift"
     "github.com/pkg/errors"
     "github.com/rdelval/gorealis/gen-go/apache/aurora"
+    "github.com/rdelval/gorealis/response"
     "net/http"
     "net/http/cookiejar"
     "os"
     "time"
-    "github.com/rdelval/gorealis/response"
 )
 
 type Realis interface {
diff --git a/realis_test.go b/realis_test.go
index 7f2daef..329abbf 100644
--- a/realis_test.go
+++ b/realis_test.go
@@ -15,12 +15,12 @@
 package realis
 
 import (
-    "testing"
-    "github.com/stretchr/testify/assert"
     "fmt"
-    "os"
-    "io/ioutil"
     "github.com/rdelval/gorealis/gen-go/apache/aurora"
+    "github.com/stretchr/testify/assert"
+    "io/ioutil"
+    "os"
+    "testing"
 )
 
 var r Realis
@@ -71,7 +71,7 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
     assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
     // Tasks must exist for it to be killed
-    t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T){
+    t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
         resp, err := r.KillJob(job.JobKey())
         if err != nil {
             fmt.Println(err)
@@ -113,7 +113,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 
     assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
-    t.Run("TestRealisClient_StartCronJob_Thermos", func(t *testing.T){
+    t.Run("TestRealisClient_StartCronJob_Thermos", func(t *testing.T) {
         resp, err := r.StartCronJob(job.JobKey())
         if err != nil {
             fmt.Println(err)
@@ -123,7 +123,7 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
         assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
     })
 
-    t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T){
+    t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) {
         resp, err := r.DescheduleCronJob(job.JobKey())
         if err != nil {
             fmt.Println(err)
@@ -132,4 +132,4 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 
         assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
     })
-}
\ No newline at end of file
+}
diff --git a/vendor/github.com/samuel/go-zookeeper/zk/conn.go b/vendor/github.com/samuel/go-zookeeper/zk/conn.go
index 5ca8e2b..b6b8dbc 100644
--- a/vendor/github.com/samuel/go-zookeeper/zk/conn.go
+++ b/vendor/github.com/samuel/go-zookeeper/zk/conn.go
@@ -15,9 +15,7 @@ import (
     "errors"
     "fmt"
     "io"
-    "log"
     "net"
-    "os"
     "strconv"
     "strings"
     "sync"
@@ -46,9 +44,9 @@ const (
 type watchType int
 
 const (
-    watchTypeData  = iota
-    watchTypeExist = iota
-    watchTypeChild = iota
+    watchTypeData = iota
+    watchTypeExist
+    watchTypeChild
 )
 
 type watchPathType struct {
@@ -63,13 +61,9 @@ type Logger interface {
     Printf(string, ...interface{})
 }
 
-// NoOp logger -- http://stackoverflow.com/questions/10571182/go-disable-a-log-logger
-type NopLogger struct {
-    *log.Logger
-}
-
-func (l NopLogger) Printf(string, ...interface{}) {
-    // noop
+type authCreds struct {
+    scheme string
+    auth   []byte
 }
 
 type Conn struct {
@@ -86,21 +80,28 @@ type Conn struct {
     server         string // remember the address/port of the current server
     conn           net.Conn
     eventChan      chan Event
+    eventCallback  EventCallback // may be nil
     shouldQuit     chan struct{}
     pingInterval   time.Duration
     recvTimeout    time.Duration
     connectTimeout time.Duration
 
+    creds   []authCreds
+    credsMu sync.Mutex // protects creds
+
     sendChan     chan *request
     requests     map[int32]*request // Xid -> pending request
     requestsLock sync.Mutex
     watchers     map[watchPathType][]chan Event
     watchersLock sync.Mutex
+    closeChan    chan struct{} // channel to tell send loop to stop
 
     // Debug (used by unit tests)
     reconnectDelay time.Duration
 
     logger Logger
+
+    buf []byte
 }
 
 // connOption represents a connection option.
@@ -196,6 +197,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
         watchers:       make(map[watchPathType][]chan Event),
         passwd:         emptyPassword,
         logger:         DefaultLogger,
+        buf:            make([]byte, bufferSize),
 
         // Debug
         reconnectDelay: 0,
@@ -235,17 +237,15 @@ func WithHostProvider(hostProvider HostProvider) connOption {
     }
 }
 
-// WithLogger returns a connection option specifying a non-default logger
-func WithLogger(logger Logger) connOption {
-    return func(c *Conn) {
-        c.logger = logger
-    }
-}
+// EventCallback is a function that is called when an Event occurs.
+type EventCallback func(Event)
 
-// WithLogger returns a connection option specifying a non-default logger
-func WithoutLogger() connOption {
+// WithEventCallback returns a connection option that specifies an event
+// callback.
+// The callback must not block - doing so would delay the ZK go routines.
+func WithEventCallback(cb EventCallback) connOption {
     return func(c *Conn) {
-        c.logger = NopLogger{log.New(os.Stderr, "", log.LstdFlags)}
+        c.eventCallback = cb
     }
 }
 
@@ -263,7 +263,7 @@ func (c *Conn) State() State {
     return State(atomic.LoadInt32((*int32)(&c.state)))
 }
 
-// SessionId returns the current session id of the connection.
+// SessionID returns the current session id of the connection.
 func (c *Conn) SessionID() int64 {
     return atomic.LoadInt64(&c.sessionID)
 }
@@ -283,8 +283,16 @@ func (c *Conn) setTimeouts(sessionTimeoutMs int32) {
 
 func (c *Conn) setState(state State) {
     atomic.StoreInt32((*int32)(&c.state), int32(state))
+    c.sendEvent(Event{Type: EventSession, State: state, Server: c.Server()})
+}
+
+func (c *Conn) sendEvent(evt Event) {
+    if c.eventCallback != nil {
+        c.eventCallback(evt)
+    }
+
     select {
-    case c.eventChan <- Event{Type: EventSession, State: state, Server: c.Server()}:
+    case c.eventChan <- evt:
     default:
         // panic("zk: event channel full - it must be monitored and never allowed to be full")
     }
@@ -321,6 +329,65 @@ func (c *Conn) connect() error {
         }
     }
 }
 
+func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
+    c.credsMu.Lock()
+    defer c.credsMu.Unlock()
+
+    defer close(reauthReadyChan)
+
+    c.logger.Printf("Re-submitting `%d` credentials after reconnect",
+        len(c.creds))
+
+    for _, cred := range c.creds {
+        resChan, err := c.sendRequest(
+            opSetAuth,
+            &setAuthRequest{Type: 0,
+                Scheme: cred.scheme,
+                Auth:   cred.auth,
+            },
+            &setAuthResponse{},
+            nil)
+
+        if err != nil {
+            c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
+            // FIXME(prozlach): let's ignore errors for now
+            continue
+        }
+
+        res := <-resChan
+        if res.err != nil {
+            c.logger.Printf("Credential re-submit failed: %s", res.err)
+            // FIXME(prozlach): let's ignore errors for now
+            continue
+        }
+    }
+}
+
+func (c *Conn) sendRequest(
+    opcode int32,
+    req interface{},
+    res interface{},
+    recvFunc func(*request, *responseHeader, error),
+) (
+    <-chan response,
+    error,
+) {
+    rq := &request{
+        xid:        c.nextXid(),
+        opcode:     opcode,
+        pkt:        req,
+        recvStruct: res,
+        recvChan:   make(chan response, 1),
+        recvFunc:   recvFunc,
+    }
+
+    if err := c.sendData(rq); err != nil {
+        return nil, err
+    }
+
+    return rq.recvChan, nil
+}
+
 func (c *Conn) loop() {
     for {
         if err := c.connect(); err != nil {
@@ -338,13 +405,15 @@ func (c *Conn) loop() {
             c.conn.Close()
         case err == nil:
             c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
-            c.hostProvider.Connected()       // mark success
-            closeChan := make(chan struct{}) // channel to tell send loop stop
-            var wg sync.WaitGroup
+            c.hostProvider.Connected()        // mark success
+            c.closeChan = make(chan struct{}) // channel to tell send loop to stop
+            reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted
+            var wg sync.WaitGroup
 
             wg.Add(1)
             go func() {
-                err := c.sendLoop(c.conn, closeChan)
+                <-reauthChan
+                err := c.sendLoop()
                 c.logger.Printf("Send loop terminated: err=%v", err)
                 c.conn.Close() // causes recv loop to EOF/exit
                 wg.Done()
@@ -357,10 +426,12 @@ func (c *Conn) loop() {
                 if err == nil {
                     panic("zk: recvLoop should never return nil error")
                 }
-                close(closeChan) // tell send loop to exit
+                close(c.closeChan) // tell send loop to exit
                 wg.Done()
             }()
 
+            c.resendZkAuth(reauthChan)
+
             c.sendSetWatches()
             wg.Wait()
         }
@@ -532,66 +603,73 @@ func (c *Conn) authenticate() error {
     return nil
 }
 
-func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
+func (c *Conn) sendData(req *request) error {
+    header := &requestHeader{req.xid, req.opcode}
+    n, err := encodePacket(c.buf[4:], header)
+    if err != nil {
+        req.recvChan <- response{-1, err}
+        return nil
+    }
+
+    n2, err := encodePacket(c.buf[4+n:], req.pkt)
+    if err != nil {
+        req.recvChan <- response{-1, err}
+        return nil
+    }
+
+    n += n2
+
+    binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
+
+    c.requestsLock.Lock()
+    select {
+    case <-c.closeChan:
+        req.recvChan <- response{-1, ErrConnectionClosed}
+        c.requestsLock.Unlock()
+        return ErrConnectionClosed
+    default:
+    }
+    c.requests[req.xid] = req
+    c.requestsLock.Unlock()
+
+    c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
+    _, err = c.conn.Write(c.buf[:n+4])
+    c.conn.SetWriteDeadline(time.Time{})
+    if err != nil {
+        req.recvChan <- response{-1, err}
+        c.conn.Close()
+        return err
+    }
+
+    return nil
+}
+
+func (c *Conn) sendLoop() error {
     pingTicker := time.NewTicker(c.pingInterval)
     defer pingTicker.Stop()
 
-    buf := make([]byte, bufferSize)
     for {
         select {
         case req := <-c.sendChan:
-            header := &requestHeader{req.xid, req.opcode}
-            n, err := encodePacket(buf[4:], header)
-            if err != nil {
-                req.recvChan <- response{-1, err}
-                continue
-            }
-
-            n2, err := encodePacket(buf[4+n:], req.pkt)
-            if err != nil {
-                req.recvChan <- response{-1, err}
-                continue
-            }
-
-            n += n2
-
-            binary.BigEndian.PutUint32(buf[:4], uint32(n))
-
-            c.requestsLock.Lock()
-            select {
-            case <-closeChan:
-                req.recvChan <- response{-1, ErrConnectionClosed}
-                c.requestsLock.Unlock()
-                return ErrConnectionClosed
-            default:
-            }
-            c.requests[req.xid] = req
-            c.requestsLock.Unlock()
-
-            conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
-            _, err = conn.Write(buf[:n+4])
-            conn.SetWriteDeadline(time.Time{})
-            if err != nil {
-                req.recvChan <- response{-1, err}
-                conn.Close()
+            if err := c.sendData(req); err != nil {
                 return err
             }
         case <-pingTicker.C:
-            n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
+            n, err := encodePacket(c.buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
             if err != nil {
                 panic("zk: opPing should never fail to serialize")
             }
-            binary.BigEndian.PutUint32(buf[:4], uint32(n))
+            binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
 
-            conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
-            _, err = conn.Write(buf[:n+4])
-            conn.SetWriteDeadline(time.Time{})
+            c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
+            _, err = c.conn.Write(c.buf[:n+4])
+            c.conn.SetWriteDeadline(time.Time{})
             if err != nil {
-                conn.Close()
+                c.conn.Close()
                 return err
             }
-        case <-closeChan:
+        case <-c.closeChan:
             return nil
         }
     }
@@ -636,10 +714,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
                 Path:  res.Path,
                 Err:   nil,
             }
-            select {
-            case c.eventChan <- ev:
-            default:
-            }
+            c.sendEvent(ev)
             wTypes := make([]watchType, 0, 2)
             switch res.Type {
             case EventNodeCreated:
@@ -731,7 +806,28 @@ func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc
 
 func (c *Conn) AddAuth(scheme string, auth []byte) error {
     _, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
-    return err
+
+    if err != nil {
+        return err
+    }
+
+    // Remember authdata so that it can be re-submitted on reconnect
+    //
+    // FIXME(prozlach): For now we treat "userfoo:passbar" and "userfoo:passbar2"
+    // as two different entries, which will be re-submitted on reconnect. Some
+    // research is needed on how ZK treats these cases and
+    // then maybe switch to something like "map[username] = password" to allow
+    // only single password for given user with users being unique.
+    obj := authCreds{
+        scheme: scheme,
+        auth:   auth,
+    }
+
+    c.credsMu.Lock()
+    c.creds = append(c.creds, obj)
+    c.credsMu.Unlock()
+
+    return nil
 }
 
 func (c *Conn) Children(path string) ([]string, *Stat, error) {
@@ -892,6 +988,7 @@ func (c *Conn) Sync(path string) (string, error) {
 type MultiResponse struct {
     Stat   *Stat
     String string
+    Error  error
 }
 
 // Multi executes multiple ZooKeeper operations or none of them. The provided
@@ -922,7 +1019,7 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
     _, err := c.request(opMulti, req, res, nil)
     mr := make([]MultiResponse, len(res.Ops))
     for i, op := range res.Ops {
-        mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
+        mr[i] = MultiResponse{Stat: op.Stat, String: op.String, Error: op.Err.toError()}
     }
     return mr, err
 }
diff --git a/vendor/github.com/samuel/go-zookeeper/zk/constants.go b/vendor/github.com/samuel/go-zookeeper/zk/constants.go
index f9b39b9..33b5563 100644
--- a/vendor/github.com/samuel/go-zookeeper/zk/constants.go
+++ b/vendor/github.com/samuel/go-zookeeper/zk/constants.go
@@ -28,18 +28,19 @@ const (
     opClose      = -11
     opSetAuth    = 100
     opSetWatches = 101
+    opError      = -1
     // Not in protocol, used internally
     opWatcherEvent = -2
 )
 
 const (
-    EventNodeCreated         = EventType(1)
-    EventNodeDeleted         = EventType(2)
-    EventNodeDataChanged     = EventType(3)
-    EventNodeChildrenChanged = EventType(4)
+    EventNodeCreated         EventType = 1
+    EventNodeDeleted         EventType = 2
+    EventNodeDataChanged     EventType = 3
+    EventNodeChildrenChanged EventType = 4
 
-    EventSession     = EventType(-1)
-    EventNotWatching = EventType(-2)
+    EventSession     EventType = -1
+    EventNotWatching EventType = -2
 )
 
 var (
@@ -54,14 +55,13 @@
 )
 
 const (
-    StateUnknown           = State(-1)
-    StateDisconnected      = State(0)
-    StateConnecting        = State(1)
-    StateAuthFailed        = State(4)
-    StateConnectedReadOnly = State(5)
-    StateSaslAuthenticated = State(6)
-    StateExpired           = State(-112)
-    // StateAuthFailed = State(-113)
+    StateUnknown           State = -1
+    StateDisconnected      State = 0
+    StateConnecting        State = 1
+    StateAuthFailed        State = 4
+    StateConnectedReadOnly State = 5
+    StateSaslAuthenticated State = 6
+    StateExpired           State = -112
 
     StateConnected  = State(100)
     StateHasSession = State(101)
@@ -154,20 +154,20 @@ const (
     errBadArguments = -8
     errInvalidState = -9
     // API errors
-    errAPIError                = ErrCode(-100)
-    errNoNode                  = ErrCode(-101) // *
-    errNoAuth                  = ErrCode(-102)
-    errBadVersion              = ErrCode(-103) // *
-    errNoChildrenForEphemerals = ErrCode(-108)
-    errNodeExists              = ErrCode(-110) // *
-    errNotEmpty                = ErrCode(-111)
-    errSessionExpired          = ErrCode(-112)
-    errInvalidCallback         = ErrCode(-113)
-    errInvalidAcl              = ErrCode(-114)
-    errAuthFailed              = ErrCode(-115)
-    errClosing                 = ErrCode(-116)
-    errNothing                 = ErrCode(-117)
-    errSessionMoved            = ErrCode(-118)
+    errAPIError                ErrCode = -100
+    errNoNode                  ErrCode = -101 // *
+    errNoAuth                  ErrCode = -102
+    errBadVersion              ErrCode = -103 // *
+    errNoChildrenForEphemerals ErrCode = -108
+    errNodeExists              ErrCode = -110 // *
+    errNotEmpty                ErrCode = -111
+    errSessionExpired          ErrCode = -112
+    errInvalidCallback         ErrCode = -113
+    errInvalidAcl              ErrCode = -114
+    errAuthFailed              ErrCode = -115
+    errClosing                 ErrCode = -116
+    errNothing                 ErrCode = -117
+    errSessionMoved            ErrCode = -118
 )
 
 // Constants for ACL permissions
diff --git a/vendor/github.com/samuel/go-zookeeper/zk/lock.go b/vendor/github.com/samuel/go-zookeeper/zk/lock.go
index f13a8b0..3c35a42 100644
--- a/vendor/github.com/samuel/go-zookeeper/zk/lock.go
+++ b/vendor/github.com/samuel/go-zookeeper/zk/lock.go
@@ -58,8 +58,16 @@ func (l *Lock) Lock() error {
             parts := strings.Split(l.path, "/")
             pth := ""
             for _, p := range parts[1:] {
+                var exists bool
                 pth += "/" + p
-                _, err := l.c.Create(pth, []byte{}, 0, l.acl)
+                exists, _, err = l.c.Exists(pth)
+                if err != nil {
+                    return err
+                }
+                if exists {
+                    continue
+                }
+                _, err = l.c.Create(pth, []byte{}, 0, l.acl)
                 if err != nil && err != ErrNodeExists {
                     return err
                 }
@@ -86,7 +94,7 @@ func (l *Lock) Lock() error {
     }
 
     lowestSeq := seq
-    prevSeq := 0
+    prevSeq := -1
     prevSeqPath := ""
     for _, p := range children {
         s, err := parseSeq(p)
diff --git a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go b/vendor/github.com/samuel/go-zookeeper/zk/server_help.go
index 618185a..3663064 100644
--- a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go
+++ b/vendor/github.com/samuel/go-zookeeper/zk/server_help.go
@@ -99,37 +99,41 @@ func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error)
     return cluster, nil
 }
 
-func (ts *TestCluster) Connect(idx int) (*Conn, error) {
-    zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", ts.Servers[idx].Port)}, time.Second*15)
+func (tc *TestCluster) Connect(idx int) (*Conn, error) {
+    zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
     return zk, err
 }
 
-func (ts *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
-    return ts.ConnectAllTimeout(time.Second * 15)
+func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
+    return tc.ConnectAllTimeout(time.Second * 15)
 }
 
-func (ts *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
-    hosts := make([]string, len(ts.Servers))
-    for i, srv := range ts.Servers {
+func (tc *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
+    return tc.ConnectWithOptions(sessionTimeout)
+}
+
+func (tc *TestCluster) ConnectWithOptions(sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
+    hosts := make([]string, len(tc.Servers))
+    for i, srv := range tc.Servers {
         hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
     }
 
-    zk, ch, err := Connect(hosts, sessionTimeout)
+    zk, ch, err := Connect(hosts, sessionTimeout, options...)
     return zk, ch, err
 }
 
-func (ts *TestCluster) Stop() error {
-    for _, srv := range ts.Servers {
+func (tc *TestCluster) Stop() error {
+    for _, srv := range tc.Servers {
         srv.Srv.Stop()
     }
-    defer os.RemoveAll(ts.Path)
-    return ts.waitForStop(5, time.Second)
+    defer os.RemoveAll(tc.Path)
+    return tc.waitForStop(5, time.Second)
 }
 
 // waitForStart blocks until the cluster is up
-func (ts *TestCluster) waitForStart(maxRetry int, interval time.Duration) error {
+func (tc *TestCluster) waitForStart(maxRetry int, interval time.Duration) error {
     // verify that the servers are up with SRVR
-    serverAddrs := make([]string, len(ts.Servers))
-    for i, s := range ts.Servers {
+    serverAddrs := make([]string, len(tc.Servers))
+    for i, s := range tc.Servers {
         serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port)
     }
 
@@ -144,10 +148,10 @@ func (ts *TestCluster) waitForStart(maxRetry int, interval time.Duration) error
 }
 
 // waitForStop blocks until the cluster is down
-func (ts *TestCluster) waitForStop(maxRetry int, interval time.Duration) error {
+func (tc *TestCluster) waitForStop(maxRetry int, interval time.Duration) error {
     // verify that the servers are up with RUOK
-    serverAddrs := make([]string, len(ts.Servers))
-    for i, s := range ts.Servers {
+    serverAddrs := make([]string, len(tc.Servers))
+    for i, s := range tc.Servers {
         serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port)
     }
 
@@ -188,3 +192,25 @@ func (tc *TestCluster) StopServer(server string) {
     }
     panic(fmt.Sprintf("Unknown server: %s", server))
 }
+
+func (tc *TestCluster) StartAllServers() error {
+    for _, s := range tc.Servers {
+        if err := s.Srv.Start(); err != nil {
+            return fmt.Errorf(
+                "Failed to start server listening on port `%d` : %+v", s.Port, err)
+        }
+    }
+
+    return nil
+}
+
+func (tc *TestCluster) StopAllServers() error {
+    for _, s := range tc.Servers {
+        if err := s.Srv.Stop(); err != nil {
+            return fmt.Errorf(
+                "Failed to stop server listening on port `%d` : %+v", s.Port, err)
+        }
+    }
+
+    return nil
+}
diff --git a/vendor/github.com/samuel/go-zookeeper/zk/structs.go b/vendor/github.com/samuel/go-zookeeper/zk/structs.go
index 02cd3f3..d4af27d 100644
--- a/vendor/github.com/samuel/go-zookeeper/zk/structs.go
+++ b/vendor/github.com/samuel/go-zookeeper/zk/structs.go
@@ -270,6 +270,7 @@ type multiResponseOp struct {
     Header multiHeader
     String string
     Stat   *Stat
+    Err    ErrCode
 }
 type multiResponse struct {
     Ops        []multiResponseOp
@@ -327,6 +328,8 @@ func (r *multiRequest) Decode(buf []byte) (int, error) {
 }
 
 func (r *multiResponse) Decode(buf []byte) (int, error) {
+    var multiErr error
+
     r.Ops = make([]multiResponseOp, 0)
     r.DoneHeader = multiHeader{-1, true, -1}
     total := 0
@@ -347,6 +350,8 @@ func (r *multiResponse) Decode(buf []byte) (int, error) {
         switch header.Type {
         default:
             return total, ErrAPIError
+        case opError:
+            w = reflect.ValueOf(&res.Err)
         case opCreate:
             w = reflect.ValueOf(&res.String)
         case opSetData:
@@ -362,8 +367,12 @@ func (r *multiResponse) Decode(buf []byte) (int, error) {
             total += n
         }
         r.Ops = append(r.Ops, res)
+        if multiErr == nil && res.Err != errOk {
+            // Use the first error as the error returned from Multi().
+            multiErr = res.Err.toError()
+        }
     }
-    return total, nil
+    return total, multiErr
 }
 
 type watcherEvent struct {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 074e58f..812848e 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -15,10 +15,10 @@
             "revisionTime": "2016-06-27T22:23:52Z"
         },
         {
-            "checksumSHA1": "dF3fORwN1HTgrlrdmll9K2cOjOg=",
+            "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
             "path": "github.com/samuel/go-zookeeper/zk",
-            "revision": "e64db453f3512cade908163702045e0f31137843",
-            "revisionTime": "2016-06-16T02:49:54Z"
+            "revision": "1d7be4effb13d2d908342d349d71a284a7542693",
+            "revisionTime": "2016-10-28T23:23:40Z"
         },
         {
             "checksumSHA1": "iydUphwYqZRq3WhstEdGsbvBAKs=",
diff --git a/zk.go b/zk.go
index 48b18e2..a301fcd 100644
--- a/zk.go
+++ b/zk.go
@@ -35,13 +35,18 @@ type ServiceInstance struct {
     Status string `json:"status"`
 }
 
+type NoopLogger struct{}
+
+func (NoopLogger) Printf(format string, a ...interface{}) {
+}
+
 // Loads leader from ZK endpoint.
 func LeaderFromZK(cluster Cluster) (string, error) {
     endpoints := strings.Split(cluster.ZK, ",")
 
     //TODO (rdelvalle): When enabling debugging, change logger here
-    c, _, err := zk.Connect(endpoints, time.Second*10, zk.WithoutLogger())
+    c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
     if err != nil {
         return "", errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
     }