/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package messenger

import (
	"fmt"
	"net"
	"reflect"
	"strconv"
	"sync"

	"github.com/gogo/protobuf/proto"
	log "github.com/golang/glog"
	mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
	"github.com/mesos/mesos-go/api/v0/mesosproto/scheduler"
	"github.com/mesos/mesos-go/api/v0/mesosutil/process"
	"github.com/mesos/mesos-go/api/v0/messenger/sessionid"
	"github.com/mesos/mesos-go/api/v0/upid"
	"golang.org/x/net/context"
)

const (
	// defaultQueueSize is the capacity of the buffered outgoing (sending)
	// queue allocated by New.
	defaultQueueSize = 1024
)

// MessageHandler is the callback of the message. When the callback
// is invoked, the sender's upid and the message are passed to the callback.
type MessageHandler func(from *upid.UPID, pbMsg proto.Message)

// Messenger defines the interfaces that should be implemented.
// MesosMessenger is the implementation provided by this file.
type Messenger interface {
	// Install registers handler as the callback for messages of msg's type.
	Install(handler MessageHandler, msg proto.Message) error
	// Send queues msg for delivery to the process identified by upid.
	Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error
	// Route delivers msg either to a locally installed handler or to the
	// outgoing queue; in MesosMessenger the choice depends on whether the
	// given pid is the messenger's own.
	Route(ctx context.Context, from *upid.UPID, msg proto.Message) error
	// Start starts the messenger.
	Start() error
	// Stop stops the messenger.
	Stop() error
	// UPID returns the upid of the messenger.
	UPID() upid.UPID
}

// errorHandlerFunc post-processes the (possibly nil) error produced by sending
// a message. The error it returns, if non-nil, is the one that gets reported.
type errorHandlerFunc func(context.Context, *Message, error) error

// dispatchFunc is a queued unit of send work; the send loop invokes it with
// the errorHandlerFunc to apply to the outcome of the send.
type dispatchFunc func(errorHandlerFunc)

