summaryrefslogtreecommitdiff
path: root/go/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--go/vendor/google.golang.org/grpc/clientconn.go1338
1 files changed, 880 insertions, 458 deletions
diff --git a/go/vendor/google.golang.org/grpc/clientconn.go b/go/vendor/google.golang.org/grpc/clientconn.go
index 146166a..bfbef36 100644
--- a/go/vendor/google.golang.org/grpc/clientconn.go
+++ b/go/vendor/google.golang.org/grpc/clientconn.go
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -36,15 +21,24 @@ package grpc
import (
"errors"
"fmt"
+ "math"
"net"
+ "reflect"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
+ "google.golang.org/grpc/balancer"
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/resolver"
+ _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -55,10 +49,22 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
- // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
- // removed in Q1 2017.
+ // DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
+ // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
+ errConnDrain = errors.New("grpc: the connection is drained")
+ // errConnClosing indicates that the connection is closing.
+ errConnClosing = errors.New("grpc: the connection is closing")
+ // errConnUnavailable indicates that the connection is unavailable.
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
+ // minimum time to give a connection to complete
+ minConnectTimeout = 20 * time.Second
+)
+// The following errors are returned from Dial and DialContext
+var (
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
@@ -72,37 +78,94 @@ var (
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
- // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
- errConnDrain = errors.New("grpc: the connection is drained")
- // errConnClosing indicates that the connection is closing.
- errConnClosing = errors.New("grpc: the connection is closing")
- // errConnUnavailable indicates that the connection is unavailable.
- errConnUnavailable = errors.New("grpc: the connection is unavailable")
- errNoAddr = errors.New("grpc: there is no address available to dial")
- // minimum time to give a connection to complete
- minConnectTimeout = 20 * time.Second
)
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ bs backoffStrategy
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ copts transport.ConnectOptions
+ callOptions []CallOption
+ // This is used by v1 balancer dial option WithBalancer to support v1
+ // balancer, and also by WithBalancerName dial option.
+ balancerBuilder balancer.Builder
+ // This is to support grpclb.
+ resolverBuilder resolver.Builder
+ // Custom user options for resolver.Build.
+ resolverBuildUserOptions interface{}
+ waitForHandshake bool
}
+const (
+ defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultClientMaxSendMessageSize = math.MaxInt32
+)
+
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithWaitForHandshake blocks until the initial settings frame is received from the
+// server before assigning RPCs to the connection.
+// Experimental API.
+func WithWaitForHandshake() DialOption {
+ return func(o *dialOptions) {
+ o.waitForHandshake = true
+ }
+}
+
+// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
+// before doing a write on the wire.
+func WithWriteBufferSize(s int) DialOption {
+ return func(o *dialOptions) {
+ o.copts.WriteBufferSize = s
+ }
+}
+
+// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
+// for each read syscall.
+func WithReadBufferSize(s int) DialOption {
+ return func(o *dialOptions) {
+ o.copts.ReadBufferSize = s
+ }
+}
+
+// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialWindowSize = s
+ }
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialConnWindowSize = s
+ }
+}
+
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+func WithMaxMsgSize(s int) DialOption {
+ return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
+}
+
+// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
+func WithDefaultCallOptions(cos ...CallOption) DialOption {
+ return func(o *dialOptions) {
+ o.callOptions = append(o.callOptions, cos...)
+ }
+}
+
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
@@ -110,30 +173,80 @@ func WithCodec(c Codec) DialOption {
}
}
-// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
-// compressor.
+// WithCompressor returns a DialOption which sets a Compressor to use for
+// message compression. It has lower priority than the compressor set by
+// the UseCompressor CallOption.
+//
+// Deprecated: use UseCompressor instead.
func WithCompressor(cp Compressor) DialOption {
return func(o *dialOptions) {
o.cp = cp
}
}
-// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
-// message decompressor.
+// WithDecompressor returns a DialOption which sets a Decompressor to use for
+// incoming message decompression. If incoming response messages are encoded
+// using the decompressor's Type(), it will be used. Otherwise, the message
+// encoding will be used to look up the compressor registered via
+// encoding.RegisterCompressor, which will then be used to decompress the
+// message. If no compressor is registered for the encoding, an Unimplemented
+// status error will be returned.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func WithDecompressor(dc Decompressor) DialOption {
return func(o *dialOptions) {
o.dc = dc
}
}
-// WithBalancer returns a DialOption which sets a load balancer.
+// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
+// Name resolver will be ignored if this DialOption is specified.
+//
+// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
- o.balancer = b
+ o.balancerBuilder = &balancerWrapperBuilder{
+ b: b,
+ }
+ }
+}
+
+// WithBalancerName sets the balancer that the ClientConn will be initialized
+// with. Balancer registered with balancerName will be used. This function
+// panics if no balancer was registered by balancerName.
+//
+// The balancer cannot be overridden by balancer option specified by service
+// config.
+//
+// This is an EXPERIMENTAL API.
+func WithBalancerName(balancerName string) DialOption {
+ builder := balancer.Get(balancerName)
+ if builder == nil {
+ panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
+ }
+ return func(o *dialOptions) {
+ o.balancerBuilder = builder
+ }
+}
+
+// withResolverBuilder is only for grpclb.
+func withResolverBuilder(b resolver.Builder) DialOption {
+ return func(o *dialOptions) {
+ o.resolverBuilder = b
+ }
+}
+
+// WithResolverUserOptions returns a DialOption which sets the UserOptions
+// field of resolver's BuildOption.
+func WithResolverUserOptions(userOpt interface{}) DialOption {
+ return func(o *dialOptions) {
+ o.resolverBuildUserOptions = userOpt
}
}
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+// DEPRECATED: service config should be received through name resolver, as specified here.
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) {
o.scChan = c
@@ -158,7 +271,7 @@ func WithBackoffConfig(b BackoffConfig) DialOption {
return withBackoff(b)
}
-// withBackoff sets the backoff strategy used for retries after a
+// withBackoff sets the backoff strategy used for connectRetryNum after a
// failed connection attempt.
//
// This can be exported if arbitrary backoff strategies are allowed by gRPC.
@@ -194,7 +307,7 @@ func WithTransportCredentials(creds credentials.TransportCredentials) DialOption
}
// WithPerRPCCredentials returns a DialOption which sets
-// credentials which will place auth state on each outbound RPC.
+// credentials and places auth state on each outbound RPC.
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
@@ -203,24 +316,30 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
// initially. This is valid if and only if WithBlock() is present.
+// Deprecated: use DialContext and context.WithTimeout instead.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
o.timeout = d
}
}
+func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
+ return func(o *dialOptions) {
+ o.copts.Dialer = f
+ }
+}
+
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
+ return withContextDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return f(addr, deadline.Sub(time.Now()))
}
return f(addr, 0)
- }
- }
+ })
}
// WithStatsHandler returns a DialOption that specifies the stats handler
@@ -231,7 +350,7 @@ func WithStatsHandler(h stats.Handler) DialOption {
}
}
-// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
// address and won't try to reconnect.
// The default value of FailOnNonTempDialError is false.
@@ -249,6 +368,13 @@ func WithUserAgent(s string) DialOption {
}
}
+// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
+func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
+ return func(o *dialOptions) {
+ o.copts.KeepaliveParams = kp
+ }
+}
+
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
@@ -263,25 +389,69 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
+// WithAuthority returns a DialOption that specifies the value to be used as
+// the :authority pseudo-header. This value only works with WithInsecure and
+// has no effect if TransportCredentials are present.
+func WithAuthority(a string) DialOption {
+ return func(o *dialOptions) {
+ o.copts.Authority = a
+ }
+}
+
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connecting. Once this function returns, the
+// cancel or expire the pending connection. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
-// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
- conns: make(map[Address]*addrConn),
+ csMgr: &connectivityStateManager{},
+ conns: make(map[*addrConn]struct{}),
+
+ blockingpicker: newPickerWrapper(),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+
for _, opt := range opts {
opt(&cc.dopts)
}
+
+ if !cc.dopts.insecure {
+ if cc.dopts.copts.TransportCredentials == nil {
+ return nil, errNoTransportSecurity
+ }
+ } else {
+ if cc.dopts.copts.TransportCredentials != nil {
+ return nil, errCredentialsConflict
+ }
+ for _, cd := range cc.dopts.copts.PerRPCCredentials {
+ if cd.RequireTransportSecurity() {
+ return nil, errTransportCredentialsMissing
+ }
+ }
+ }
+
+ cc.mkp = cc.dopts.copts.KeepaliveParams
+
+ if cc.dopts.copts.Dialer == nil {
+ cc.dopts.copts.Dialer = newProxyDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
+ return dialContext(ctx, "tcp", addr)
+ },
+ )
+ }
+
+ if cc.dopts.copts.UserAgent != "" {
+ cc.dopts.copts.UserAgent += " " + grpcUA
+ } else {
+ cc.dopts.copts.UserAgent = grpcUA
+ }
+
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -300,15 +470,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
+ scSet := false
if cc.dopts.scChan != nil {
- // Wait for the initial service config.
+ // Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
+ scSet = true
}
- case <-ctx.Done():
- return nil, ctx.Err()
+ default:
}
}
// Set defaults.
@@ -318,110 +489,114 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
+ cc.parsedTarget = parseTarget(cc.target)
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
+ } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
+ cc.authority = cc.dopts.copts.Authority
} else {
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
+ // Use endpoint from "scheme://authority/endpoint" as the default
+ // authority for ClientConn.
+ cc.authority = cc.parsedTarget.Endpoint
}
- var ok bool
- waitC := make(chan error, 1)
- go func() {
- var addrs []Address
- if cc.dopts.balancer == nil && cc.sc.LB != nil {
- cc.dopts.balancer = cc.sc.LB
- }
- if cc.dopts.balancer == nil {
- // Connect to target directly if balancer is nil.
- addrs = append(addrs, Address{Addr: target})
- } else {
- var credsClone credentials.TransportCredentials
- if creds != nil {
- credsClone = creds.Clone()
- }
- config := BalancerConfig{
- DialCreds: credsClone,
- }
- if err := cc.dopts.balancer.Start(target, config); err != nil {
- waitC <- err
- return
- }
- ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
- addrs = append(addrs, Address{Addr: target})
- } else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- waitC <- errNoAddr
- return
- }
- }
- }
- for _, a := range addrs {
- if err := cc.resetAddrConn(a, false, nil); err != nil {
- waitC <- err
- return
+
+ if cc.dopts.scChan != nil && !scSet {
+ // Blocking wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
}
- }
- close(waitC)
- }()
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case err := <-waitC:
- if err != nil {
- return nil, err
+ case <-ctx.Done():
+ return nil, ctx.Err()
}
}
+ if cc.dopts.scChan != nil {
+ go cc.scWatcher()
+ }
- // If balancer is nil or balancer.Notify() is nil, ok will be false here.
- // The lbWatcher goroutine will not be created.
- if ok {
- go cc.lbWatcher()
+ var credsClone credentials.TransportCredentials
+ if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+ credsClone = creds.Clone()
+ }
+ cc.balancerBuildOpts = balancer.BuildOptions{
+ DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
}
- if cc.dopts.scChan != nil {
- go cc.scWatcher()
+ // Build the resolver.
+ cc.resolverWrapper, err = newCCResolverWrapper(cc)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build resolver: %v", err)
}
+ // Start the resolver wrapper goroutine after resolverWrapper is created.
+ //
+ // If the goroutine is started before resolverWrapper is ready, the
+ // following may happen: The goroutine sends updates to cc. cc forwards
+ // those to balancer. Balancer creates new addrConn. addrConn fails to
+ // connect, and calls resolveNow(). resolveNow() tries to use the non-ready
+ // resolverWrapper.
+ cc.resolverWrapper.start()
+
+ // A blocking dial blocks until the clientConn is ready.
+ if cc.dopts.block {
+ for {
+ s := cc.GetState()
+ if s == connectivity.Ready {
+ break
+ }
+ if !cc.WaitForStateChange(ctx, s) {
+ // ctx got timeout or canceled.
+ return nil, ctx.Err()
+ }
+ }
+ }
+
return cc, nil
}
-// ConnectivityState indicates the state of a client connection.
-type ConnectivityState int
+// connectivityStateManager keeps the connectivity.State of ClientConn.
+// This struct will eventually be exported so the balancers can access it.
+type connectivityStateManager struct {
+ mu sync.Mutex
+ state connectivity.State
+ notifyChan chan struct{}
+}
-const (
- // Idle indicates the ClientConn is idle.
- Idle ConnectivityState = iota
- // Connecting indicates the ClienConn is connecting.
- Connecting
- // Ready indicates the ClientConn is ready for work.
- Ready
- // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
- TransientFailure
- // Shutdown indicates the ClientConn has started shutting down.
- Shutdown
-)
+// updateState updates the connectivity.State of ClientConn.
+// If there's a change it notifies goroutines waiting on state change to
+// happen.
+func (csm *connectivityStateManager) updateState(state connectivity.State) {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.state == connectivity.Shutdown {
+ return
+ }
+ if csm.state == state {
+ return
+ }
+ csm.state = state
+ if csm.notifyChan != nil {
+ // There are other goroutines waiting on this channel.
+ close(csm.notifyChan)
+ csm.notifyChan = nil
+ }
+}
+
+func (csm *connectivityStateManager) getState() connectivity.State {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ return csm.state
+}
-func (s ConnectivityState) String() string {
- switch s {
- case Idle:
- return "IDLE"
- case Connecting:
- return "CONNECTING"
- case Ready:
- return "READY"
- case TransientFailure:
- return "TRANSIENT_FAILURE"
- case Shutdown:
- return "SHUTDOWN"
- default:
- panic(fmt.Sprintf("unknown connectivity state: %d", s))
+func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.notifyChan == nil {
+ csm.notifyChan = make(chan struct{})
}
+ return csm.notifyChan
}
// ClientConn represents a client connection to an RPC server.
@@ -429,50 +604,50 @@ type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
- target string
- authority string
- dopts dialOptions
+ target string
+ parsedTarget resolver.Target
+ authority string
+ dopts dialOptions
+ csMgr *connectivityStateManager
+
+ balancerBuildOpts balancer.BuildOptions
+ resolverWrapper *ccResolverWrapper
+ blockingpicker *pickerWrapper
mu sync.RWMutex
sc ServiceConfig
- conns map[Address]*addrConn
+ scRaw string
+ conns map[*addrConn]struct{}
+ // Keepalive parameter can be updated if a GoAway is received.
+ mkp keepalive.ClientParameters
+ curBalancerName string
+ preBalancerName string // previous balancer name.
+ curAddresses []resolver.Address
+ balancerWrapper *ccBalancerWrapper
}
-func (cc *ClientConn) lbWatcher() {
- for addrs := range cc.dopts.balancer.Notify() {
- var (
- add []Address // Addresses need to setup connections.
- del []*addrConn // Connections need to tear down.
- )
- cc.mu.Lock()
- for _, a := range addrs {
- if _, ok := cc.conns[a]; !ok {
- add = append(add, a)
- }
- }
- for k, c := range cc.conns {
- var keep bool
- for _, a := range addrs {
- if k == a {
- keep = true
- break
- }
- }
- if !keep {
- del = append(del, c)
- delete(cc.conns, c.addr)
- }
- }
- cc.mu.Unlock()
- for _, a := range add {
- cc.resetAddrConn(a, true, nil)
- }
- for _, c := range del {
- c.tearDown(errConnDrain)
- }
+// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
+// ctx expires. A true value is returned in former case and false in latter.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
+ ch := cc.csMgr.getNotifyChan()
+ if cc.csMgr.getState() != sourceState {
+ return true
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ case <-ch:
+ return true
}
}
+// GetState returns the connectivity.State of ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) GetState() connectivity.State {
+ return cc.csMgr.getState()
+}
+
func (cc *ClientConn) scWatcher() {
for {
select {
@@ -484,6 +659,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc.sc = sc
+ cc.scRaw = ""
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -491,151 +667,271 @@ func (cc *ClientConn) scWatcher() {
}
}
-// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
-// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
-// If tearDownErr is nil, errConnDrain will be used instead.
-func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
- ac := &addrConn{
- cc: cc,
- addr: addr,
- dopts: cc.dopts,
+func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if cc.conns == nil {
+ // cc was closed.
+ return
}
- ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- ac.stateCV = sync.NewCond(&ac.mu)
- if EnableTracing {
- ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
+
+ if reflect.DeepEqual(cc.curAddresses, addrs) {
+ return
}
- if !ac.dopts.insecure {
- if ac.dopts.copts.TransportCredentials == nil {
- return errNoTransportSecurity
- }
- } else {
- if ac.dopts.copts.TransportCredentials != nil {
- return errCredentialsConflict
+
+ cc.curAddresses = addrs
+
+ if cc.dopts.balancerBuilder == nil {
+ // Only look at balancer types and switch balancer if balancer dial
+ // option is not set.
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
}
- for _, cd := range ac.dopts.copts.PerRPCCredentials {
- if cd.RequireTransportSecurity() {
- return errTransportCredentialsMissing
+ var newBalancerName string
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else {
+ // Address list doesn't contain grpclb address. Try to pick a
+ // non-grpclb balancer.
+ newBalancerName = cc.curBalancerName
+ // If current balancer is grpclb, switch to the previous one.
+ if newBalancerName == grpclbName {
+ newBalancerName = cc.preBalancerName
+ }
+ // The following could be true in two cases:
+ // - the first time handling resolved addresses
+ // (curBalancerName="")
+ // - the first time handling non-grpclb addresses
+ // (curBalancerName="grpclb", preBalancerName="")
+ if newBalancerName == "" {
+ newBalancerName = PickFirstBalancerName
}
}
+ cc.switchBalancer(newBalancerName)
+ } else if cc.balancerWrapper == nil {
+ // Balancer dial option was set, and this is the first time handling
+ // resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
+
+ cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+}
+
+// switchBalancer starts the switching from current balancer to the balancer
+// with the given name.
+//
+// It will NOT send the current address list to the new balancer. If needed,
+// caller of this function should send address list to the new balancer after
+// this function returns.
+//
+// Caller must hold cc.mu.
+func (cc *ClientConn) switchBalancer(name string) {
+ if cc.conns == nil {
+ return
+ }
+
+ if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+ return
+ }
+
+ grpclog.Infof("ClientConn switching balancer to %q", name)
+ if cc.dopts.balancerBuilder != nil {
+ grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
+ return
+ }
+ // TODO(bar switching) change this to two steps: drain and close.
+ // Keep track of sc in wrapper.
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.close()
+ }
+
+ builder := balancer.Get(name)
+ if builder == nil {
+ grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
+ builder = newPickfirstBuilder()
+ }
+ cc.preBalancerName = cc.curBalancerName
+ cc.curBalancerName = builder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+}
+
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ // TODO(bar switching) send updates to all balancer wrappers when balancer
+ // gracefully switching is supported.
+ cc.balancerWrapper.handleSubConnStateChange(sc, s)
+ cc.mu.Unlock()
+}
+
+// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+//
+// Caller needs to make sure len(addrs) > 0.
+func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
+ ac := &addrConn{
+ cc: cc,
+ addrs: addrs,
+ dopts: cc.dopts,
+ }
+ ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
- return ErrClientConnClosing
+ return nil, ErrClientConnClosing
}
- stale := cc.conns[ac.addr]
- cc.conns[ac.addr] = ac
+ cc.conns[ac] = struct{}{}
cc.mu.Unlock()
- if stale != nil {
- // There is an addrConn alive on ac.addr already. This could be due to
- // 1) a buggy Balancer notifies duplicated Addresses;
- // 2) goaway was received, a new ac will replace the old ac.
- // The old ac should be deleted from cc.conns, but the
- // underlying transport should drain rather than close.
- if tearDownErr == nil {
- // tearDownErr is nil if resetAddrConn is called by
- // 1) Dial
- // 2) lbWatcher
- // In both cases, the stale ac should drain, not close.
- stale.tearDown(errConnDrain)
- } else {
- stale.tearDown(tearDownErr)
- }
+ return ac, nil
+}
+
+// removeAddrConn removes the addrConn in the subConn from clientConn.
+// It also tears down the ac with the given error.
+func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
}
- // skipWait may overwrite the decision in ac.dopts.block.
- if ac.dopts.block && !skipWait {
- if err := ac.resetTransport(false); err != nil {
+ delete(cc.conns, ac)
+ cc.mu.Unlock()
+ ac.tearDown(err)
+}
+
+// connect starts to creating transport and also starts the transport monitor
+// goroutine for this ac.
+// It does nothing if the ac is not IDLE.
+// TODO(bar) Move this to the addrConn section.
+// This was part of resetAddrConn, keep it here to make the diff look clean.
+func (ac *addrConn) connect() error {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ if ac.state != connectivity.Idle {
+ ac.mu.Unlock()
+ return nil
+ }
+ ac.state = connectivity.Connecting
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.mu.Unlock()
+
+ // Start a goroutine connecting to the server asynchronously.
+ go func() {
+ if err := ac.resetTransport(); err != nil {
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
- // Tear down ac and delete it from cc.conns.
- cc.mu.Lock()
- delete(cc.conns, ac.addr)
- cc.mu.Unlock()
+ // Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return e.Origin()
- }
- return err
+ return
}
- // Start to monitor the error status of transport.
- go ac.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := ac.resetTransport(false); err != nil {
- grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- ac.transportMonitor()
- }()
- }
+ ac.transportMonitor()
+ }()
return nil
}
-// TODO: Avoid the locking here.
-func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
- cc.mu.RLock()
- defer cc.mu.RUnlock()
- m, ok = cc.sc.Methods[method]
- return
-}
+// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
+//
+// It checks whether current connected address of ac is in the new addrs list.
+// - If true, it updates ac.addrs and returns true. The ac will keep using
+// the existing connection.
+// - If false, it does nothing and returns false.
+func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+ if ac.state == connectivity.Shutdown {
+ ac.addrs = addrs
+ return true
+ }
-func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- var (
- ac *addrConn
- ok bool
- put func()
- )
- if cc.dopts.balancer == nil {
- // If balancer is nil, there should be only one addrConn available.
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- for _, ac = range cc.conns {
- // Break after the first iteration to get the first addrConn.
- ok = true
+ var curAddrFound bool
+ for _, a := range addrs {
+ if reflect.DeepEqual(ac.curAddr, a) {
+ curAddrFound = true
break
}
- cc.mu.RUnlock()
- } else {
- var (
- addr Address
- err error
- )
- addr, put, err = cc.dopts.balancer.Get(ctx, opts)
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- ac, ok = cc.conns[addr]
- cc.mu.RUnlock()
}
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
+ if curAddrFound {
+ ac.addrs = addrs
+ ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
+ }
+
+ return curAddrFound
+}
+
+// GetMethodConfig gets the method config of the input method.
+// If there's an exact match for input method (i.e. /service/method), we return
+// the corresponding MethodConfig.
+// If there isn't an exact match for the input method, we look for the default config
+// under the service (i.e /service/). If there is a default MethodConfig for
+// the serivce, we return it.
+// Otherwise, we return an empty MethodConfig.
+func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
+ // TODO: Avoid the locking here.
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ m, ok := cc.sc.Methods[method]
if !ok {
- if put != nil {
- put()
- }
- return nil, nil, errConnClosing
+ i := strings.LastIndex(method, "/")
+ m, _ = cc.sc.Methods[method[:i+1]]
}
- t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
+ return m
+}
+
+func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+ t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
if err != nil {
- if put != nil {
- put()
+ return nil, nil, toRPCErr(err)
+ }
+ return t, done, nil
+}
+
+// handleServiceConfig parses the service config string in JSON format to Go native
+// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
+func (cc *ClientConn) handleServiceConfig(js string) error {
+ sc, err := parseServiceConfig(js)
+ if err != nil {
+ return err
+ }
+ cc.mu.Lock()
+ cc.scRaw = js
+ cc.sc = sc
+ if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
+ if cc.curBalancerName == grpclbName {
+ // If current balancer is grpclb, there's at least one grpclb
+ // balancer address in the resolved list. Don't switch the balancer,
+ // but change the previous balancer name, so if a new resolved
+ // address list doesn't contain grpclb address, balancer will be
+ // switched to *sc.LB.
+ cc.preBalancerName = *sc.LB
+ } else {
+ cc.switchBalancer(*sc.LB)
+ cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
}
- return nil, nil, err
}
- return t, put, nil
+ cc.mu.Unlock()
+ return nil
+}
+
+func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
+ cc.mu.Lock()
+ r := cc.resolverWrapper
+ cc.mu.Unlock()
+ if r == nil {
+ return
+ }
+ go r.resolveNow(o)
}
// Close tears down the ClientConn and all underlying connections.
@@ -649,11 +945,21 @@ func (cc *ClientConn) Close() error {
}
conns := cc.conns
cc.conns = nil
+ cc.csMgr.updateState(connectivity.Shutdown)
+
+ rWrapper := cc.resolverWrapper
+ cc.resolverWrapper = nil
+ bWrapper := cc.balancerWrapper
+ cc.balancerWrapper = nil
cc.mu.Unlock()
- if cc.dopts.balancer != nil {
- cc.dopts.balancer.Close()
+ cc.blockingpicker.close()
+ if rWrapper != nil {
+ rWrapper.close()
+ }
+ if bWrapper != nil {
+ bWrapper.close()
}
- for _, ac := range conns {
+ for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
return nil
@@ -665,14 +971,15 @@ type addrConn struct {
cancel context.CancelFunc
cc *ClientConn
- addr Address
+ addrs []resolver.Address
dopts dialOptions
events trace.EventLog
+ acbw balancer.SubConn
- mu sync.Mutex
- state ConnectivityState
- stateCV *sync.Cond
- down func(error) // the handler called when a connection is down.
+ mu sync.Mutex
+ curAddr resolver.Address
+ reconnectIdx int // The index in addrs list to start reconnecting from.
+ state connectivity.State
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@@ -680,6 +987,28 @@ type addrConn struct {
// The reason this addrConn is torn down.
tearDownErr error
+
+ connectRetryNum int
+ // backoffDeadline is the time until which resetTransport needs to
+ // wait before increasing connectRetryNum count.
+ backoffDeadline time.Time
+ // connectDeadline is the time by which all connection
+ // negotiations must complete.
+ connectDeadline time.Time
+}
+
+// adjustParams updates parameters used to create transports upon
+// receiving a GoAway.
+func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
+ switch r {
+ case transport.GoAwayTooManyPings:
+ v := 2 * ac.dopts.copts.KeepaliveParams.Time
+ ac.cc.mu.Lock()
+ if v > ac.cc.mkp.Time {
+ ac.cc.mkp.Time = v
+ }
+ ac.cc.mu.Unlock()
+ }
}
// printf records an event in ac's event log, unless ac has been closed.
@@ -698,198 +1027,270 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
}
}
-// getState returns the connectivity state of the Conn
-func (ac *addrConn) getState() ConnectivityState {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- return ac.state
-}
-
-// waitForStateChange blocks until the state changes to something other than the sourceState.
-func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
+// resetTransport recreates a transport to the address for ac. The old
+// transport will close itself on error or when the clientconn is closed.
+// The created transport must receive initial settings frame from the server.
+// In case that doesnt happen, transportMonitor will kill the newly created
+// transport after connectDeadline has expired.
+// In case there was an error on the transport before the settings frame was
+// received, resetTransport resumes connecting to backends after the one that
+// was previously connected to. In case end of the list is reached, resetTransport
+// backs off until the original deadline.
+// If the DialOption WithWaitForHandshake was set, resetTrasport returns
+// successfully only after server settings are received.
+//
+// TODO(bar) make sure all state transitions are valid.
+func (ac *addrConn) resetTransport() error {
ac.mu.Lock()
- defer ac.mu.Unlock()
- if sourceState != ac.state {
- return ac.state, nil
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
}
- done := make(chan struct{})
- var err error
- go func() {
- select {
- case <-ctx.Done():
- ac.mu.Lock()
- err = ctx.Err()
- ac.stateCV.Broadcast()
- ac.mu.Unlock()
- case <-done:
- }
- }()
- defer close(done)
- for sourceState == ac.state {
- ac.stateCV.Wait()
- if err != nil {
- return ac.state, err
- }
+ if ac.ready != nil {
+ close(ac.ready)
+ ac.ready = nil
}
- return ac.state, nil
-}
-
-func (ac *addrConn) resetTransport(closeTransport bool) error {
- for retries := 0; ; retries++ {
+ ac.transport = nil
+ ridx := ac.reconnectIdx
+ ac.mu.Unlock()
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
+ var backoffDeadline, connectDeadline time.Time
+ for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
- ac.printf("connecting")
- if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
+ if ac.backoffDeadline.IsZero() {
+ // This means either a successful HTTP2 connection was established
+ // or this is the first time this addrConn is trying to establish a
+ // connection.
+ backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
+ // This will be the duration that dial gets to finish.
+ dialDuration := minConnectTimeout
+ if backoffFor > dialDuration {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ start := time.Now()
+ backoffDeadline = start.Add(backoffFor)
+ connectDeadline = start.Add(dialDuration)
+ ridx = 0 // Start connecting from the beginning.
+ } else {
+ // Continue trying to conect with the same deadlines.
+ connectRetryNum = ac.connectRetryNum
+ backoffDeadline = ac.backoffDeadline
+ connectDeadline = ac.connectDeadline
+ ac.backoffDeadline = time.Time{}
+ ac.connectDeadline = time.Time{}
+ ac.connectRetryNum = 0
+ }
+ if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
+ ac.printf("connecting")
+ if ac.state != connectivity.Connecting {
+ ac.state = connectivity.Connecting
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
- ac.state = Connecting
- ac.stateCV.Broadcast()
- t := ac.transport
+ // copy ac.addrs in case of race
+ addrsIter := make([]resolver.Address, len(ac.addrs))
+ copy(addrsIter, ac.addrs)
+ copts := ac.dopts.copts
ac.mu.Unlock()
- if closeTransport && t != nil {
- t.Close()
+ connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
+ if err != nil {
+ return err
}
- sleepTime := ac.dopts.bs.backoff(retries)
- timeout := minConnectTimeout
- if timeout < sleepTime {
- timeout = sleepTime
+ if connected {
+ return nil
}
- ctx, cancel := context.WithTimeout(ac.ctx, timeout)
- connectTime := time.Now()
- sinfo := transport.TargetInfo{
- Addr: ac.addr.Addr,
- Metadata: ac.addr.Metadata,
+ }
+}
+
+// createTransport creates a connection to one of the backends in addrs.
+// It returns true if a connection was established.
+func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
+ for i := ridx; i < len(addrs); i++ {
+ addr := addrs[i]
+ target := transport.TargetInfo{
+ Addr: addr.Addr,
+ Metadata: addr.Metadata,
+ Authority: ac.cc.authority,
}
- newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
+ done := make(chan struct{})
+ onPrefaceReceipt := func() {
+ ac.mu.Lock()
+ close(done)
+ if !ac.backoffDeadline.IsZero() {
+ // If we haven't already started reconnecting to
+ // other backends.
+ // Note, this can happen when writer notices an error
+ // and triggers resetTransport while at the same time
+ // reader receives the preface and invokes this closure.
+ ac.backoffDeadline = time.Time{}
+ ac.connectDeadline = time.Time{}
+ ac.connectRetryNum = 0
+ }
+ ac.mu.Unlock()
+ }
+ // Do not cancel in the success path because of
+ // this issue in Go1.6: https://github.com/golang/go/issues/15078.
+ connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+ newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
if err != nil {
cancel()
-
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return err
+ ac.mu.Lock()
+ if ac.state != connectivity.Shutdown {
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+ ac.mu.Unlock()
+ return false, err
}
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
- return errConnClosing
- }
- ac.errorf("transient failure: %v", err)
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
- if ac.ready != nil {
- close(ac.ready)
- ac.ready = nil
+ return false, errConnClosing
}
ac.mu.Unlock()
- closeTransport = false
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+ continue
+ }
+ if ac.dopts.waitForHandshake {
select {
- case <-time.After(sleepTime - time.Since(connectTime)):
+ case <-done:
+ case <-connectCtx.Done():
+ // Didn't receive server preface, must kill this new transport now.
+ grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
+ newTr.Close()
+ break
case <-ac.ctx.Done():
- return ac.ctx.Err()
}
- continue
}
ac.mu.Lock()
- ac.printf("ready")
- if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
+ if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- newTransport.Close()
- return errConnClosing
+ // ac.tearDonn(...) has been invoked.
+ newTr.Close()
+ return false, errConnClosing
}
- ac.state = Ready
- ac.stateCV.Broadcast()
- ac.transport = newTransport
+ ac.printf("ready")
+ ac.state = connectivity.Ready
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.transport = newTr
+ ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
- if ac.cc.dopts.balancer != nil {
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+ select {
+ case <-done:
+ // If the server has responded back with preface already,
+ // don't set the reconnect parameters.
+ default:
+ ac.connectRetryNum = connectRetryNum
+ ac.backoffDeadline = backoffDeadline
+ ac.connectDeadline = connectDeadline
+ ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
}
ac.mu.Unlock()
- return nil
+ return true, nil
+ }
+ ac.mu.Lock()
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ if ac.ready != nil {
+ close(ac.ready)
+ ac.ready = nil
}
+ ac.mu.Unlock()
+ timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
+ select {
+ case <-timer.C:
+ case <-ac.ctx.Done():
+ timer.Stop()
+ return false, ac.ctx.Err()
+ }
+ return false, nil
}
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
for {
+ var timer *time.Timer
+ var cdeadline <-chan time.Time
ac.mu.Lock()
t := ac.transport
+ if !ac.connectDeadline.IsZero() {
+ timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
+ cdeadline = timer.C
+ }
ac.mu.Unlock()
+ // Block until we receive a goaway or an error occurs.
select {
- // This is needed to detect the teardown when
- // the addrConn is idle (i.e., no RPC in flight).
- case <-ac.ctx.Done():
- select {
- case <-t.Error():
- t.Close()
- default:
- }
- return
case <-t.GoAway():
- // If GoAway happens without any network I/O error, ac is closed without shutting down the
- // underlying transport (the transport will be closed when all the pending RPCs finished or
- // failed.).
- // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
- // are closed.
- // In both cases, a new ac is created.
- select {
- case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
- default:
- ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
- }
- return
case <-t.Error():
- select {
- case <-ac.ctx.Done():
- t.Close()
- return
- case <-t.GoAway():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
- return
- default:
- }
+ case <-cdeadline:
ac.mu.Lock()
- if ac.state == Shutdown {
- // ac has been shutdown.
+ // This implies that client received server preface.
+ if ac.backoffDeadline.IsZero() {
ac.mu.Unlock()
- return
+ continue
}
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
ac.mu.Unlock()
- if err := ac.resetTransport(true); err != nil {
- ac.mu.Lock()
- ac.printf("transport exiting: %v", err)
- ac.mu.Unlock()
- grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
+ timer = nil
+ // No server preface received until deadline.
+ // Kill the connection.
+ grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
+ t.Close()
+ }
+ if timer != nil {
+ timer.Stop()
+ }
+ // If a GoAway happened, regardless of error, adjust our keepalive
+ // parameters as appropriate.
+ select {
+ case <-t.GoAway():
+ ac.adjustParams(t.GetGoAwayReason())
+ default:
+ }
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+ // Set connectivity state to TransientFailure before calling
+ // resetTransport. Transition READY->CONNECTING is not valid.
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ ac.curAddr = resolver.Address{}
+ ac.mu.Unlock()
+ if err := ac.resetTransport(); err != nil {
+ ac.mu.Lock()
+ ac.printf("transport exiting: %v", err)
+ ac.mu.Unlock()
+ grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
}
+ return
}
}
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and there is a balancer/failfast is true.
+// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
- case ac.state == Shutdown:
+ case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
@@ -898,11 +1299,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
ac.mu.Unlock()
return nil, errConnClosing
- case ac.state == Ready:
+ case ac.state == connectivity.Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
- case ac.state == TransientFailure:
+ case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
@@ -923,6 +1324,28 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
}
+// getReadyTransport returns the transport if ac's state is READY.
+// Otherwise it returns nil, false.
+// If ac's state is IDLE, it will trigger ac to connect.
+func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
+ ac.mu.Lock()
+ if ac.state == connectivity.Ready {
+ t := ac.transport
+ ac.mu.Unlock()
+ return t, true
+ }
+ var idle bool
+ if ac.state == connectivity.Idle {
+ idle = true
+ }
+ ac.mu.Unlock()
+ // Trigger idle ac to connect.
+ if idle {
+ ac.connect()
+ }
+ return nil, false
+}
+
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
@@ -930,13 +1353,12 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.cancel()
-
ac.mu.Lock()
defer ac.mu.Unlock()
- if ac.down != nil {
- ac.down(downErrorf(false, false, "%v", err))
- ac.down = nil
+ if ac.state == connectivity.Shutdown {
+ return
}
+ ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
@@ -944,12 +1366,9 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
- if ac.state == Shutdown {
- return
- }
- ac.state = Shutdown
+ ac.state = connectivity.Shutdown
ac.tearDownErr = err
- ac.stateCV.Broadcast()
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil
@@ -958,8 +1377,11 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
- if ac.transport != nil && err != errConnDrain {
- ac.transport.Close()
- }
return
}
+
+func (ac *addrConn) getState() connectivity.State {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ return ac.state
+}