Adding dep files and dependencies.
This commit is contained in:
parent
45f9efa578
commit
b341c0a0e4
539 changed files with 313111 additions and 0 deletions
417
vendor/github.com/mesos/mesos-go/api/v0/messenger/messenger.go
generated
vendored
Normal file
417
vendor/github.com/mesos/mesos-go/api/v0/messenger/messenger.go
generated
vendored
Normal file
|
@ -0,0 +1,417 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package messenger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
log "github.com/golang/glog"
|
||||
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
|
||||
"github.com/mesos/mesos-go/api/v0/mesosproto/scheduler"
|
||||
"github.com/mesos/mesos-go/api/v0/mesosutil/process"
|
||||
"github.com/mesos/mesos-go/api/v0/messenger/sessionid"
|
||||
"github.com/mesos/mesos-go/api/v0/upid"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultQueueSize = 1024
|
||||
)
|
||||
|
||||
// MessageHandler is the callback of the message. When the callback
|
||||
// is invoked, the sender's upid and the message is passed to the callback.
|
||||
type MessageHandler func(from *upid.UPID, pbMsg proto.Message)
|
||||
|
||||
// Messenger defines the interfaces that should be implemented.
|
||||
type Messenger interface {
|
||||
Install(handler MessageHandler, msg proto.Message) error
|
||||
Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error
|
||||
Route(ctx context.Context, from *upid.UPID, msg proto.Message) error
|
||||
Start() error
|
||||
Stop() error
|
||||
UPID() upid.UPID
|
||||
}
|
||||
|
||||
type errorHandlerFunc func(context.Context, *Message, error) error
|
||||
type dispatchFunc func(errorHandlerFunc)
|
||||
|
||||
// MesosMessenger is an implementation of the Messenger interface.
|
||||
type MesosMessenger struct {
|
||||
upid upid.UPID
|
||||
sendingQueue chan dispatchFunc
|
||||
installedMessages map[string]reflect.Type
|
||||
installedHandlers map[string]MessageHandler
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
tr Transporter
|
||||
guardHandlers sync.RWMutex // protect simultaneous changes to messages/handlers maps
|
||||
}
|
||||
|
||||
// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to
|
||||
// determine the binding-address used for both the UPID.Host and Transport binding address.
|
||||
func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16, publishedAddress net.IP) (Messenger, error) {
|
||||
upid := upid.UPID{
|
||||
ID: proc.Label(),
|
||||
Port: strconv.Itoa(int(port)),
|
||||
}
|
||||
host, err := UPIDBindingAddress(hostname, bindingAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var publishedHost string
|
||||
if publishedAddress != nil {
|
||||
publishedHost, err = UPIDBindingAddress(hostname, publishedAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if publishedHost != "" {
|
||||
upid.Host = publishedHost
|
||||
} else {
|
||||
upid.Host = host
|
||||
}
|
||||
|
||||
return NewHttpWithBindingAddress(upid, bindingAddress), nil
|
||||
}
|
||||
|
||||
// UPIDBindingAddress determines the value of UPID.Host that will be used to build
|
||||
// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used
|
||||
// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP
|
||||
// address and the UPID.Host is set to that address and the bindingAddress is passed through
|
||||
// to the Transport.
|
||||
func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) {
|
||||
upidHost := ""
|
||||
if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() {
|
||||
upidHost = bindingAddress.String()
|
||||
} else {
|
||||
if hostname == "" || hostname == "0.0.0.0" {
|
||||
return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress)
|
||||
}
|
||||
ip := net.ParseIP(hostname)
|
||||
if ip != nil {
|
||||
ip = ip.To4()
|
||||
}
|
||||
if ip == nil {
|
||||
ips, err := net.LookupIP(hostname)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// try to find an ipv4 and use that
|
||||
for _, addr := range ips {
|
||||
if ip = addr.To4(); ip != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if ip == nil {
|
||||
// no ipv4? best guess, just take the first addr
|
||||
if len(ips) > 0 {
|
||||
ip = ips[0]
|
||||
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip)
|
||||
} else {
|
||||
return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname)
|
||||
}
|
||||
}
|
||||
}
|
||||
upidHost = ip.String()
|
||||
}
|
||||
return upidHost, nil
|
||||
}
|
||||
|
||||
// NewMesosMessenger creates a new mesos messenger.
|
||||
func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger {
|
||||
return NewHttpWithBindingAddress(upid, nil, opts...)
|
||||
}
|
||||
|
||||
func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger {
|
||||
return New(NewHTTPTransporter(upid, address, opts...))
|
||||
}
|
||||
|
||||
func New(t Transporter) *MesosMessenger {
|
||||
return &MesosMessenger{
|
||||
sendingQueue: make(chan dispatchFunc, defaultQueueSize),
|
||||
installedMessages: make(map[string]reflect.Type),
|
||||
installedHandlers: make(map[string]MessageHandler),
|
||||
tr: t,
|
||||
}
|
||||
}
|
||||
|
||||
/// Install installs the handler with the given message.
|
||||
func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) error {
|
||||
// Check if the message is a pointer.
|
||||
mtype := reflect.TypeOf(msg)
|
||||
if mtype.Kind() != reflect.Ptr {
|
||||
return fmt.Errorf("Message %v is not a Ptr type", msg)
|
||||
}
|
||||
|
||||
// Check if the message is already installed.
|
||||
name := getMessageName(msg)
|
||||
if _, ok := m.installedMessages[name]; ok {
|
||||
return fmt.Errorf("Message %v is already installed", name)
|
||||
}
|
||||
|
||||
m.guardHandlers.Lock()
|
||||
defer m.guardHandlers.Unlock()
|
||||
|
||||
m.installedMessages[name] = mtype.Elem()
|
||||
m.installedHandlers[name] = handler
|
||||
m.tr.Install(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send puts a message into the outgoing queue, waiting to be sent.
|
||||
// With buffered channels, this will not block under moderate throughput.
|
||||
// When an error is generated, the error can be communicated by placing
|
||||
// a message on the incoming queue to be handled upstream.
|
||||
func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
|
||||
if upid == nil {
|
||||
panic("cannot sent a message to a nil pid")
|
||||
} else if *upid == m.upid {
|
||||
return fmt.Errorf("Send the message to self")
|
||||
}
|
||||
|
||||
b, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
name := getMessageName(msg)
|
||||
log.V(2).Infof("Sending message %v to %v\n", name, upid)
|
||||
|
||||
wrapped := &Message{upid, name, msg, b}
|
||||
d := dispatchFunc(func(rf errorHandlerFunc) {
|
||||
err := m.tr.Send(ctx, wrapped)
|
||||
err = rf(ctx, wrapped, err)
|
||||
if err != nil {
|
||||
m.reportError("send", wrapped, err)
|
||||
}
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case m.sendingQueue <- d:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Route puts a message either in the incoming or outgoing queue.
|
||||
// This method is useful for:
|
||||
// 1) routing internal error to callback handlers
|
||||
// 2) testing components without starting remote servers.
|
||||
func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
|
||||
if upid == nil {
|
||||
panic("cannot route a message to a nil pid")
|
||||
} else if *upid != m.upid {
|
||||
// if destination is not self, send to outbound.
|
||||
return m.Send(ctx, upid, msg)
|
||||
}
|
||||
|
||||
name := getMessageName(msg)
|
||||
log.V(2).Infof("routing message %q to self", name)
|
||||
|
||||
_, handler, ok := m.messageBinding(name)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to route message, no message binding for %q", name)
|
||||
}
|
||||
|
||||
// the implication of this is that messages can be delivered to self even if the
|
||||
// messenger has been stopped. is that OK?
|
||||
go handler(upid, msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start starts the messenger; expects to be called once and only once.
|
||||
func (m *MesosMessenger) Start() error {
|
||||
|
||||
m.stop = make(chan struct{})
|
||||
|
||||
pid, errChan := m.tr.Start()
|
||||
if pid == (upid.UPID{}) {
|
||||
err := <-errChan
|
||||
return fmt.Errorf("failed to start messenger: %v", err)
|
||||
}
|
||||
|
||||
// the pid that we're actually bound as
|
||||
m.upid = pid
|
||||
|
||||
go m.sendLoop()
|
||||
go m.decodeLoop()
|
||||
|
||||
// wait for a listener error or a stop signal; either way stop the messenger
|
||||
|
||||
// TODO(jdef) a better implementation would attempt to re-listen; need to coordinate
|
||||
// access to m.upid in that case. probably better off with a state machine instead of
|
||||
// what we have now.
|
||||
go func() {
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
//TODO(jdef) should the driver abort in this case? probably
|
||||
//since this messenger will never attempt to re-establish the
|
||||
//transport
|
||||
log.Errorln("transport stopped unexpectedly:", err.Error())
|
||||
}
|
||||
err = m.Stop()
|
||||
if err != nil && err != errTerminal {
|
||||
log.Errorln("failed to stop messenger cleanly: ", err.Error())
|
||||
}
|
||||
case <-m.stop:
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the messenger and clean up all the goroutines.
|
||||
func (m *MesosMessenger) Stop() (err error) {
|
||||
m.stopOnce.Do(func() {
|
||||
select {
|
||||
case <-m.stop:
|
||||
default:
|
||||
defer close(m.stop)
|
||||
}
|
||||
|
||||
log.Infof("stopping messenger %v..", m.upid)
|
||||
|
||||
//TODO(jdef) don't hardcode the graceful flag here
|
||||
if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal {
|
||||
log.Warningf("failed to stop the transporter: %v\n", err2)
|
||||
err = err2
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// UPID returns the upid of the messenger.
|
||||
func (m *MesosMessenger) UPID() upid.UPID {
|
||||
return m.upid
|
||||
}
|
||||
|
||||
func (m *MesosMessenger) reportError(action string, msg *Message, err error) {
|
||||
// log message transmission errors but don't shoot the messenger.
|
||||
// this approach essentially drops all undelivered messages on the floor.
|
||||
name := ""
|
||||
if msg != nil {
|
||||
name = msg.Name
|
||||
}
|
||||
log.Errorf("failed to %s message %q: %+v", action, name, err)
|
||||
}
|
||||
|
||||
func (m *MesosMessenger) sendLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-m.stop:
|
||||
return
|
||||
case f := <-m.sendingQueue:
|
||||
f(errorHandlerFunc(func(ctx context.Context, msg *Message, err error) error {
|
||||
if _, ok := err.(*networkError); ok {
|
||||
// if transport reports a network error, then
|
||||
// we're probably disconnected from the remote process?
|
||||
pid := msg.UPID.String()
|
||||
neterr := &mesos.InternalNetworkError{Pid: &pid}
|
||||
sessionID, ok := sessionid.FromContext(ctx)
|
||||
if ok {
|
||||
neterr.Session = &sessionID
|
||||
}
|
||||
log.V(1).Infof("routing network error for pid %q session %q", pid, sessionID)
|
||||
err2 := m.Route(ctx, &m.upid, neterr)
|
||||
if err2 != nil {
|
||||
log.Error(err2)
|
||||
} else {
|
||||
log.V(1).Infof("swallowing raw error because we're reporting a networkError: %v", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Since HTTPTransporter.Recv() is already buffered, so we don't need a 'recvLoop' here.
|
||||
func (m *MesosMessenger) decodeLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-m.stop:
|
||||
return
|
||||
default:
|
||||
}
|
||||
msg, err := m.tr.Recv()
|
||||
if err != nil {
|
||||
if err == discardOnStopError {
|
||||
log.V(1).Info("exiting decodeLoop, transport shutting down")
|
||||
return
|
||||
} else {
|
||||
panic(fmt.Sprintf("unexpected transport error: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID)
|
||||
protoMessage, handler, found := m.messageBinding(msg.Name)
|
||||
if !found {
|
||||
log.Warningf("no message binding for message %q", msg.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
msg.ProtoMessage = protoMessage
|
||||
if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil {
|
||||
log.Errorf("Failed to unmarshal message %v: %v\n", msg, err)
|
||||
continue
|
||||
}
|
||||
|
||||
handler(msg.UPID, msg.ProtoMessage)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MesosMessenger) messageBinding(name string) (proto.Message, MessageHandler, bool) {
|
||||
m.guardHandlers.RLock()
|
||||
defer m.guardHandlers.RUnlock()
|
||||
|
||||
gotype, ok := m.installedMessages[name]
|
||||
if !ok {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
handler, ok := m.installedHandlers[name]
|
||||
if !ok {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
protoMessage := reflect.New(gotype).Interface().(proto.Message)
|
||||
return protoMessage, handler, true
|
||||
}
|
||||
|
||||
// getMessageName returns the name of the message in the mesos manner.
|
||||
func getMessageName(msg proto.Message) string {
|
||||
var msgName string
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *scheduler.Call:
|
||||
msgName = "scheduler"
|
||||
default:
|
||||
msgName = fmt.Sprintf("%v.%v", "mesos.internal", reflect.TypeOf(msg).Elem().Name())
|
||||
}
|
||||
|
||||
return msgName
|
||||
}
|
Reference in a new issue