638 lines
17 KiB
Go
638 lines
17 KiB
Go
package messenger
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/textproto"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
log "github.com/golang/glog"
|
|
)
|
|
|
|
const (
|
|
// writeFlushPeriod is the amount of time we're willing to wait for a single
|
|
// response buffer to be fully written to the underlying TCP connection; after
|
|
// this amount of time the remaining bytes of the response are discarded. see
|
|
// responseWriter().
|
|
writeFlushPeriod = 30 * time.Second
|
|
)
|
|
|
|
type decoderID int32
|
|
|
|
func (did decoderID) String() string {
|
|
return "[" + strconv.Itoa(int(did)) + "]"
|
|
}
|
|
|
|
func (did *decoderID) next() decoderID {
|
|
return decoderID(atomic.AddInt32((*int32)(did), 1))
|
|
}
|
|
|
|
var (
|
|
errHijackFailed = errors.New("failed to hijack http connection")
|
|
did decoderID // decoder ID counter
|
|
)
|
|
|
|
type Decoder interface {
|
|
Requests() <-chan *Request
|
|
Err() <-chan error
|
|
Cancel(bool)
|
|
}
|
|
|
|
type Request struct {
|
|
*http.Request
|
|
response chan<- Response // callers that are finished with a Request should ensure that response is *always* closed, regardless of whether a Response has been written.
|
|
}
|
|
|
|
type Response struct {
|
|
code int
|
|
reason string
|
|
}
|
|
|
|
type httpDecoder struct {
|
|
req *http.Request // original request
|
|
kalive bool // keepalive
|
|
chunked bool // chunked
|
|
msg chan *Request
|
|
con net.Conn
|
|
rw *bufio.ReadWriter
|
|
errCh chan error
|
|
buf *bytes.Buffer
|
|
lrc *io.LimitedReader
|
|
shouldQuit chan struct{} // signal chan, closes upon calls to Cancel(...)
|
|
forceQuit chan struct{} // signal chan, indicates that quit is NOT graceful; closes upon Cancel(false)
|
|
cancelGuard sync.Mutex
|
|
readTimeout time.Duration
|
|
writeTimeout time.Duration
|
|
idtag string // useful for debugging
|
|
sendError func(err error) // abstraction for error handling
|
|
outCh chan *bytes.Buffer // chan of responses to be written to the connection
|
|
}
|
|
|
|
// DecodeHTTP hijacks an HTTP server connection and generates mesos libprocess HTTP
|
|
// requests via the returned chan. Upon generation of an error in the error chan the
|
|
// decoder's internal goroutine will terminate. This func returns immediately.
|
|
// The caller should immediately *stop* using the ResponseWriter and Request that were
|
|
// passed as parameters; the decoder assumes full control of the HTTP transport.
|
|
func DecodeHTTP(w http.ResponseWriter, r *http.Request) Decoder {
|
|
id := did.next()
|
|
d := &httpDecoder{
|
|
msg: make(chan *Request),
|
|
errCh: make(chan error, 1),
|
|
req: r,
|
|
shouldQuit: make(chan struct{}),
|
|
forceQuit: make(chan struct{}),
|
|
readTimeout: ReadTimeout,
|
|
writeTimeout: WriteTimeout,
|
|
idtag: id.String(),
|
|
outCh: make(chan *bytes.Buffer),
|
|
}
|
|
d.sendError = d.defaultSendError
|
|
go d.run(w)
|
|
return d
|
|
}
|
|
|
|
func (d *httpDecoder) Requests() <-chan *Request {
|
|
return d.msg
|
|
}
|
|
|
|
func (d *httpDecoder) Err() <-chan error {
|
|
return d.errCh
|
|
}
|
|
|
|
// Cancel the decoding process; if graceful then process pending responses before terminating
|
|
func (d *httpDecoder) Cancel(graceful bool) {
|
|
log.V(2).Infof("%scancel:%t", d.idtag, graceful)
|
|
d.cancelGuard.Lock()
|
|
defer d.cancelGuard.Unlock()
|
|
select {
|
|
case <-d.shouldQuit:
|
|
// already quitting, but perhaps gracefully?
|
|
default:
|
|
close(d.shouldQuit)
|
|
}
|
|
// allow caller to "upgrade" from a graceful cancel to a forced one
|
|
if !graceful {
|
|
select {
|
|
case <-d.forceQuit:
|
|
// already forcefully quitting
|
|
default:
|
|
close(d.forceQuit) // push it!
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *httpDecoder) run(res http.ResponseWriter) {
|
|
defer func() {
|
|
close(d.outCh) // we're finished generating response objects
|
|
log.V(2).Infoln(d.idtag + "run: terminating")
|
|
}()
|
|
|
|
for state := d.bootstrapState(res); state != nil; {
|
|
next := state(d)
|
|
state = next
|
|
}
|
|
}
|
|
|
|
// tryFlushResponse flushes the response buffer (if not empty); returns true if flush succeeded
|
|
func (d *httpDecoder) tryFlushResponse(out *bytes.Buffer) {
|
|
log.V(2).Infof(d.idtag+"try-flush-responses: %d bytes to flush", out.Len())
|
|
// set a write deadline here so that we don't block for very long.
|
|
err := d.setWriteTimeout()
|
|
if err != nil {
|
|
// this is a problem because if we can't set the timeout then we can't guarantee
|
|
// how long a write op might block for. Log the error and skip this response.
|
|
log.Errorln("failed to set write deadline, aborting response:", err.Error())
|
|
} else {
|
|
_, err = out.WriteTo(d.rw.Writer)
|
|
if err != nil {
|
|
if neterr, ok := err.(net.Error); ok && neterr.Timeout() && out.Len() > 0 {
|
|
// we couldn't fully write before timing out, return rch and hope that
|
|
// we have better luck next time.
|
|
return
|
|
}
|
|
// we don't really know how to deal with other kinds of errors, so
|
|
// log it and skip the rest of the response.
|
|
log.Errorln("failed to write response buffer:", err.Error())
|
|
}
|
|
err = d.rw.Flush()
|
|
if err != nil {
|
|
if neterr, ok := err.(net.Error); ok && neterr.Timeout() && out.Len() > 0 {
|
|
return
|
|
}
|
|
log.Errorln("failed to flush response buffer:", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO(jdef) make this a func on Response, to write its contents to a *bytes.Buffer
|
|
func (d *httpDecoder) buildResponseEntity(resp *Response) *bytes.Buffer {
|
|
log.V(2).Infoln(d.idtag + "build-response-entity")
|
|
|
|
out := &bytes.Buffer{}
|
|
|
|
// generate new response buffer content and continue; buffer should have
|
|
// at least a response status-line w/ Content-Length: 0
|
|
out.WriteString("HTTP/1.1 ")
|
|
out.WriteString(strconv.Itoa(resp.code))
|
|
out.WriteString(" ")
|
|
out.WriteString(resp.reason)
|
|
out.WriteString(crlf + "Content-Length: 0" + crlf)
|
|
|
|
select {
|
|
case <-d.shouldQuit:
|
|
// this is the last request in the pipeline and we've been told to quit, so
|
|
// indicate that the server will close the connection.
|
|
out.WriteString("Connection: Close" + crlf)
|
|
default:
|
|
}
|
|
out.WriteString(crlf) // this ends the HTTP response entity
|
|
return out
|
|
}
|
|
|
|
// updateForRequest updates the chunked and kalive fields of the decoder to align
|
|
// with the header values of the request
|
|
func (d *httpDecoder) updateForRequest(bootstrapping bool) {
|
|
// check "Transfer-Encoding" for "chunked"
|
|
d.chunked = false
|
|
for _, v := range d.req.Header["Transfer-Encoding"] {
|
|
if v == "chunked" {
|
|
d.chunked = true
|
|
break
|
|
}
|
|
}
|
|
if !d.chunked && d.req.ContentLength < 0 {
|
|
if bootstrapping {
|
|
// strongly suspect that Go's internal net/http lib is stripping
|
|
// the Transfer-Encoding header from the initial request, so this
|
|
// workaround makes a very mesos-specific assumption: an unknown
|
|
// Content-Length indicates a chunked stream.
|
|
d.chunked = true
|
|
} else {
|
|
// via https://tools.ietf.org/html/rfc7230#section-3.3.2
|
|
d.req.ContentLength = 0
|
|
}
|
|
}
|
|
|
|
// check "Connection" for "Keep-Alive"
|
|
d.kalive = d.req.Header.Get("Connection") == "Keep-Alive"
|
|
|
|
log.V(2).Infof(d.idtag+"update-for-request: chunked %v keep-alive %v", d.chunked, d.kalive)
|
|
}
|
|
|
|
func (d *httpDecoder) readBodyContent() httpState {
|
|
log.V(2).Info(d.idtag + "read-body-content")
|
|
if d.chunked {
|
|
d.buf = &bytes.Buffer{}
|
|
return readChunkHeaderState
|
|
} else {
|
|
d.lrc = limit(d.rw.Reader, d.req.ContentLength)
|
|
d.buf = &bytes.Buffer{}
|
|
return readBodyState
|
|
}
|
|
}
|
|
|
|
const http202response = "HTTP/1.1 202 OK\r\nContent-Length: 0\r\n\r\n"
|
|
|
|
func (d *httpDecoder) generateRequest() httpState {
|
|
log.V(2).Infof(d.idtag + "generate-request")
|
|
// send a Request to msg
|
|
b := d.buf.Bytes()
|
|
rch := make(chan Response, 1)
|
|
r := &Request{
|
|
Request: &http.Request{
|
|
Method: d.req.Method,
|
|
URL: d.req.URL,
|
|
Proto: d.req.Proto,
|
|
ProtoMajor: d.req.ProtoMajor,
|
|
ProtoMinor: d.req.ProtoMinor,
|
|
Header: d.req.Header,
|
|
Close: !d.kalive,
|
|
Host: d.req.Host,
|
|
RequestURI: d.req.RequestURI,
|
|
Body: &body{bytes.NewBuffer(b)},
|
|
ContentLength: int64(len(b)),
|
|
},
|
|
response: rch,
|
|
}
|
|
|
|
select {
|
|
case d.msg <- r:
|
|
case <-d.forceQuit:
|
|
return terminateState
|
|
}
|
|
|
|
select {
|
|
case <-d.forceQuit:
|
|
return terminateState
|
|
case resp, ok := <-rch:
|
|
if ok {
|
|
// response required, so build it and ship it
|
|
out := d.buildResponseEntity(&resp)
|
|
select {
|
|
case <-d.forceQuit:
|
|
return terminateState
|
|
case d.outCh <- out:
|
|
}
|
|
}
|
|
}
|
|
|
|
if d.kalive {
|
|
d.req = &http.Request{
|
|
ContentLength: -1,
|
|
Header: make(http.Header),
|
|
}
|
|
return awaitRequestState
|
|
} else {
|
|
return gracefulTerminateState
|
|
}
|
|
}
|
|
|
|
func (d *httpDecoder) defaultSendError(err error) {
|
|
d.errCh <- err
|
|
}
|
|
|
|
type httpState func(d *httpDecoder) httpState
|
|
|
|
// terminateState forcefully shuts down the state machine
|
|
func terminateState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "terminate-state")
|
|
// closing these chans tells Decoder users that it's wrapping up
|
|
close(d.msg)
|
|
close(d.errCh)
|
|
|
|
// attempt to forcefully close the connection and signal response handlers that
|
|
// no further responses should be written
|
|
d.Cancel(false)
|
|
|
|
if d.con != nil {
|
|
d.con.Close()
|
|
}
|
|
|
|
// there is no spoon
|
|
return nil
|
|
}
|
|
|
|
func gracefulTerminateState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "gracefully-terminate-state")
|
|
// closing these chans tells Decoder users that it's wrapping up
|
|
close(d.msg)
|
|
close(d.errCh)
|
|
|
|
// gracefully terminate the connection; signal that we should flush pending
|
|
// responses before closing the connection.
|
|
d.Cancel(true)
|
|
|
|
return nil
|
|
}
|
|
|
|
func limit(r *bufio.Reader, limit int64) *io.LimitedReader {
|
|
return &io.LimitedReader{
|
|
R: r,
|
|
N: limit,
|
|
}
|
|
}
|
|
|
|
// bootstrapState expects to be called when the standard net/http lib has already
|
|
// read the initial request query line + headers from a connection. the request
|
|
// is ready to be hijacked at this point.
|
|
func (d *httpDecoder) bootstrapState(res http.ResponseWriter) httpState {
|
|
log.V(2).Infoln(d.idtag + "bootstrap-state")
|
|
|
|
d.updateForRequest(true)
|
|
|
|
// hijack
|
|
hj, ok := res.(http.Hijacker)
|
|
if !ok {
|
|
http.Error(res, "server does not support hijack", http.StatusInternalServerError)
|
|
d.sendError(errHijackFailed)
|
|
return terminateState
|
|
}
|
|
c, rw, err := hj.Hijack()
|
|
if err != nil {
|
|
http.Error(res, "failed to hijack the connection", http.StatusInternalServerError)
|
|
d.sendError(errHijackFailed)
|
|
return terminateState
|
|
}
|
|
|
|
d.rw = rw
|
|
d.con = c
|
|
|
|
go d.responseWriter()
|
|
return d.readBodyContent()
|
|
}
|
|
|
|
func (d *httpDecoder) responseWriter() {
|
|
defer func() {
|
|
log.V(3).Infoln(d.idtag + "response-writer: closing connection")
|
|
d.con.Close()
|
|
}()
|
|
for buf := range d.outCh {
|
|
//TODO(jdef) I worry about this busy-looping
|
|
|
|
// write & flush the buffer until there's nothing left in it, or else
|
|
// we exceed the write/flush period.
|
|
now := time.Now()
|
|
for buf.Len() > 0 && time.Since(now) < writeFlushPeriod {
|
|
select {
|
|
case <-d.forceQuit:
|
|
return
|
|
default:
|
|
}
|
|
d.tryFlushResponse(buf)
|
|
}
|
|
if buf.Len() > 0 {
|
|
//TODO(jdef) should we abort the entire connection instead? a partially written
|
|
// response doesn't do anyone any good. That said, real libprocess agents don't
|
|
// really care about the response channel anyway - the entire system is fire and
|
|
// forget. So I've decided to err on the side that we might lose response bytes
|
|
// in favor of completely reading the connection request stream before we terminate.
|
|
log.Errorln(d.idtag + "failed to fully flush output buffer within write-flush period")
|
|
}
|
|
}
|
|
}
|
|
|
|
type body struct {
|
|
*bytes.Buffer
|
|
}
|
|
|
|
func (b *body) Close() error { return nil }
|
|
|
|
// checkTimeoutOrFail tests whether the given error is related to a timeout condition.
|
|
// returns true if the caller should advance to the returned state.
|
|
func (d *httpDecoder) checkTimeoutOrFail(err error, stateContinue httpState) (httpState, bool) {
|
|
if err != nil {
|
|
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
|
|
select {
|
|
case <-d.forceQuit:
|
|
return terminateState, true
|
|
case <-d.shouldQuit:
|
|
return gracefulTerminateState, true
|
|
default:
|
|
return stateContinue, true
|
|
}
|
|
}
|
|
d.sendError(err)
|
|
return terminateState, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func (d *httpDecoder) setReadTimeoutOrFail() bool {
|
|
if d.readTimeout > 0 {
|
|
err := d.con.SetReadDeadline(time.Now().Add(d.readTimeout))
|
|
if err != nil {
|
|
d.sendError(err)
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (d *httpDecoder) setWriteTimeout() error {
|
|
if d.writeTimeout > 0 {
|
|
return d.con.SetWriteDeadline(time.Now().Add(d.writeTimeout))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func readChunkHeaderState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "read-chunk-header-state")
|
|
tr := textproto.NewReader(d.rw.Reader)
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
hexlen, err := tr.ReadLine()
|
|
if next, ok := d.checkTimeoutOrFail(err, readChunkHeaderState); ok {
|
|
return next
|
|
}
|
|
|
|
clen, err := strconv.ParseInt(hexlen, 16, 64)
|
|
if err != nil {
|
|
d.sendError(err)
|
|
return terminateState
|
|
}
|
|
|
|
if clen == 0 {
|
|
return readEndOfChunkStreamState
|
|
}
|
|
|
|
d.lrc = limit(d.rw.Reader, clen)
|
|
return readChunkState
|
|
}
|
|
|
|
func readChunkState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag+"read-chunk-state, bytes remaining:", d.lrc.N)
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
_, err := d.buf.ReadFrom(d.lrc)
|
|
if next, ok := d.checkTimeoutOrFail(err, readChunkState); ok {
|
|
return next
|
|
}
|
|
return readEndOfChunkState
|
|
}
|
|
|
|
const crlf = "\r\n"
|
|
|
|
func readEndOfChunkState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "read-end-of-chunk-state")
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
b, err := d.rw.Reader.Peek(2)
|
|
if len(b) == 2 {
|
|
if string(b) == crlf {
|
|
d.rw.ReadByte()
|
|
d.rw.ReadByte()
|
|
return readChunkHeaderState
|
|
}
|
|
d.sendError(errors.New(d.idtag + "unexpected data at end-of-chunk marker"))
|
|
return terminateState
|
|
}
|
|
// less than two bytes avail
|
|
if next, ok := d.checkTimeoutOrFail(err, readEndOfChunkState); ok {
|
|
return next
|
|
}
|
|
panic("couldn't peek 2 bytes, but didn't get an error?!")
|
|
}
|
|
|
|
func readEndOfChunkStreamState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "read-end-of-chunk-stream-state")
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
b, err := d.rw.Reader.Peek(2)
|
|
if len(b) == 2 {
|
|
if string(b) == crlf {
|
|
d.rw.ReadByte()
|
|
d.rw.ReadByte()
|
|
return d.generateRequest()
|
|
}
|
|
d.sendError(errors.New(d.idtag + "unexpected data at end-of-chunk marker"))
|
|
return terminateState
|
|
}
|
|
// less than 2 bytes avail
|
|
if next, ok := d.checkTimeoutOrFail(err, readEndOfChunkStreamState); ok {
|
|
return next
|
|
}
|
|
panic("couldn't peek 2 bytes, but didn't get an error?!")
|
|
}
|
|
|
|
func readBodyState(d *httpDecoder) httpState {
|
|
log.V(2).Infof(d.idtag+"read-body-state: %d bytes remaining", d.lrc.N)
|
|
// read remaining bytes into the buffer
|
|
var err error
|
|
if d.lrc.N > 0 {
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
_, err = d.buf.ReadFrom(d.lrc)
|
|
}
|
|
if d.lrc.N <= 0 {
|
|
return d.generateRequest()
|
|
}
|
|
if next, ok := d.checkTimeoutOrFail(err, readBodyState); ok {
|
|
return next
|
|
}
|
|
return readBodyState
|
|
}
|
|
|
|
func isGracefulTermSignal(err error) bool {
|
|
if err == io.EOF {
|
|
return true
|
|
}
|
|
if operr, ok := err.(*net.OpError); ok {
|
|
return operr.Op == "read" && err == syscall.ECONNRESET
|
|
}
|
|
return false
|
|
}
|
|
|
|
func awaitRequestState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "await-request-state")
|
|
tr := textproto.NewReader(d.rw.Reader)
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
requestLine, err := tr.ReadLine()
|
|
if requestLine == "" && isGracefulTermSignal(err) {
|
|
// we're actually expecting this at some point, so don't react poorly
|
|
return gracefulTerminateState
|
|
}
|
|
if next, ok := d.checkTimeoutOrFail(err, awaitRequestState); ok {
|
|
return next
|
|
}
|
|
ss := strings.SplitN(requestLine, " ", 3)
|
|
if len(ss) < 3 {
|
|
if err == io.EOF {
|
|
return gracefulTerminateState
|
|
}
|
|
d.sendError(errors.New(d.idtag + "illegal request line"))
|
|
return terminateState
|
|
}
|
|
r := d.req
|
|
r.Method = ss[0]
|
|
r.RequestURI = ss[1]
|
|
r.URL, err = url.ParseRequestURI(ss[1])
|
|
if err != nil {
|
|
d.sendError(err)
|
|
return terminateState
|
|
}
|
|
major, minor, ok := http.ParseHTTPVersion(ss[2])
|
|
if !ok {
|
|
d.sendError(errors.New(d.idtag + "malformed HTTP version"))
|
|
return terminateState
|
|
}
|
|
r.ProtoMajor = major
|
|
r.ProtoMinor = minor
|
|
r.Proto = ss[2]
|
|
return readHeaderState
|
|
}
|
|
|
|
func readHeaderState(d *httpDecoder) httpState {
|
|
log.V(2).Infoln(d.idtag + "read-header-state")
|
|
if !d.setReadTimeoutOrFail() {
|
|
return terminateState
|
|
}
|
|
r := d.req
|
|
tr := textproto.NewReader(d.rw.Reader)
|
|
h, err := tr.ReadMIMEHeader()
|
|
// merge any headers that were read successfully (before a possible error)
|
|
for k, v := range h {
|
|
if rh, exists := r.Header[k]; exists {
|
|
r.Header[k] = append(rh, v...)
|
|
} else {
|
|
r.Header[k] = v
|
|
}
|
|
log.V(2).Infoln(d.idtag+"request header", k, v)
|
|
}
|
|
if next, ok := d.checkTimeoutOrFail(err, readHeaderState); ok {
|
|
return next
|
|
}
|
|
|
|
// special headers: Host, Content-Length, Transfer-Encoding
|
|
r.Host = r.Header.Get("Host")
|
|
r.TransferEncoding = r.Header["Transfer-Encoding"]
|
|
if cl := r.Header.Get("Content-Length"); cl != "" {
|
|
l, err := strconv.ParseInt(cl, 10, 64)
|
|
if err != nil {
|
|
d.sendError(err)
|
|
return terminateState
|
|
}
|
|
if l > -1 {
|
|
r.ContentLength = l
|
|
log.V(2).Infoln(d.idtag+"set content length", r.ContentLength)
|
|
}
|
|
}
|
|
d.updateForRequest(false)
|
|
return d.readBodyContent()
|
|
}
|