// MesosMessenger is an implementation of the Messenger interface.
type MesosMessenger struct { upid upid.UPID sendingQueue chan dispatchFunc installedMessages map[string]reflect.Type installedHandlers map[string]MessageHandler stop chan struct{} stopOnce sync.Once tr Transporter guardHandlers sync.RWMutex // protect simultaneous changes to messages/handlers maps } // ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to // determine the binding-address used for both the UPID.Host and Transport binding address. func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16, publishedAddress net.IP) (Messenger, error) { upid := upid.UPID{ ID: proc.Label(), Port: strconv.Itoa(int(port)), } host, err := UPIDBindingAddress(hostname, bindingAddress) if err != nil { return nil, err } var publishedHost string if publishedAddress != nil { publishedHost, err = UPIDBindingAddress(hostname, publishedAddress) if err != nil { return nil, err } } if publishedHost != "" { upid.Host = publishedHost } else { upid.Host = host } return NewHttpWithBindingAddress(upid, bindingAddress), nil } // UPIDBindingAddress determines the value of UPID.Host that will be used to build // a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used // for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP // address and the UPID.Host is set to that address and the bindingAddress is passed through // to the Transport. 
func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) { upidHost := "" if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() { upidHost = bindingAddress.String() } else { if hostname == "" || hostname == "0.0.0.0" { return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress) } ip := net.ParseIP(hostname) if ip != nil { ip = ip.To4() } if ip == nil { ips, err := net.LookupIP(hostname) if err != nil { return "", err } // try to find an ipv4 and use that for _, addr := range ips { if ip = addr.To4(); ip != nil { break } } if ip == nil { // no ipv4? best guess, just take the first addr if len(ips) > 0 { ip = ips[0] log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) } else { return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname) } } } upidHost = ip.String() } return upidHost, nil } // NewMesosMessenger creates a new mesos messenger. func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger { return NewHttpWithBindingAddress(upid, nil, opts...) } func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger { return New(NewHTTPTransporter(upid, address, opts...)) } func New(t Transporter) *MesosMessenger { return &MesosMessenger{ sendingQueue: make(chan dispatchFunc, defaultQueueSize), installedMessages: make(map[string]reflect.Type), installedHandlers: make(map[string]MessageHandler), tr: t, } } /// Install installs the handler with the given message. func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) error { // Check if the message is a pointer. mtype := reflect.TypeOf(msg) if mtype.Kind() != reflect.Ptr { return fmt.Errorf("Message %v is not a Ptr type", msg) } // Check if the message is already installed. 
name := getMessageName(msg) if _, ok := m.installedMessages[name]; ok { return fmt.Errorf("Message %v is already installed", name) } m.guardHandlers.Lock() defer m.guardHandlers.Unlock() m.installedMessages[name] = mtype.Elem() m.installedHandlers[name] = handler m.tr.Install(name) return nil } // Send puts a message into the outgoing queue, waiting to be sent. // With buffered channels, this will not block under moderate throughput. // When an error is generated, the error can be communicated by placing // a message on the incoming queue to be handled upstream. func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { if upid == nil { panic("cannot sent a message to a nil pid") } else if *upid == m.upid { return fmt.Errorf("Send the message to self") } b, err := proto.Marshal(msg) if err != nil { return err } name := getMessageName(msg) log.V(2).Infof("Sending message %v to %v\n", name, upid) wrapped := &Message{upid, name, msg, b} d := dispatchFunc(func(rf errorHandlerFunc) { err := m.tr.Send(ctx, wrapped) err = rf(ctx, wrapped, err) if err != nil { m.reportError("send", wrapped, err) } }) select { case <-ctx.Done(): return ctx.Err() case m.sendingQueue <- d: return nil } } // Route puts a message either in the incoming or outgoing queue. // This method is useful for: // 1) routing internal error to callback handlers // 2) testing components without starting remote servers. func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error { if upid == nil { panic("cannot route a message to a nil pid") } else if *upid != m.upid { // if destination is not self, send to outbound. 
return m.Send(ctx, upid, msg) } name := getMessageName(msg) log.V(2).Infof("routing message %q to self", name) _, handler, ok := m.messageBinding(name) if !ok { return fmt.Errorf("failed to route message, no message binding for %q", name) } // the implication of this is that messages can be delivered to self even if the // messenger has been stopped. is that OK? go handler(upid, msg) return nil } // Start starts the messenger; expects to be called once and only once. func (m *MesosMessenger) Start() error { m.stop = make(chan struct{}) pid, errChan := m.tr.Start() if pid == (upid.UPID{}) { err := <-errChan return fmt.Errorf("failed to start messenger: %v", err) } // the pid that we're actually bound as m.upid = pid go m.sendLoop() go m.decodeLoop() // wait for a listener error or a stop signal; either way stop the messenger // TODO(jdef) a better implementation would attempt to re-listen; need to coordinate // access to m.upid in that case. probably better off with a state machine instead of // what we have now. go func() { select { case err := <-errChan: if err != nil { //TODO(jdef) should the driver abort in this case? probably //since this messenger will never attempt to re-establish the //transport log.Errorln("transport stopped unexpectedly:", err.Error()) } err = m.Stop() if err != nil && err != errTerminal { log.Errorln("failed to stop messenger cleanly: ", err.Error()) } case <-m.stop: } }() return nil } // Stop stops the messenger and clean up all the goroutines. func (m *MesosMessenger) Stop() (err error) { m.stopOnce.Do(func() { select { case <-m.stop: default: defer close(m.stop) } log.Infof("stopping messenger %v..", m.upid) //TODO(jdef) don't hardcode the graceful flag here if err2 := m.tr.Stop(true); err2 != nil && err2 != errTerminal { log.Warningf("failed to stop the transporter: %v\n", err2) err = err2 } }) return } // UPID returns the upid of the messenger. 
func (m *MesosMessenger) UPID() upid.UPID { return m.upid } func (m *MesosMessenger) reportError(action string, msg *Message, err error) { // log message transmission errors but don't shoot the messenger. // this approach essentially drops all undelivered messages on the floor. name := "" if msg != nil { name = msg.Name } log.Errorf("failed to %s message %q: %+v", action, name, err) } func (m *MesosMessenger) sendLoop() { for { select { case <-m.stop: return case f := <-m.sendingQueue: f(errorHandlerFunc(func(ctx context.Context, msg *Message, err error) error { if _, ok := err.(*networkError); ok { // if transport reports a network error, then // we're probably disconnected from the remote process? pid := msg.UPID.String() neterr := &mesos.InternalNetworkError{Pid: &pid} sessionID, ok := sessionid.FromContext(ctx) if ok { neterr.Session = &sessionID } log.V(1).Infof("routing network error for pid %q session %q", pid, sessionID) err2 := m.Route(ctx, &m.upid, neterr) if err2 != nil { log.Error(err2) } else { log.V(1).Infof("swallowing raw error because we're reporting a networkError: %v", err) return nil } } return err })) } } } // Since HTTPTransporter.Recv() is already buffered, so we don't need a 'recvLoop' here. 
func (m *MesosMessenger) decodeLoop() { for { select { case <-m.stop: return default: } msg, err := m.tr.Recv() if err != nil { if err == discardOnStopError { log.V(1).Info("exiting decodeLoop, transport shutting down") return } else { panic(fmt.Sprintf("unexpected transport error: %v", err)) } } log.V(2).Infof("Receiving message %v from %v\n", msg.Name, msg.UPID) protoMessage, handler, found := m.messageBinding(msg.Name) if !found { log.Warningf("no message binding for message %q", msg.Name) continue } msg.ProtoMessage = protoMessage if err := proto.Unmarshal(msg.Bytes, msg.ProtoMessage); err != nil { log.Errorf("Failed to unmarshal message %v: %v\n", msg, err) continue } handler(msg.UPID, msg.ProtoMessage) } } func (m *MesosMessenger) messageBinding(name string) (proto.Message, MessageHandler, bool) { m.guardHandlers.RLock() defer m.guardHandlers.RUnlock() gotype, ok := m.installedMessages[name] if !ok { return nil, nil, false } handler, ok := m.installedHandlers[name] if !ok { return nil, nil, false } protoMessage := reflect.New(gotype).Interface().(proto.Message) return protoMessage, handler, true } // getMessageName returns the name of the message in the mesos manner. func getMessageName(msg proto.Message) string { var msgName string switch msg := msg.(type) { case *scheduler.Call: msgName = "scheduler" default: msgName = fmt.Sprintf("%v.%v", "mesos.internal", reflect.TypeOf(msg).Elem().Name()) } return msgName }