diff options
Diffstat (limited to 'libgo/go/net/http/transport.go')
-rw-r--r-- | libgo/go/net/http/transport.go | 737 |
1 files changed, 517 insertions, 220 deletions
diff --git a/libgo/go/net/http/transport.go b/libgo/go/net/http/transport.go index 70d18646059..41df906cf2d 100644 --- a/libgo/go/net/http/transport.go +++ b/libgo/go/net/http/transport.go @@ -36,7 +36,8 @@ var DefaultTransport RoundTripper = &Transport{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }).Dial, - TLSHandshakeTimeout: 10 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, } // DefaultMaxIdleConnsPerHost is the default value of Transport's @@ -45,7 +46,21 @@ const DefaultMaxIdleConnsPerHost = 2 // Transport is an implementation of RoundTripper that supports HTTP, // HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT). -// Transport can also cache connections for future re-use. +// +// By default, Transport caches connections for future re-use. +// This may leave many open connections when accessing many hosts. +// This behavior can be managed using Transport's CloseIdleConnections method +// and the MaxIdleConnsPerHost and DisableKeepAlives fields. +// +// Transports should be reused instead of created as needed. +// Transports are safe for concurrent use by multiple goroutines. +// +// A Transport is a low-level primitive for making HTTP and HTTPS requests. +// For high-level functionality, such as cookies and redirects, see Client. +// +// Transport uses HTTP/1.1 for HTTP URLs and either HTTP/1.1 or HTTP/2 +// for HTTPS URLs, depending on whether the server supports HTTP/2. +// See the package docs for more about HTTP/2. type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns @@ -113,8 +128,49 @@ type Transport struct { // time does not include the time to read the response body. ResponseHeaderTimeout time.Duration + // ExpectContinueTimeout, if non-zero, specifies the amount of + // time to wait for a server's first response headers after fully + // writing the request headers if the request has an + // "Expect: 100-continue" header. Zero means no timeout. + // This time does not include the time to send the request header. + ExpectContinueTimeout time.Duration + + // TLSNextProto specifies how the Transport switches to an + // alternate protocol (such as HTTP/2) after a TLS NPN/ALPN + // protocol negotiation. If Transport dials an TLS connection + // with a non-empty protocol name and TLSNextProto contains a + // map entry for that key (such as "h2"), then the func is + // called with the request's authority (such as "example.com" + // or "example.com:1234") and the TLS connection. The function + // must return a RoundTripper that then handles the request. + // If TLSNextProto is nil, HTTP/2 support is enabled automatically. + TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper + + // nextProtoOnce guards initialization of TLSNextProto and + // h2transport (via onceSetNextProtoDefaults) + nextProtoOnce sync.Once + h2transport *http2Transport // non-nil if http2 wired up + // TODO: tunable on global max cached connections // TODO: tunable on timeout on cached connections + // TODO: tunable on max per-host TCP dials in flight (Issue 13957) +} + +// onceSetNextProtoDefaults initializes TLSNextProto. +// It must be called via t.nextProtoOnce.Do. +func (t *Transport) onceSetNextProtoDefaults() { + if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") { + return + } + if t.TLSNextProto != nil { + return + } + t2, err := http2configureTransport(t) + if err != nil { + log.Printf("Error enabling Transport HTTP/2 support: %v", err) + } else { + t.h2transport = t2 + } } // ProxyFromEnvironment returns the URL of the proxy to use for a @@ -188,7 +244,8 @@ func (tr *transportRequest) extraHeaders() Header { // // For higher-level HTTP client support (such as handling of cookies // and redirects), see Get, Post, and the Client type. -func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { +func (t *Transport) RoundTrip(req *Request) (*Response, error) { + t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) if req.URL == nil { req.closeBody() return nil, errors.New("http: nil Request.URL") @@ -197,54 +254,114 @@ func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { req.closeBody() return nil, errors.New("http: nil Request.Header") } - if req.URL.Scheme != "http" && req.URL.Scheme != "https" { - t.altMu.RLock() - var rt RoundTripper - if t.altProto != nil { - rt = t.altProto[req.URL.Scheme] - } - t.altMu.RUnlock() - if rt == nil { - req.closeBody() - return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme} + // TODO(bradfitz): switch to atomic.Value for this map instead of RWMutex + t.altMu.RLock() + altRT := t.altProto[req.URL.Scheme] + t.altMu.RUnlock() + if altRT != nil { + if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { + return resp, err } - return rt.RoundTrip(req) } - if req.URL.Host == "" { + if s := req.URL.Scheme; s != "http" && s != "https" { req.closeBody() - return nil, errors.New("http: no Host in request URL") + return nil, &badStringError{"unsupported protocol scheme", s} } - treq := &transportRequest{Request: req} - cm, err := t.connectMethodForRequest(treq) - if err != nil { + if req.Method != "" && !validMethod(req.Method) { + return nil, fmt.Errorf("net/http: invalid method %q", req.Method) + } + if req.URL.Host == "" { req.closeBody() - return nil, err + return nil, errors.New("http: no Host in request URL") } - // Get the cached or newly-created connection to either the - // host (for http or https), the http proxy, or the http proxy - // pre-CONNECTed to https server. In any case, we'll be ready - // to send it requests. - pconn, err := t.getConn(req, cm) - if err != nil { - t.setReqCanceler(req, nil) - req.closeBody() - return nil, err + for { + // treq gets modified by roundTrip, so we need to recreate for each retry. + treq := &transportRequest{Request: req} + cm, err := t.connectMethodForRequest(treq) + if err != nil { + req.closeBody() + return nil, err + } + + // Get the cached or newly-created connection to either the + // host (for http or https), the http proxy, or the http proxy + // pre-CONNECTed to https server. In any case, we'll be ready + // to send it requests. + pconn, err := t.getConn(req, cm) + if err != nil { + t.setReqCanceler(req, nil) + req.closeBody() + return nil, err + } + + var resp *Response + if pconn.alt != nil { + // HTTP/2 path. + t.setReqCanceler(req, nil) // not cancelable with CancelRequest + resp, err = pconn.alt.RoundTrip(req) + } else { + resp, err = pconn.roundTrip(treq) + } + if err == nil { + return resp, nil + } + if err := checkTransportResend(err, req, pconn); err != nil { + return nil, err + } + testHookRoundTripRetried() } +} - return pconn.roundTrip(treq) +// checkTransportResend checks whether a failed HTTP request can be +// resent on a new connection. The non-nil input error is the error from +// roundTrip, which might be wrapped in a beforeRespHeaderError error. +// +// The return value is err or the unwrapped error inside a +// beforeRespHeaderError. +func checkTransportResend(err error, req *Request, pconn *persistConn) error { + brhErr, ok := err.(beforeRespHeaderError) + if !ok { + return err + } + err = brhErr.error // unwrap the custom error in case we return it + if err != errMissingHost && pconn.isReused() && req.isReplayable() { + // If we try to reuse a connection that the server is in the process of + // closing, we may end up successfully writing out our request (or a + // portion of our request) only to find a connection error when we try to + // read from (or finish writing to) the socket. + + // There can be a race between the socket pool checking whether a socket + // is still connected, receiving the FIN, and sending/reading data on a + // reused socket. If we receive the FIN between the connectedness check + // and writing/reading from the socket, we may first learn the socket is + // disconnected when we get a ERR_SOCKET_NOT_CONNECTED. This will most + // likely happen when trying to retrieve its IP address. See + // http://crbug.com/105824 for more details. + + // We resend a request only if we reused a keep-alive connection and did + // not yet receive any header data. This automatically prevents an + // infinite resend loop because we'll run out of the cached keep-alive + // connections eventually. + return nil + } + return err } +// ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol. +var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol") + // RegisterProtocol registers a new protocol with scheme. // The Transport will pass requests using the given scheme to rt. // It is rt's responsibility to simulate HTTP request semantics. // // RegisterProtocol can be used by other packages to provide // implementations of protocol schemes like "ftp" or "file". +// +// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will +// handle the RoundTrip itself for that one request, as if the +// protocol were not registered. func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { - if scheme == "http" || scheme == "https" { - panic("protocol " + scheme + " already registered") - } t.altMu.Lock() defer t.altMu.Unlock() if t.altProto == nil { @@ -261,6 +378,7 @@ func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { // a "keep-alive" state. It does not interrupt any connections currently // in use. func (t *Transport) CloseIdleConnections() { + t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) t.idleMu.Lock() m := t.idleConn t.idleConn = nil @@ -269,13 +387,19 @@ func (t *Transport) CloseIdleConnections() { t.idleMu.Unlock() for _, conns := range m { for _, pconn := range conns { - pconn.close() + pconn.close(errCloseIdleConns) } } + if t2 := t.h2transport; t2 != nil { + t2.CloseIdleConnections() + } } // CancelRequest cancels an in-flight request by closing its connection. // CancelRequest should only be called after RoundTrip has returned. +// +// Deprecated: Use Request.Cancel instead. CancelRequest can not cancel +// HTTP/2 requests. func (t *Transport) CancelRequest(req *Request) { t.reqMu.Lock() cancel := t.reqCanceler[req] @@ -354,23 +478,41 @@ func (cm *connectMethod) proxyAuth() string { return "" } -// putIdleConn adds pconn to the list of idle persistent connections awaiting +// error values for debugging and testing, not seen by users. +var ( + errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled") + errConnBroken = errors.New("http: putIdleConn: connection is in bad state") + errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") + errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") + errCloseIdleConns = errors.New("http: CloseIdleConnections called") + errReadLoopExiting = errors.New("http: persistConn.readLoop exiting") + errServerClosedIdle = errors.New("http: server closed idle conn") +) + +func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { + if err := t.tryPutIdleConn(pconn); err != nil { + pconn.close(err) + } +} + +// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting // a new request. -// If pconn is no longer needed or not in a good state, putIdleConn -// returns false. -func (t *Transport) putIdleConn(pconn *persistConn) bool { +// If pconn is no longer needed or not in a good state, tryPutIdleConn returns +// an error explaining why it wasn't registered. +// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that. +func (t *Transport) tryPutIdleConn(pconn *persistConn) error { if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { - pconn.close() - return false + return errKeepAlivesDisabled } if pconn.isBroken() { - return false + return errConnBroken } key := pconn.cacheKey max := t.MaxIdleConnsPerHost if max == 0 { max = DefaultMaxIdleConnsPerHost } + pconn.markReused() t.idleMu.Lock() waitingDialer := t.idleConnCh[key] @@ -382,7 +524,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { // first). Chrome calls this socket late binding. See // https://insouciant.org/tech/connection-management-in-chromium/ t.idleMu.Unlock() - return true + return nil default: if waitingDialer != nil { // They had populated this, but their dial won @@ -392,16 +534,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { } if t.wantIdle { t.idleMu.Unlock() - pconn.close() - return false + return errWantIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } if len(t.idleConn[key]) >= max { t.idleMu.Unlock() - pconn.close() - return false + return errTooManyIdle } for _, exist := range t.idleConn[key] { if exist == pconn { @@ -410,7 +550,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { } t.idleConn[key] = append(t.idleConn[key], pconn) t.idleMu.Unlock() - return true + return nil } // getIdleConnCh returns a channel to receive and return idle @@ -494,16 +634,17 @@ func (t *Transport) replaceReqCanceler(r *Request, fn func()) bool { return true } -func (t *Transport) dial(network, addr string) (c net.Conn, err error) { +func (t *Transport) dial(network, addr string) (net.Conn, error) { if t.Dial != nil { - return t.Dial(network, addr) + c, err := t.Dial(network, addr) + if c == nil && err == nil { + err = errors.New("net/http: Transport.Dial hook returned (nil, nil)") + } + return c, err } return net.Dial(network, addr) } -// Testing hooks: -var prePendingDial, postPendingDial func() - // getConn dials and creates a new persistConn to the target as // specified in the connectMethod. This includes doing a proxy CONNECT // and/or setting up TLS. If this doesn't return an error, the persistConn @@ -525,20 +666,16 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error // Copy these hooks so we don't race on the postPendingDial in // the goroutine we launch. Issue 11136. - prePendingDial := prePendingDial - postPendingDial := postPendingDial + testHookPrePendingDial := testHookPrePendingDial + testHookPostPendingDial := testHookPostPendingDial handlePendingDial := func() { - if prePendingDial != nil { - prePendingDial() - } + testHookPrePendingDial() go func() { if v := <-dialc; v.err == nil { - t.putIdleConn(v.pc) - } - if postPendingDial != nil { - postPendingDial() + t.putOrCloseIdleConn(v.pc) } + testHookPostPendingDial() }() } @@ -565,10 +702,10 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error return pc, nil case <-req.Cancel: handlePendingDial() - return nil, errors.New("net/http: request canceled while waiting for connection") + return nil, errRequestCanceledConn case <-cancelc: handlePendingDial() - return nil, errors.New("net/http: request canceled while waiting for connection") + return nil, errRequestCanceledConn } } @@ -588,7 +725,16 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) { if err != nil { return nil, err } + if pconn.conn == nil { + return nil, errors.New("net/http: Transport.DialTLS returned (nil, nil)") + } if tc, ok := pconn.conn.(*tls.Conn); ok { + // Handshake here, in case DialTLS didn't. TLSNextProto below + // depends on it for knowing the connection state. + if err := tc.Handshake(); err != nil { + go pconn.conn.Close() + return nil, err + } cs := tc.ConnectionState() pconn.tlsState = &cs } @@ -680,6 +826,12 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) { pconn.conn = tlsConn } + if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { + if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { + return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil + } + } + pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF}) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() @@ -809,6 +961,11 @@ func (k connectMethodKey) String() string { // persistConn wraps a connection, usually a persistent one // (but may be used for non-keep-alive requests as well) type persistConn struct { + // alt optionally specifies the TLS NextProto RoundTripper. + // This is used for HTTP/2 today and future protocol laters. + // If it's non-nil, the rest of the fields are unused. + alt RoundTripper + t *Transport cacheKey connectMethodKey conn net.Conn @@ -828,9 +985,10 @@ type persistConn struct { lk sync.Mutex // guards following fields numExpectedResponses int - closed bool // whether conn has been closed - broken bool // an error has happened on this connection; marked broken so it's not reused. - canceled bool // whether this conn was broken due a CancelRequest + closed error // set non-nil when conn is closed, before closech is closed + broken bool // an error has happened on this connection; marked broken so it's not reused. + canceled bool // whether this conn was broken due a CancelRequest + reused bool // whether conn has had successful request/response and is being reused. // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) @@ -852,15 +1010,34 @@ func (pc *persistConn) isCanceled() bool { return pc.canceled } +// isReused reports whether this connection is in a known broken state. +func (pc *persistConn) isReused() bool { + pc.lk.Lock() + r := pc.reused + pc.lk.Unlock() + return r +} + func (pc *persistConn) cancelRequest() { pc.lk.Lock() defer pc.lk.Unlock() pc.canceled = true - pc.closeLocked() + pc.closeLocked(errRequestCanceled) } func (pc *persistConn) readLoop() { - // eofc is used to block http.Handler goroutines reading from Response.Body + closeErr := errReadLoopExiting // default value, if not changed below + defer func() { pc.close(closeErr) }() + + tryPutIdleConn := func() bool { + if err := pc.t.tryPutIdleConn(pc); err != nil { + closeErr = err + return false + } + return true + } + + // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chan struct{}) @@ -873,17 +1050,14 @@ func (pc *persistConn) readLoop() { alive := true for alive { - pb, err := pc.br.Peek(1) + _, err := pc.br.Peek(1) + if err != nil { + err = beforeRespHeaderError{err} + } pc.lk.Lock() if pc.numExpectedResponses == 0 { - if !pc.closed { - pc.closeLocked() - if len(pb) > 0 { - log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", - string(pb), err) - } - } + pc.readLoopPeekFailLocked(err) pc.lk.Unlock() return } @@ -893,115 +1067,189 @@ func (pc *persistConn) readLoop() { var resp *Response if err == nil { - resp, err = ReadResponse(pc.br, rc.req) - if err == nil && resp.StatusCode == 100 { - // Skip any 100-continue for now. - // TODO(bradfitz): if rc.req had "Expect: 100-continue", - // actually block the request body write and signal the - // writeLoop now to begin sending it. (Issue 2184) For now we - // eat it, since we're never expecting one. - resp, err = ReadResponse(pc.br, rc.req) - } - } - - if resp != nil { - resp.TLS = pc.tlsState + resp, err = pc.readResponse(rc) } - hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0 - if err != nil { - pc.close() - } else { - if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" { - resp.Header.Del("Content-Encoding") - resp.Header.Del("Content-Length") - resp.ContentLength = -1 - resp.Body = &gzipReader{body: resp.Body} + // If we won't be able to retry this request later (from the + // roundTrip goroutine), mark it as done now. + // BEFORE the send on rc.ch, as the client might re-use the + // same *Request pointer, and we don't want to set call + // t.setReqCanceler from this persistConn while the Transport + // potentially spins up a different persistConn for the + // caller's subsequent request. + if checkTransportResend(err, rc.req, pc) != nil { + pc.t.setReqCanceler(rc.req, nil) + } + select { + case rc.ch <- responseAndError{err: err}: + case <-rc.callerGone: + return } - resp.Body = &bodyEOFSignal{body: resp.Body} + return } - if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 { + pc.lk.Lock() + pc.numExpectedResponses-- + pc.lk.Unlock() + + hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 + + if resp.Close || rc.req.Close || resp.StatusCode <= 199 { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } - var waitForBodyRead chan bool // channel is nil when there's no body - if hasBody { - waitForBodyRead = make(chan bool, 2) - resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { - waitForBodyRead <- false - return nil - } - resp.Body.(*bodyEOFSignal).fn = func(err error) error { - isEOF := err == io.EOF - waitForBodyRead <- isEOF - if isEOF { - <-eofc // see comment at top - } else if err != nil && pc.isCanceled() { - return errRequestCanceled - } - return err - } - } else { - // Before send on rc.ch, as client might re-use the - // same *Request pointer, and we don't want to set this - // on t from this persistConn while the Transport - // potentially spins up a different persistConn for the - // caller's subsequent request. + if !hasBody { pc.t.setReqCanceler(rc.req, nil) - } - pc.lk.Lock() - pc.numExpectedResponses-- - pc.lk.Unlock() + // Put the idle conn back into the pool before we send the response + // so if they process it quickly and make another request, they'll + // get this same conn. But we use the unbuffered channel 'rc' + // to guarantee that persistConn.roundTrip got out of its select + // potentially waiting for this persistConn to close. + // but after + alive = alive && + !pc.sawEOF && + pc.wroteRequest() && + tryPutIdleConn() - // The connection might be going away when we put the - // idleConn below. When that happens, we close the response channel to signal - // to roundTrip that the connection is gone. roundTrip waits for - // both closing and a response in a select, so it might choose - // the close channel, rather than the response. - // We send the response first so that roundTrip can check - // if there is a pending one with a non-blocking select - // on the response channel before erroring out. - rc.ch <- responseAndError{resp, err} - - if hasBody { - // To avoid a race, wait for the just-returned - // response body to be fully consumed before peek on - // the underlying bufio reader. select { - case <-rc.req.Cancel: - alive = false - pc.t.CancelRequest(rc.req) - case bodyEOF := <-waitForBodyRead: - pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool - alive = alive && - bodyEOF && - !pc.sawEOF && - pc.wroteRequest() && - pc.t.putIdleConn(pc) - if bodyEOF { - eofc <- struct{}{} - } - case <-pc.closech: - alive = false + case rc.ch <- responseAndError{res: resp}: + case <-rc.callerGone: + return } - } else { + + // Now that they've read from the unbuffered channel, they're safely + // out of the select that also waits on this goroutine to die, so + // we're allowed to exit now if needed (if alive is false) + testHookReadLoopBeforeNextRead() + continue + } + + if rc.addedGzip { + maybeUngzipResponse(resp) + } + resp.Body = &bodyEOFSignal{body: resp.Body} + + waitForBodyRead := make(chan bool, 2) + resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { + waitForBodyRead <- false + return nil + } + resp.Body.(*bodyEOFSignal).fn = func(err error) error { + isEOF := err == io.EOF + waitForBodyRead <- isEOF + if isEOF { + <-eofc // see comment above eofc declaration + } else if err != nil && pc.isCanceled() { + return errRequestCanceled + } + return err + } + + select { + case rc.ch <- responseAndError{res: resp}: + case <-rc.callerGone: + return + } + + // Before looping back to the top of this function and peeking on + // the bufio.Reader, wait for the caller goroutine to finish + // reading the response body. (or for cancelation or death) + select { + case bodyEOF := <-waitForBodyRead: + pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && + bodyEOF && !pc.sawEOF && pc.wroteRequest() && - pc.t.putIdleConn(pc) + tryPutIdleConn() + if bodyEOF { + eofc <- struct{}{} + } + case <-rc.req.Cancel: + alive = false + pc.t.CancelRequest(rc.req) + case <-pc.closech: + alive = false + } + + testHookReadLoopBeforeNextRead() + } +} + +func maybeUngzipResponse(resp *Response) { + if resp.Header.Get("Content-Encoding") == "gzip" { + resp.Header.Del("Content-Encoding") + resp.Header.Del("Content-Length") + resp.ContentLength = -1 + resp.Body = &gzipReader{body: resp.Body} + } +} + +func (pc *persistConn) readLoopPeekFailLocked(peekErr error) { + if pc.closed != nil { + return + } + if n := pc.br.Buffered(); n > 0 { + buf, _ := pc.br.Peek(n) + log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr) + } + if peekErr == io.EOF { + // common case. + pc.closeLocked(errServerClosedIdle) + } else { + pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr)) + } +} + +// readResponse reads an HTTP response (or two, in the case of "Expect: +// 100-continue") from the server. It returns the final non-100 one. +func (pc *persistConn) readResponse(rc requestAndChan) (resp *Response, err error) { + resp, err = ReadResponse(pc.br, rc.req) + if err != nil { + return + } + if rc.continueCh != nil { + if resp.StatusCode == 100 { + rc.continueCh <- struct{}{} + } else { + close(rc.continueCh) + } + } + if resp.StatusCode == 100 { + resp, err = ReadResponse(pc.br, rc.req) + if err != nil { + return } + } + resp.TLS = pc.tlsState + return +} + +// waitForContinue returns the function to block until +// any response, timeout or connection close. After any of them, +// the function returns a bool which indicates if the body should be sent. +func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool { + if continueCh == nil { + return nil + } + return func() bool { + timer := time.NewTimer(pc.t.ExpectContinueTimeout) + defer timer.Stop() - if hook := testHookReadLoopBeforeNextRead; hook != nil { - hook() + select { + case _, ok := <-continueCh: + return ok + case <-timer.C: + return true + case <-pc.closech: + return false } } - pc.close() } func (pc *persistConn) writeLoop() { @@ -1012,7 +1260,7 @@ func (pc *persistConn) writeLoop() { wr.ch <- errors.New("http: can't write HTTP request on broken connection") continue } - err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if err == nil { err = pc.bw.Flush() } @@ -1056,19 +1304,29 @@ func (pc *persistConn) wroteRequest() bool { } } +// responseAndError is how the goroutine reading from an HTTP/1 server +// communicates with the goroutine doing the RoundTrip. type responseAndError struct { - res *Response + res *Response // else use this response (see res method) err error } type requestAndChan struct { req *Request - ch chan responseAndError + ch chan responseAndError // unbuffered; always send in select on callerGone // did the Transport (as opposed to the client code) add an // Accept-Encoding gzip header? only if it we set it do // we transparently decode the gzip. addedGzip bool + + // Optional blocking chan for Expect: 100-continue (for send). + // If the request has an "Expect: 100-continue" header and + // the server responds 100 Continue, readLoop send a value + // to writeLoop via this chan. + continueCh chan<- struct{} + + callerGone <-chan struct{} // closed when roundTrip caller has returned } // A writeRequest is sent by the readLoop's goroutine to the @@ -1078,6 +1336,11 @@ type requestAndChan struct { type writeRequest struct { req *transportRequest ch chan<- error + + // Optional blocking chan for Expect: 100-continue (for recieve). + // If not nil, writeLoop blocks sending request body until + // it receives from this chan. + continueCh <-chan struct{} } type httpError struct { @@ -1090,23 +1353,34 @@ func (e *httpError) Timeout() bool { return e.timeout } func (e *httpError) Temporary() bool { return true } var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true} -var errClosed error = &httpError{err: "net/http: transport closed before response was received"} +var errClosed error = &httpError{err: "net/http: server closed connection before response was received"} var errRequestCanceled = errors.New("net/http: request canceled") +var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // TODO: unify? + +func nop() {} -// nil except for tests +// testHooks. Always non-nil. var ( - testHookPersistConnClosedGotRes func() - testHookEnterRoundTrip func() - testHookMu sync.Locker = fakeLocker{} // guards following - testHookReadLoopBeforeNextRead func() + testHookEnterRoundTrip = nop + testHookWaitResLoop = nop + testHookRoundTripRetried = nop + testHookPrePendingDial = nop + testHookPostPendingDial = nop + + testHookMu sync.Locker = fakeLocker{} // guards following + testHookReadLoopBeforeNextRead = nop ) +// beforeRespHeaderError is used to indicate when an IO error has occurred before +// any header data was received. +type beforeRespHeaderError struct { + error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { - if hook := testHookEnterRoundTrip; hook != nil { - hook() - } + testHookEnterRoundTrip() if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) { - pc.t.putIdleConn(pc) + pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.lk.Lock() @@ -1143,42 +1417,47 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err req.extraHeaders().Set("Accept-Encoding", "gzip") } + var continueCh chan struct{} + if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() { + continueCh = make(chan struct{}, 1) + } + if pc.t.DisableKeepAlives { req.extraHeaders().Set("Connection", "close") } + gone := make(chan struct{}) + defer close(gone) + // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. writeErrCh := make(chan error, 1) - pc.writech <- writeRequest{req, writeErrCh} + pc.writech <- writeRequest{req, writeErrCh, continueCh} - resc := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} + resc := make(chan responseAndError) + pc.reqch <- requestAndChan{ + req: req.Request, + ch: resc, + addedGzip: requestedGzip, + continueCh: continueCh, + callerGone: gone, + } var re responseAndError var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel WaitResponse: for { + testHookWaitResLoop() select { case err := <-writeErrCh: - if isNetWriteError(err) { - // Issue 11745. If we failed to write the request - // body, it's possible the server just heard enough - // and already wrote to us. Prioritize the server's - // response over returning a body write error. - select { - case re = <-resc: - pc.close() - break WaitResponse - case <-time.After(50 * time.Millisecond): - // Fall through. - } - } if err != nil { - re = responseAndError{nil, err} - pc.close() + if pc.isCanceled() { + err = errRequestCanceled + } + re = responseAndError{err: beforeRespHeaderError{err}} + pc.close(fmt.Errorf("write error: %v", err)) break WaitResponse } if d := pc.t.ResponseHeaderTimeout; d > 0 { @@ -1187,33 +1466,22 @@ WaitResponse: respHeaderTimer = timer.C } case <-pc.closech: - // The persist connection is dead. This shouldn't - // usually happen (only with Connection: close responses - // with no response bodies), but if it does happen it - // means either a) the remote server hung up on us - // prematurely, or b) the readLoop sent us a response & - // closed its closech at roughly the same time, and we - // selected this case first. If we got a response, readLoop makes sure - // to send it before it puts the conn and closes the channel. - // That way, we can fetch the response, if there is one, - // with a non-blocking receive. - select { - case re = <-resc: - if fn := testHookPersistConnClosedGotRes; fn != nil { - fn() - } - default: - re = responseAndError{err: errClosed} - if pc.isCanceled() { - re = responseAndError{err: errRequestCanceled} - } + var err error + if pc.isCanceled() { + err = errRequestCanceled + } else { + err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)} } + re = responseAndError{err: err} break WaitResponse case <-respHeaderTimer: - pc.close() + pc.close(errTimeout) re = responseAndError{err: errTimeout} break WaitResponse case re = <-resc: + if re.err != nil && pc.isCanceled() { + re.err = errRequestCanceled + } break WaitResponse case <-cancelChan: pc.t.CancelRequest(req.Request) @@ -1224,6 +1492,9 @@ WaitResponse: if re.err != nil { pc.t.setReqCanceler(req.Request, nil) } + if (re.res == nil) == (re.err == nil) { + panic("internal error: exactly one of res or err should be set") + } return re.res, re.err } @@ -1236,18 +1507,44 @@ func (pc *persistConn) markBroken() { pc.broken = true } -func (pc *persistConn) close() { +// markReused marks this connection as having been successfully used for a +// request and response. +func (pc *persistConn) markReused() { + pc.lk.Lock() + pc.reused = true + pc.lk.Unlock() +} + +// close closes the underlying TCP connection and closes +// the pc.closech channel. +// +// The provided err is only for testing and debugging; in normal +// circumstances it should never be seen by users. +func (pc *persistConn) close(err error) { pc.lk.Lock() defer pc.lk.Unlock() - pc.closeLocked() + pc.closeLocked(err) } -func (pc *persistConn) closeLocked() { +func (pc *persistConn) closeLocked(err error) { + if err == nil { + panic("nil error") + } pc.broken = true - if !pc.closed { - pc.conn.Close() - pc.closed = true - close(pc.closech) + if pc.closed == nil { + pc.closed = err + if pc.alt != nil { + // Do nothing; can only get here via getConn's + // handlePendingDial's putOrCloseIdleConn when + // it turns out the abandoned connection in + // flight ended up negotiating an alternate + // protocol. We don't use the connection + // freelist for http2. That's done by the + // alternate protocol's RoundTripper. + } else { + pc.conn.Close() + close(pc.closech) + } } pc.mutateHeaderFunc = nil } |