diff options
Diffstat (limited to 'libgo/go/net/http/transport.go')
-rw-r--r-- | libgo/go/net/http/transport.go | 360 |
1 files changed, 273 insertions, 87 deletions
diff --git a/libgo/go/net/http/transport.go b/libgo/go/net/http/transport.go index 782f7cd395b..70d18646059 100644 --- a/libgo/go/net/http/transport.go +++ b/libgo/go/net/http/transport.go @@ -274,11 +274,12 @@ func (t *Transport) CloseIdleConnections() { } } -// CancelRequest cancels an in-flight request by closing its -// connection. +// CancelRequest cancels an in-flight request by closing its connection. +// CancelRequest should only be called after RoundTrip has returned. func (t *Transport) CancelRequest(req *Request) { t.reqMu.Lock() cancel := t.reqCanceler[req] + delete(t.reqCanceler, req) t.reqMu.Unlock() if cancel != nil { cancel() @@ -474,6 +475,25 @@ func (t *Transport) setReqCanceler(r *Request, fn func()) { } } +// replaceReqCanceler replaces an existing cancel function. If there is no cancel function +// for the request, we don't set the function and return false. +// Since CancelRequest will clear the canceler, we can use the return value to detect if +// the request was canceled since the last setReqCancel call. +func (t *Transport) replaceReqCanceler(r *Request, fn func()) bool { + t.reqMu.Lock() + defer t.reqMu.Unlock() + _, ok := t.reqCanceler[r] + if !ok { + return false + } + if fn != nil { + t.reqCanceler[r] = fn + } else { + delete(t.reqCanceler, r) + } + return true +} + func (t *Transport) dial(network, addr string) (c net.Conn, err error) { if t.Dial != nil { return t.Dial(network, addr) @@ -490,6 +510,10 @@ var prePendingDial, postPendingDial func() // is ready to write requests to. func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) { if pc := t.getIdleConn(cm); pc != nil { + // set request canceler to some non-nil function so we + // can detect whether it was cleared between now and when + // we enter roundTrip + t.setReqCanceler(req, func() {}) return pc, nil } @@ -499,6 +523,11 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error } dialc := make(chan dialRes) + // Copy these hooks so we don't race on the postPendingDial in + // the goroutine we launch. Issue 11136. + prePendingDial := prePendingDial + postPendingDial := postPendingDial + handlePendingDial := func() { if prePendingDial != nil { prePendingDial() @@ -534,6 +563,9 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error // when it finishes: handlePendingDial() return pc, nil + case <-req.Cancel: + handlePendingDial() + return nil, errors.New("net/http: request canceled while waiting for connection") case <-cancelc: handlePendingDial() return nil, errors.New("net/http: request canceled while waiting for connection") @@ -613,16 +645,9 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) { if cm.targetScheme == "https" && !tlsDial { // Initiate TLS and check remote host name against certificate. - cfg := t.TLSClientConfig - if cfg == nil || cfg.ServerName == "" { - host := cm.tlsHost() - if cfg == nil { - cfg = &tls.Config{ServerName: host} - } else { - clone := *cfg // shallow clone - clone.ServerName = host - cfg = &clone - } + cfg := cloneTLSClientConfig(t.TLSClientConfig) + if cfg.ServerName == "" { + cfg.ServerName = cm.tlsHost() } plainConn := pconn.conn tlsConn := tls.Client(plainConn, cfg) @@ -662,7 +687,7 @@ func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) { return pconn, nil } -// useProxy returns true if requests to addr should use a proxy, +// useProxy reports whether requests to addr should use a proxy, // according to the NO_PROXY or no_proxy environment variable. // addr is always a canonicalAddr with a host and port. func useProxy(addr string) bool { @@ -805,6 +830,7 @@ type persistConn struct { numExpectedResponses int closed bool // whether conn has been closed broken bool // an error has happened on this connection; marked broken so it's not reused. + canceled bool // whether this conn was broken due a CancelRequest // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) @@ -819,25 +845,33 @@ func (pc *persistConn) isBroken() bool { return b } -func (pc *persistConn) cancelRequest() { - pc.conn.Close() +// isCanceled reports whether this connection was closed due to CancelRequest. +func (pc *persistConn) isCanceled() bool { + pc.lk.Lock() + defer pc.lk.Unlock() + return pc.canceled } -var remoteSideClosedFunc func(error) bool // or nil to use default - -func remoteSideClosed(err error) bool { - if err == io.EOF { - return true - } - if remoteSideClosedFunc != nil { - return remoteSideClosedFunc(err) - } - return false +func (pc *persistConn) cancelRequest() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.canceled = true + pc.closeLocked() } func (pc *persistConn) readLoop() { - alive := true + // eofc is used to block http.Handler goroutines reading from Response.Body + // at EOF until this goroutines has (potentially) added the connection + // back to the idle pool. + eofc := make(chan struct{}) + defer close(eofc) // unblock reader on errors + + // Read this once, before loop starts. (to avoid races in tests) + testHookMu.Lock() + testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead + testHookMu.Unlock() + alive := true for alive { pb, err := pc.br.Peek(1) @@ -895,49 +929,79 @@ func (pc *persistConn) readLoop() { alive = false } - var waitForBodyRead chan bool + var waitForBodyRead chan bool // channel is nil when there's no body if hasBody { waitForBodyRead = make(chan bool, 2) resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { - // Sending false here sets alive to - // false and closes the connection - // below. waitForBodyRead <- false return nil } - resp.Body.(*bodyEOFSignal).fn = func(err error) { - waitForBodyRead <- alive && - err == nil && - !pc.sawEOF && - pc.wroteRequest() && - pc.t.putIdleConn(pc) + resp.Body.(*bodyEOFSignal).fn = func(err error) error { + isEOF := err == io.EOF + waitForBodyRead <- isEOF + if isEOF { + <-eofc // see comment at top + } else if err != nil && pc.isCanceled() { + return errRequestCanceled + } + return err } + } else { + // Before send on rc.ch, as client might re-use the + // same *Request pointer, and we don't want to set this + // on t from this persistConn while the Transport + // potentially spins up a different persistConn for the + // caller's subsequent request. + pc.t.setReqCanceler(rc.req, nil) } - if alive && !hasBody { - alive = !pc.sawEOF && - pc.wroteRequest() && - pc.t.putIdleConn(pc) - } + pc.lk.Lock() + pc.numExpectedResponses-- + pc.lk.Unlock() + // The connection might be going away when we put the + // idleConn below. When that happens, we close the response channel to signal + // to roundTrip that the connection is gone. roundTrip waits for + // both closing and a response in a select, so it might choose + // the close channel, rather than the response. + // We send the response first so that roundTrip can check + // if there is a pending one with a non-blocking select + // on the response channel before erroring out. rc.ch <- responseAndError{resp, err} - // Wait for the just-returned response body to be fully consumed - // before we race and peek on the underlying bufio reader. - if waitForBodyRead != nil { + if hasBody { + // To avoid a race, wait for the just-returned + // response body to be fully consumed before peek on + // the underlying bufio reader. select { - case alive = <-waitForBodyRead: + case <-rc.req.Cancel: + alive = false + pc.t.CancelRequest(rc.req) + case bodyEOF := <-waitForBodyRead: + pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool + alive = alive && + bodyEOF && + !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) + if bodyEOF { + eofc <- struct{}{} + } case <-pc.closech: alive = false } + } else { + alive = alive && + !pc.sawEOF && + pc.wroteRequest() && + pc.t.putIdleConn(pc) } - pc.t.setReqCanceler(rc.req, nil) - - if !alive { - pc.close() + if hook := testHookReadLoopBeforeNextRead; hook != nil { + hook() } } + pc.close() } func (pc *persistConn) writeLoop() { @@ -1027,9 +1091,24 @@ func (e *httpError) Temporary() bool { return true } var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true} var errClosed error = &httpError{err: "net/http: transport closed before response was received"} +var errRequestCanceled = errors.New("net/http: request canceled") + +// nil except for tests +var ( + testHookPersistConnClosedGotRes func() + testHookEnterRoundTrip func() + testHookMu sync.Locker = fakeLocker{} // guards following + testHookReadLoopBeforeNextRead func() +) func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { - pc.t.setReqCanceler(req.Request, pc.cancelRequest) + if hook := testHookEnterRoundTrip; hook != nil { + hook() + } + if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) { + pc.t.putIdleConn(pc) + return nil, errRequestCanceled + } pc.lk.Lock() pc.numExpectedResponses++ headerFn := pc.mutateHeaderFunc @@ -1055,15 +1134,19 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err // Note that we don't request this for HEAD requests, // due to a bug in nginx: // http://trac.nginx.org/nginx/ticket/358 - // http://golang.org/issue/5522 + // https://golang.org/issue/5522 // // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail - // anyway. See http://golang.org/issue/8923 + // anyway. See https://golang.org/issue/8923 requestedGzip = true req.extraHeaders().Set("Accept-Encoding", "gzip") } + if pc.t.DisableKeepAlives { + req.extraHeaders().Set("Connection", "close") + } + // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. @@ -1074,38 +1157,57 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} var re responseAndError - var pconnDeadCh = pc.closech - var failTicker <-chan time.Time var respHeaderTimer <-chan time.Time + cancelChan := req.Request.Cancel WaitResponse: for { select { case err := <-writeErrCh: + if isNetWriteError(err) { + // Issue 11745. If we failed to write the request + // body, it's possible the server just heard enough + // and already wrote to us. Prioritize the server's + // response over returning a body write error. + select { + case re = <-resc: + pc.close() + break WaitResponse + case <-time.After(50 * time.Millisecond): + // Fall through. + } + } if err != nil { re = responseAndError{nil, err} pc.close() break WaitResponse } if d := pc.t.ResponseHeaderTimeout; d > 0 { - respHeaderTimer = time.After(d) + timer := time.NewTimer(d) + defer timer.Stop() // prevent leaks + respHeaderTimer = timer.C } - case <-pconnDeadCh: + case <-pc.closech: // The persist connection is dead. This shouldn't // usually happen (only with Connection: close responses // with no response bodies), but if it does happen it // means either a) the remote server hung up on us // prematurely, or b) the readLoop sent us a response & // closed its closech at roughly the same time, and we - // selected this case first, in which case a response - // might still be coming soon. - // - // We can't avoid the select race in b) by using a unbuffered - // resc channel instead, because then goroutines can - // leak if we exit due to other errors. - pconnDeadCh = nil // avoid spinning - failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc - case <-failTicker: - re = responseAndError{err: errClosed} + // selected this case first. If we got a response, readLoop makes sure + // to send it before it puts the conn and closes the channel. + // That way, we can fetch the response, if there is one, + // with a non-blocking receive. + select { + case re = <-resc: + if fn := testHookPersistConnClosedGotRes; fn != nil { + fn() + } + default: + re = responseAndError{err: errClosed} + if pc.isCanceled() { + re = responseAndError{err: errRequestCanceled} + } + } break WaitResponse case <-respHeaderTimer: pc.close() @@ -1113,13 +1215,12 @@ WaitResponse: break WaitResponse case re = <-resc: break WaitResponse + case <-cancelChan: + pc.t.CancelRequest(req.Request) + cancelChan = nil } } - pc.lk.Lock() - pc.numExpectedResponses-- - pc.lk.Unlock() - if re.err != nil { pc.t.setReqCanceler(req.Request, nil) } @@ -1167,16 +1268,18 @@ func canonicalAddr(url *url.URL) string { // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most // once, right before its final (error-producing) Read or Close call -// returns. If earlyCloseFn is non-nil and Close is called before -// io.EOF is seen, earlyCloseFn is called instead of fn, and its -// return value is the return value from Close. +// returns. fn should return the new error to return from Read or Close. +// +// If earlyCloseFn is non-nil and Close is called before io.EOF is +// seen, earlyCloseFn is called instead of fn, and its return value is +// the return value from Close. type bodyEOFSignal struct { body io.ReadCloser - mu sync.Mutex // guards following 4 fields - closed bool // whether Close has been called - rerr error // sticky Read error - fn func(error) // error will be nil on Read io.EOF - earlyCloseFn func() error // optional alt Close func used if io.EOF not seen + mu sync.Mutex // guards following 4 fields + closed bool // whether Close has been called + rerr error // sticky Read error + fn func(error) error // err will be nil on Read io.EOF + earlyCloseFn func() error // optional alt Close func used if io.EOF not seen } func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { @@ -1197,7 +1300,7 @@ func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { if es.rerr == nil { es.rerr = err } - es.condfn(err) + err = es.condfn(err) } return } @@ -1213,20 +1316,17 @@ func (es *bodyEOFSignal) Close() error { return es.earlyCloseFn() } err := es.body.Close() - es.condfn(err) - return err + return es.condfn(err) } // caller must hold es.mu. -func (es *bodyEOFSignal) condfn(err error) { +func (es *bodyEOFSignal) condfn(err error) error { if es.fn == nil { - return - } - if err == io.EOF { - err = nil + return err } - es.fn(err) + err = es.fn(err) es.fn = nil + return err } // gzipReader wraps a response body so it can lazily @@ -1273,3 +1373,89 @@ func (nr noteEOFReader) Read(p []byte) (n int, err error) { } return } + +// fakeLocker is a sync.Locker which does nothing. It's used to guard +// test-only fields when not under test, to avoid runtime atomic +// overhead. +type fakeLocker struct{} + +func (fakeLocker) Lock() {} +func (fakeLocker) Unlock() {} + +func isNetWriteError(err error) bool { + switch e := err.(type) { + case *url.Error: + return isNetWriteError(e.Err) + case *net.OpError: + return e.Op == "write" + default: + return false + } +} + +// cloneTLSConfig returns a shallow clone of the exported +// fields of cfg, ignoring the unexported sync.Once, which +// contains a mutex and must not be copied. +// +// The cfg must not be in active use by tls.Server, or else +// there can still be a race with tls.Server updating SessionTicketKey +// and our copying it, and also a race with the server setting +// SessionTicketsDisabled=false on failure to set the random +// ticket key. +// +// If cfg is nil, a new zero tls.Config is returned. +func cloneTLSConfig(cfg *tls.Config) *tls.Config { + if cfg == nil { + return &tls.Config{} + } + return &tls.Config{ + Rand: cfg.Rand, + Time: cfg.Time, + Certificates: cfg.Certificates, + NameToCertificate: cfg.NameToCertificate, + GetCertificate: cfg.GetCertificate, + RootCAs: cfg.RootCAs, + NextProtos: cfg.NextProtos, + ServerName: cfg.ServerName, + ClientAuth: cfg.ClientAuth, + ClientCAs: cfg.ClientCAs, + InsecureSkipVerify: cfg.InsecureSkipVerify, + CipherSuites: cfg.CipherSuites, + PreferServerCipherSuites: cfg.PreferServerCipherSuites, + SessionTicketsDisabled: cfg.SessionTicketsDisabled, + SessionTicketKey: cfg.SessionTicketKey, + ClientSessionCache: cfg.ClientSessionCache, + MinVersion: cfg.MinVersion, + MaxVersion: cfg.MaxVersion, + CurvePreferences: cfg.CurvePreferences, + } +} + +// cloneTLSClientConfig is like cloneTLSConfig but omits +// the fields SessionTicketsDisabled and SessionTicketKey. +// This makes it safe to call cloneTLSClientConfig on a config +// in active use by a server. +func cloneTLSClientConfig(cfg *tls.Config) *tls.Config { + if cfg == nil { + return &tls.Config{} + } + return &tls.Config{ + Rand: cfg.Rand, + Time: cfg.Time, + Certificates: cfg.Certificates, + NameToCertificate: cfg.NameToCertificate, + GetCertificate: cfg.GetCertificate, + RootCAs: cfg.RootCAs, + NextProtos: cfg.NextProtos, + ServerName: cfg.ServerName, + ClientAuth: cfg.ClientAuth, + ClientCAs: cfg.ClientCAs, + InsecureSkipVerify: cfg.InsecureSkipVerify, + CipherSuites: cfg.CipherSuites, + PreferServerCipherSuites: cfg.PreferServerCipherSuites, + ClientSessionCache: cfg.ClientSessionCache, + MinVersion: cfg.MinVersion, + MaxVersion: cfg.MaxVersion, + CurvePreferences: cfg.CurvePreferences, + } +} |