summaryrefslogtreecommitdiff
path: root/libgo/go/net/http/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/net/http/transport.go')
-rw-r--r--libgo/go/net/http/transport.go737
1 files changed, 517 insertions, 220 deletions
diff --git a/libgo/go/net/http/transport.go b/libgo/go/net/http/transport.go
index 70d18646059..41df906cf2d 100644
--- a/libgo/go/net/http/transport.go
+++ b/libgo/go/net/http/transport.go
@@ -36,7 +36,8 @@ var DefaultTransport RoundTripper = &Transport{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
- TLSHandshakeTimeout: 10 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 1 * time.Second,
}
// DefaultMaxIdleConnsPerHost is the default value of Transport's
@@ -45,7 +46,21 @@ const DefaultMaxIdleConnsPerHost = 2
// Transport is an implementation of RoundTripper that supports HTTP,
// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
-// Transport can also cache connections for future re-use.
+//
+// By default, Transport caches connections for future re-use.
+// This may leave many open connections when accessing many hosts.
+// This behavior can be managed using Transport's CloseIdleConnections method
+// and the MaxIdleConnsPerHost and DisableKeepAlives fields.
+//
+// Transports should be reused instead of created as needed.
+// Transports are safe for concurrent use by multiple goroutines.
+//
+// A Transport is a low-level primitive for making HTTP and HTTPS requests.
+// For high-level functionality, such as cookies and redirects, see Client.
+//
+// Transport uses HTTP/1.1 for HTTP URLs and either HTTP/1.1 or HTTP/2
+// for HTTPS URLs, depending on whether the server supports HTTP/2.
+// See the package docs for more about HTTP/2.
type Transport struct {
idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns
@@ -113,8 +128,49 @@ type Transport struct {
// time does not include the time to read the response body.
ResponseHeaderTimeout time.Duration
+ // ExpectContinueTimeout, if non-zero, specifies the amount of
+ // time to wait for a server's first response headers after fully
+ // writing the request headers if the request has an
+ // "Expect: 100-continue" header. Zero means no timeout.
+ // This time does not include the time to send the request header.
+ ExpectContinueTimeout time.Duration
+
+ // TLSNextProto specifies how the Transport switches to an
+ // alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
+ // protocol negotiation. If Transport dials an TLS connection
+ // with a non-empty protocol name and TLSNextProto contains a
+ // map entry for that key (such as "h2"), then the func is
+ // called with the request's authority (such as "example.com"
+ // or "example.com:1234") and the TLS connection. The function
+ // must return a RoundTripper that then handles the request.
+ // If TLSNextProto is nil, HTTP/2 support is enabled automatically.
+ TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
+
+ // nextProtoOnce guards initialization of TLSNextProto and
+ // h2transport (via onceSetNextProtoDefaults)
+ nextProtoOnce sync.Once
+ h2transport *http2Transport // non-nil if http2 wired up
+
// TODO: tunable on global max cached connections
// TODO: tunable on timeout on cached connections
+ // TODO: tunable on max per-host TCP dials in flight (Issue 13957)
+}
+
+// onceSetNextProtoDefaults initializes TLSNextProto.
+// It must be called via t.nextProtoOnce.Do.
+func (t *Transport) onceSetNextProtoDefaults() {
+ if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") {
+ return
+ }
+ if t.TLSNextProto != nil {
+ return
+ }
+ t2, err := http2configureTransport(t)
+ if err != nil {
+ log.Printf("Error enabling Transport HTTP/2 support: %v", err)
+ } else {
+ t.h2transport = t2
+ }
}
// ProxyFromEnvironment returns the URL of the proxy to use for a
@@ -188,7 +244,8 @@ func (tr *transportRequest) extraHeaders() Header {
//
// For higher-level HTTP client support (such as handling of cookies
// and redirects), see Get, Post, and the Client type.
-func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
+func (t *Transport) RoundTrip(req *Request) (*Response, error) {
+ t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
@@ -197,54 +254,114 @@ func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
- if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
- t.altMu.RLock()
- var rt RoundTripper
- if t.altProto != nil {
- rt = t.altProto[req.URL.Scheme]
- }
- t.altMu.RUnlock()
- if rt == nil {
- req.closeBody()
- return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
+ // TODO(bradfitz): switch to atomic.Value for this map instead of RWMutex
+ t.altMu.RLock()
+ altRT := t.altProto[req.URL.Scheme]
+ t.altMu.RUnlock()
+ if altRT != nil {
+ if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
+ return resp, err
}
- return rt.RoundTrip(req)
}
- if req.URL.Host == "" {
+ if s := req.URL.Scheme; s != "http" && s != "https" {
req.closeBody()
- return nil, errors.New("http: no Host in request URL")
+ return nil, &badStringError{"unsupported protocol scheme", s}
}
- treq := &transportRequest{Request: req}
- cm, err := t.connectMethodForRequest(treq)
- if err != nil {
+ if req.Method != "" && !validMethod(req.Method) {
+ return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
+ }
+ if req.URL.Host == "" {
req.closeBody()
- return nil, err
+ return nil, errors.New("http: no Host in request URL")
}
- // Get the cached or newly-created connection to either the
- // host (for http or https), the http proxy, or the http proxy
- // pre-CONNECTed to https server. In any case, we'll be ready
- // to send it requests.
- pconn, err := t.getConn(req, cm)
- if err != nil {
- t.setReqCanceler(req, nil)
- req.closeBody()
- return nil, err
+ for {
+ // treq gets modified by roundTrip, so we need to recreate for each retry.
+ treq := &transportRequest{Request: req}
+ cm, err := t.connectMethodForRequest(treq)
+ if err != nil {
+ req.closeBody()
+ return nil, err
+ }
+
+ // Get the cached or newly-created connection to either the
+ // host (for http or https), the http proxy, or the http proxy
+ // pre-CONNECTed to https server. In any case, we'll be ready
+ // to send it requests.
+ pconn, err := t.getConn(req, cm)
+ if err != nil {
+ t.setReqCanceler(req, nil)
+ req.closeBody()
+ return nil, err
+ }
+
+ var resp *Response
+ if pconn.alt != nil {
+ // HTTP/2 path.
+ t.setReqCanceler(req, nil) // not cancelable with CancelRequest
+ resp, err = pconn.alt.RoundTrip(req)
+ } else {
+ resp, err = pconn.roundTrip(treq)
+ }
+ if err == nil {
+ return resp, nil
+ }
+ if err := checkTransportResend(err, req, pconn); err != nil {
+ return nil, err
+ }
+ testHookRoundTripRetried()
}
+}
- return pconn.roundTrip(treq)
+// checkTransportResend checks whether a failed HTTP request can be
+// resent on a new connection. The non-nil input error is the error from
+// roundTrip, which might be wrapped in a beforeRespHeaderError error.
+//
+// The return value is err or the unwrapped error inside a
+// beforeRespHeaderError.
+func checkTransportResend(err error, req *Request, pconn *persistConn) error {
+ brhErr, ok := err.(beforeRespHeaderError)
+ if !ok {
+ return err
+ }
+ err = brhErr.error // unwrap the custom error in case we return it
+ if err != errMissingHost && pconn.isReused() && req.isReplayable() {
+ // If we try to reuse a connection that the server is in the process of
+ // closing, we may end up successfully writing out our request (or a
+ // portion of our request) only to find a connection error when we try to
+ // read from (or finish writing to) the socket.
+
+ // There can be a race between the socket pool checking whether a socket
+ // is still connected, receiving the FIN, and sending/reading data on a
+ // reused socket. If we receive the FIN between the connectedness check
+ // and writing/reading from the socket, we may first learn the socket is
+ // disconnected when we get a ERR_SOCKET_NOT_CONNECTED. This will most
+ // likely happen when trying to retrieve its IP address. See
+ // http://crbug.com/105824 for more details.
+
+ // We resend a request only if we reused a keep-alive connection and did
+ // not yet receive any header data. This automatically prevents an
+ // infinite resend loop because we'll run out of the cached keep-alive
+ // connections eventually.
+ return nil
+ }
+ return err
}
+// ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol.
+var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
+
// RegisterProtocol registers a new protocol with scheme.
// The Transport will pass requests using the given scheme to rt.
// It is rt's responsibility to simulate HTTP request semantics.
//
// RegisterProtocol can be used by other packages to provide
// implementations of protocol schemes like "ftp" or "file".
+//
+// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will
+// handle the RoundTrip itself for that one request, as if the
+// protocol were not registered.
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
- if scheme == "http" || scheme == "https" {
- panic("protocol " + scheme + " already registered")
- }
t.altMu.Lock()
defer t.altMu.Unlock()
if t.altProto == nil {
@@ -261,6 +378,7 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
// a "keep-alive" state. It does not interrupt any connections currently
// in use.
func (t *Transport) CloseIdleConnections() {
+ t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
t.idleMu.Lock()
m := t.idleConn
t.idleConn = nil
@@ -269,13 +387,19 @@ func (t *Transport) CloseIdleConnections() {
t.idleMu.Unlock()
for _, conns := range m {
for _, pconn := range conns {
- pconn.close()
+ pconn.close(errCloseIdleConns)
}
}
+ if t2 := t.h2transport; t2 != nil {
+ t2.CloseIdleConnections()
+ }
}
// CancelRequest cancels an in-flight request by closing its connection.
// CancelRequest should only be called after RoundTrip has returned.
+//
+// Deprecated: Use Request.Cancel instead. CancelRequest can not cancel
+// HTTP/2 requests.
func (t *Transport) CancelRequest(req *Request) {
t.reqMu.Lock()
cancel := t.reqCanceler[req]
@@ -354,23 +478,41 @@ func (cm *connectMethod) proxyAuth() string {
return ""
}
-// putIdleConn adds pconn to the list of idle persistent connections awaiting
+// error values for debugging and testing, not seen by users.
+var (
+ errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
+ errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
+ errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
+ errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
+ errCloseIdleConns = errors.New("http: CloseIdleConnections called")
+ errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
+ errServerClosedIdle = errors.New("http: server closed idle conn")
+)
+
+func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
+ if err := t.tryPutIdleConn(pconn); err != nil {
+ pconn.close(err)
+ }
+}
+
+// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
-// If pconn is no longer needed or not in a good state, putIdleConn
-// returns false.
-func (t *Transport) putIdleConn(pconn *persistConn) bool {
+// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
+// an error explaining why it wasn't registered.
+// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
+func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
- pconn.close()
- return false
+ return errKeepAlivesDisabled
}
if pconn.isBroken() {
- return false
+ return errConnBroken
}
key := pconn.cacheKey
max := t.MaxIdleConnsPerHost
if max == 0 {
max = DefaultMaxIdleConnsPerHost
}
+ pconn.markReused()
t.idleMu.Lock()
waitingDialer := t.idleConnCh[key]
@@ -382,7 +524,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
t.idleMu.Unlock()
- return true
+ return nil
default:
if waitingDialer != nil {
// They had populated this, but their dial won
@@ -392,16 +534,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
if t.wantIdle {
t.idleMu.Unlock()
- pconn.close()
- return false
+ return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
if len(t.idleConn[key]) >= max {
t.idleMu.Unlock()
- pconn.close()
- return false
+ return errTooManyIdle
}
for _, exist := range t.idleConn[key] {
if exist == pconn {
@@ -410,7 +550,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
t.idleConn[key] = append(t.idleConn[key], pconn)
t.idleMu.Unlock()
- return true
+ return nil
}
// getIdleConnCh returns a channel to receive and return idle
@@ -494,16 +634,17 @@ func (t *Transport) replaceReqCanceler(r *Request, fn func()) bool {
return true
}
-func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
+func (t *Transport) dial(network, addr string) (net.Conn, error) {
if t.Dial != nil {
- return t.Dial(network, addr)
+ c, err := t.Dial(network, addr)
+ if c == nil && err == nil {
+ err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
+ }
+ return c, err
}
return net.Dial(network, addr)
}
-// Testing hooks:
-var prePendingDial, postPendingDial func()
-
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
@@ -525,20 +666,16 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
// Copy these hooks so we don't race on the postPendingDial in
// the goroutine we launch. Issue 11136.
- prePendingDial := prePendingDial
- postPendingDial := postPendingDial
+ testHookPrePendingDial := testHookPrePendingDial
+ testHookPostPendingDial := testHookPostPendingDial
handlePendingDial := func() {
- if prePendingDial != nil {
- prePendingDial()
- }
+ testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
- t.putIdleConn(v.pc)
- }
- if postPendingDial != nil {
- postPendingDial()
+ t.putOrCloseIdleConn(v.pc)
}
+ testHookPostPendingDial()
}()
}
@@ -565,10 +702,10 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
return pc, nil
case <-req.Cancel:
handlePendingDial()
- return nil, errors.New("net/http: request canceled while waiting for connection")
+ return nil, errRequestCanceledConn
case <-cancelc:
handlePendingDial()
- return nil, errors.New("net/http: request canceled while waiting for connection")
+ return nil, errRequestCanceledConn
}
}
@@ -588,7 +725,16 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
if err != nil {
return nil, err
}
+ if pconn.conn == nil {
+ return nil, errors.New("net/http: Transport.DialTLS returned (nil, nil)")
+ }
if tc, ok := pconn.conn.(*tls.Conn); ok {
+ // Handshake here, in case DialTLS didn't. TLSNextProto below
+ // depends on it for knowing the connection state.
+ if err := tc.Handshake(); err != nil {
+ go pconn.conn.Close()
+ return nil, err
+ }
cs := tc.ConnectionState()
pconn.tlsState = &cs
}
@@ -680,6 +826,12 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
pconn.conn = tlsConn
}
+ if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
+ if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
+ return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
+ }
+ }
+
pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
pconn.bw = bufio.NewWriter(pconn.conn)
go pconn.readLoop()
@@ -809,6 +961,11 @@ func (k connectMethodKey) String() string {
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
+ // alt optionally specifies the TLS NextProto RoundTripper.
+ // This is used for HTTP/2 today and future protocol laters.
+ // If it's non-nil, the rest of the fields are unused.
+ alt RoundTripper
+
t *Transport
cacheKey connectMethodKey
conn net.Conn
@@ -828,9 +985,10 @@ type persistConn struct {
lk sync.Mutex // guards following fields
numExpectedResponses int
- closed bool // whether conn has been closed
- broken bool // an error has happened on this connection; marked broken so it's not reused.
- canceled bool // whether this conn was broken due a CancelRequest
+ closed error // set non-nil when conn is closed, before closech is closed
+ broken bool // an error has happened on this connection; marked broken so it's not reused.
+ canceled bool // whether this conn was broken due a CancelRequest
+ reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
@@ -852,15 +1010,34 @@ func (pc *persistConn) isCanceled() bool {
return pc.canceled
}
+// isReused reports whether this connection is in a known broken state.
+func (pc *persistConn) isReused() bool {
+ pc.lk.Lock()
+ r := pc.reused
+ pc.lk.Unlock()
+ return r
+}
+
func (pc *persistConn) cancelRequest() {
pc.lk.Lock()
defer pc.lk.Unlock()
pc.canceled = true
- pc.closeLocked()
+ pc.closeLocked(errRequestCanceled)
}
func (pc *persistConn) readLoop() {
- // eofc is used to block http.Handler goroutines reading from Response.Body
+ closeErr := errReadLoopExiting // default value, if not changed below
+ defer func() { pc.close(closeErr) }()
+
+ tryPutIdleConn := func() bool {
+ if err := pc.t.tryPutIdleConn(pc); err != nil {
+ closeErr = err
+ return false
+ }
+ return true
+ }
+
+ // eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
@@ -873,17 +1050,14 @@ func (pc *persistConn) readLoop() {
alive := true
for alive {
- pb, err := pc.br.Peek(1)
+ _, err := pc.br.Peek(1)
+ if err != nil {
+ err = beforeRespHeaderError{err}
+ }
pc.lk.Lock()
if pc.numExpectedResponses == 0 {
- if !pc.closed {
- pc.closeLocked()
- if len(pb) > 0 {
- log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
- string(pb), err)
- }
- }
+ pc.readLoopPeekFailLocked(err)
pc.lk.Unlock()
return
}
@@ -893,115 +1067,189 @@ func (pc *persistConn) readLoop() {
var resp *Response
if err == nil {
- resp, err = ReadResponse(pc.br, rc.req)
- if err == nil && resp.StatusCode == 100 {
- // Skip any 100-continue for now.
- // TODO(bradfitz): if rc.req had "Expect: 100-continue",
- // actually block the request body write and signal the
- // writeLoop now to begin sending it. (Issue 2184) For now we
- // eat it, since we're never expecting one.
- resp, err = ReadResponse(pc.br, rc.req)
- }
- }
-
- if resp != nil {
- resp.TLS = pc.tlsState
+ resp, err = pc.readResponse(rc)
}
- hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
-
if err != nil {
- pc.close()
- } else {
- if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
- resp.Header.Del("Content-Encoding")
- resp.Header.Del("Content-Length")
- resp.ContentLength = -1
- resp.Body = &gzipReader{body: resp.Body}
+ // If we won't be able to retry this request later (from the
+ // roundTrip goroutine), mark it as done now.
+ // BEFORE the send on rc.ch, as the client might re-use the
+ // same *Request pointer, and we don't want to set call
+ // t.setReqCanceler from this persistConn while the Transport
+ // potentially spins up a different persistConn for the
+ // caller's subsequent request.
+ if checkTransportResend(err, rc.req, pc) != nil {
+ pc.t.setReqCanceler(rc.req, nil)
+ }
+ select {
+ case rc.ch <- responseAndError{err: err}:
+ case <-rc.callerGone:
+ return
}
- resp.Body = &bodyEOFSignal{body: resp.Body}
+ return
}
- if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
+ pc.lk.Lock()
+ pc.numExpectedResponses--
+ pc.lk.Unlock()
+
+ hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
+
+ if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
- var waitForBodyRead chan bool // channel is nil when there's no body
- if hasBody {
- waitForBodyRead = make(chan bool, 2)
- resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
- waitForBodyRead <- false
- return nil
- }
- resp.Body.(*bodyEOFSignal).fn = func(err error) error {
- isEOF := err == io.EOF
- waitForBodyRead <- isEOF
- if isEOF {
- <-eofc // see comment at top
- } else if err != nil && pc.isCanceled() {
- return errRequestCanceled
- }
- return err
- }
- } else {
- // Before send on rc.ch, as client might re-use the
- // same *Request pointer, and we don't want to set this
- // on t from this persistConn while the Transport
- // potentially spins up a different persistConn for the
- // caller's subsequent request.
+ if !hasBody {
pc.t.setReqCanceler(rc.req, nil)
- }
- pc.lk.Lock()
- pc.numExpectedResponses--
- pc.lk.Unlock()
+ // Put the idle conn back into the pool before we send the response
+ // so if they process it quickly and make another request, they'll
+ // get this same conn. But we use the unbuffered channel 'rc'
+ // to guarantee that persistConn.roundTrip got out of its select
+ // potentially waiting for this persistConn to close.
+ // but after
+ alive = alive &&
+ !pc.sawEOF &&
+ pc.wroteRequest() &&
+ tryPutIdleConn()
- // The connection might be going away when we put the
- // idleConn below. When that happens, we close the response channel to signal
- // to roundTrip that the connection is gone. roundTrip waits for
- // both closing and a response in a select, so it might choose
- // the close channel, rather than the response.
- // We send the response first so that roundTrip can check
- // if there is a pending one with a non-blocking select
- // on the response channel before erroring out.
- rc.ch <- responseAndError{resp, err}
-
- if hasBody {
- // To avoid a race, wait for the just-returned
- // response body to be fully consumed before peek on
- // the underlying bufio reader.
select {
- case <-rc.req.Cancel:
- alive = false
- pc.t.CancelRequest(rc.req)
- case bodyEOF := <-waitForBodyRead:
- pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
- alive = alive &&
- bodyEOF &&
- !pc.sawEOF &&
- pc.wroteRequest() &&
- pc.t.putIdleConn(pc)
- if bodyEOF {
- eofc <- struct{}{}
- }
- case <-pc.closech:
- alive = false
+ case rc.ch <- responseAndError{res: resp}:
+ case <-rc.callerGone:
+ return
}
- } else {
+
+ // Now that they've read from the unbuffered channel, they're safely
+ // out of the select that also waits on this goroutine to die, so
+ // we're allowed to exit now if needed (if alive is false)
+ testHookReadLoopBeforeNextRead()
+ continue
+ }
+
+ if rc.addedGzip {
+ maybeUngzipResponse(resp)
+ }
+ resp.Body = &bodyEOFSignal{body: resp.Body}
+
+ waitForBodyRead := make(chan bool, 2)
+ resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
+ waitForBodyRead <- false
+ return nil
+ }
+ resp.Body.(*bodyEOFSignal).fn = func(err error) error {
+ isEOF := err == io.EOF
+ waitForBodyRead <- isEOF
+ if isEOF {
+ <-eofc // see comment above eofc declaration
+ } else if err != nil && pc.isCanceled() {
+ return errRequestCanceled
+ }
+ return err
+ }
+
+ select {
+ case rc.ch <- responseAndError{res: resp}:
+ case <-rc.callerGone:
+ return
+ }
+
+ // Before looping back to the top of this function and peeking on
+ // the bufio.Reader, wait for the caller goroutine to finish
+ // reading the response body. (or for cancelation or death)
+ select {
+ case bodyEOF := <-waitForBodyRead:
+ pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
+ bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
- pc.t.putIdleConn(pc)
+ tryPutIdleConn()
+ if bodyEOF {
+ eofc <- struct{}{}
+ }
+ case <-rc.req.Cancel:
+ alive = false
+ pc.t.CancelRequest(rc.req)
+ case <-pc.closech:
+ alive = false
+ }
+
+ testHookReadLoopBeforeNextRead()
+ }
+}
+
+func maybeUngzipResponse(resp *Response) {
+ if resp.Header.Get("Content-Encoding") == "gzip" {
+ resp.Header.Del("Content-Encoding")
+ resp.Header.Del("Content-Length")
+ resp.ContentLength = -1
+ resp.Body = &gzipReader{body: resp.Body}
+ }
+}
+
+func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
+ if pc.closed != nil {
+ return
+ }
+ if n := pc.br.Buffered(); n > 0 {
+ buf, _ := pc.br.Peek(n)
+ log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
+ }
+ if peekErr == io.EOF {
+ // common case.
+ pc.closeLocked(errServerClosedIdle)
+ } else {
+ pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
+ }
+}
+
+// readResponse reads an HTTP response (or two, in the case of "Expect:
+// 100-continue") from the server. It returns the final non-100 one.
+func (pc *persistConn) readResponse(rc requestAndChan) (resp *Response, err error) {
+ resp, err = ReadResponse(pc.br, rc.req)
+ if err != nil {
+ return
+ }
+ if rc.continueCh != nil {
+ if resp.StatusCode == 100 {
+ rc.continueCh <- struct{}{}
+ } else {
+ close(rc.continueCh)
+ }
+ }
+ if resp.StatusCode == 100 {
+ resp, err = ReadResponse(pc.br, rc.req)
+ if err != nil {
+ return
}
+ }
+ resp.TLS = pc.tlsState
+ return
+}
+
+// waitForContinue returns the function to block until
+// any response, timeout or connection close. After any of them,
+// the function returns a bool which indicates if the body should be sent.
+func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
+ if continueCh == nil {
+ return nil
+ }
+ return func() bool {
+ timer := time.NewTimer(pc.t.ExpectContinueTimeout)
+ defer timer.Stop()
- if hook := testHookReadLoopBeforeNextRead; hook != nil {
- hook()
+ select {
+ case _, ok := <-continueCh:
+ return ok
+ case <-timer.C:
+ return true
+ case <-pc.closech:
+ return false
}
}
- pc.close()
}
func (pc *persistConn) writeLoop() {
@@ -1012,7 +1260,7 @@ func (pc *persistConn) writeLoop() {
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
- err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
+ err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if err == nil {
err = pc.bw.Flush()
}
@@ -1056,19 +1304,29 @@ func (pc *persistConn) wroteRequest() bool {
}
}
+// responseAndError is how the goroutine reading from an HTTP/1 server
+// communicates with the goroutine doing the RoundTrip.
type responseAndError struct {
- res *Response
+ res *Response // else use this response (see res method)
err error
}
type requestAndChan struct {
req *Request
- ch chan responseAndError
+ ch chan responseAndError // unbuffered; always send in select on callerGone
// did the Transport (as opposed to the client code) add an
// Accept-Encoding gzip header? only if it we set it do
// we transparently decode the gzip.
addedGzip bool
+
+ // Optional blocking chan for Expect: 100-continue (for send).
+ // If the request has an "Expect: 100-continue" header and
+ // the server responds 100 Continue, readLoop send a value
+ // to writeLoop via this chan.
+ continueCh chan<- struct{}
+
+ callerGone <-chan struct{} // closed when roundTrip caller has returned
}
// A writeRequest is sent by the readLoop's goroutine to the
@@ -1078,6 +1336,11 @@ type requestAndChan struct {
type writeRequest struct {
req *transportRequest
ch chan<- error
+
+ // Optional blocking chan for Expect: 100-continue (for recieve).
+ // If not nil, writeLoop blocks sending request body until
+ // it receives from this chan.
+ continueCh <-chan struct{}
}
type httpError struct {
@@ -1090,23 +1353,34 @@ func (e *httpError) Timeout() bool { return e.timeout }
func (e *httpError) Temporary() bool { return true }
var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
-var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
+var errClosed error = &httpError{err: "net/http: server closed connection before response was received"}
var errRequestCanceled = errors.New("net/http: request canceled")
+var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // TODO: unify?
+
+func nop() {}
-// nil except for tests
+// testHooks. Always non-nil.
var (
- testHookPersistConnClosedGotRes func()
- testHookEnterRoundTrip func()
- testHookMu sync.Locker = fakeLocker{} // guards following
- testHookReadLoopBeforeNextRead func()
+ testHookEnterRoundTrip = nop
+ testHookWaitResLoop = nop
+ testHookRoundTripRetried = nop
+ testHookPrePendingDial = nop
+ testHookPostPendingDial = nop
+
+ testHookMu sync.Locker = fakeLocker{} // guards following
+ testHookReadLoopBeforeNextRead = nop
)
+// beforeRespHeaderError is used to indicate when an IO error has occurred before
+// any header data was received.
+type beforeRespHeaderError struct {
+ error
+}
+
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
- if hook := testHookEnterRoundTrip; hook != nil {
- hook()
- }
+ testHookEnterRoundTrip()
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
- pc.t.putIdleConn(pc)
+ pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.lk.Lock()
@@ -1143,42 +1417,47 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
req.extraHeaders().Set("Accept-Encoding", "gzip")
}
+ var continueCh chan struct{}
+ if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
+ continueCh = make(chan struct{}, 1)
+ }
+
if pc.t.DisableKeepAlives {
req.extraHeaders().Set("Connection", "close")
}
+ gone := make(chan struct{})
+ defer close(gone)
+
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1)
- pc.writech <- writeRequest{req, writeErrCh}
+ pc.writech <- writeRequest{req, writeErrCh, continueCh}
- resc := make(chan responseAndError, 1)
- pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
+ resc := make(chan responseAndError)
+ pc.reqch <- requestAndChan{
+ req: req.Request,
+ ch: resc,
+ addedGzip: requestedGzip,
+ continueCh: continueCh,
+ callerGone: gone,
+ }
var re responseAndError
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
WaitResponse:
for {
+ testHookWaitResLoop()
select {
case err := <-writeErrCh:
- if isNetWriteError(err) {
- // Issue 11745. If we failed to write the request
- // body, it's possible the server just heard enough
- // and already wrote to us. Prioritize the server's
- // response over returning a body write error.
- select {
- case re = <-resc:
- pc.close()
- break WaitResponse
- case <-time.After(50 * time.Millisecond):
- // Fall through.
- }
- }
if err != nil {
- re = responseAndError{nil, err}
- pc.close()
+ if pc.isCanceled() {
+ err = errRequestCanceled
+ }
+ re = responseAndError{err: beforeRespHeaderError{err}}
+ pc.close(fmt.Errorf("write error: %v", err))
break WaitResponse
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
@@ -1187,33 +1466,22 @@ WaitResponse:
respHeaderTimer = timer.C
}
case <-pc.closech:
- // The persist connection is dead. This shouldn't
- // usually happen (only with Connection: close responses
- // with no response bodies), but if it does happen it
- // means either a) the remote server hung up on us
- // prematurely, or b) the readLoop sent us a response &
- // closed its closech at roughly the same time, and we
- // selected this case first. If we got a response, readLoop makes sure
- // to send it before it puts the conn and closes the channel.
- // That way, we can fetch the response, if there is one,
- // with a non-blocking receive.
- select {
- case re = <-resc:
- if fn := testHookPersistConnClosedGotRes; fn != nil {
- fn()
- }
- default:
- re = responseAndError{err: errClosed}
- if pc.isCanceled() {
- re = responseAndError{err: errRequestCanceled}
- }
+ var err error
+ if pc.isCanceled() {
+ err = errRequestCanceled
+ } else {
+ err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
}
+ re = responseAndError{err: err}
break WaitResponse
case <-respHeaderTimer:
- pc.close()
+ pc.close(errTimeout)
re = responseAndError{err: errTimeout}
break WaitResponse
case re = <-resc:
+ if re.err != nil && pc.isCanceled() {
+ re.err = errRequestCanceled
+ }
break WaitResponse
case <-cancelChan:
pc.t.CancelRequest(req.Request)
@@ -1224,6 +1492,9 @@ WaitResponse:
if re.err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
+ if (re.res == nil) == (re.err == nil) {
+ panic("internal error: exactly one of res or err should be set")
+ }
return re.res, re.err
}
@@ -1236,18 +1507,44 @@ func (pc *persistConn) markBroken() {
pc.broken = true
}
-func (pc *persistConn) close() {
+// markReused marks this connection as having been successfully used for a
+// request and response.
+func (pc *persistConn) markReused() {
+ pc.lk.Lock()
+ pc.reused = true
+ pc.lk.Unlock()
+}
+
+// close closes the underlying TCP connection and closes
+// the pc.closech channel.
+//
+// The provided err is only for testing and debugging; in normal
+// circumstances it should never be seen by users.
+func (pc *persistConn) close(err error) {
pc.lk.Lock()
defer pc.lk.Unlock()
- pc.closeLocked()
+ pc.closeLocked(err)
}
-func (pc *persistConn) closeLocked() {
+func (pc *persistConn) closeLocked(err error) {
+ if err == nil {
+ panic("nil error")
+ }
pc.broken = true
- if !pc.closed {
- pc.conn.Close()
- pc.closed = true
- close(pc.closech)
+ if pc.closed == nil {
+ pc.closed = err
+ if pc.alt != nil {
+ // Do nothing; can only get here via getConn's
+ // handlePendingDial's putOrCloseIdleConn when
+ // it turns out the abandoned connection in
+ // flight ended up negotiating an alternate
+ // protocol. We don't use the connection
+ // freelist for http2. That's done by the
+ // alternate protocol's RoundTripper.
+ } else {
+ pc.conn.Close()
+ close(pc.closech)
+ }
}
pc.mutateHeaderFunc = nil
}