summaryrefslogtreecommitdiff
path: root/go/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/vendor/google.golang.org/grpc/server.go')
-rw-r--r--go/vendor/google.golang.org/grpc/server.go1108
1 files changed, 1108 insertions, 0 deletions
diff --git a/go/vendor/google.golang.org/grpc/server.go b/go/vendor/google.golang.org/grpc/server.go
new file mode 100644
index 0000000..b15f71c
--- /dev/null
+++ b/go/vendor/google.golang.org/grpc/server.go
@@ -0,0 +1,1108 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "reflect"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ "golang.org/x/net/context"
+ "golang.org/x/net/http2"
+ "golang.org/x/net/trace"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
+ "google.golang.org/grpc/tap"
+ "google.golang.org/grpc/transport"
+)
+
+type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
+
+// MethodDesc represents an RPC service's method specification.
+type MethodDesc struct {
+ MethodName string
+ Handler methodHandler
+}
+
+// ServiceDesc represents an RPC service's specification.
+type ServiceDesc struct {
+ ServiceName string
+ // The pointer to the service interface. Used to check whether the user
+ // provided implementation satisfies the interface requirements.
+ HandlerType interface{}
+ Methods []MethodDesc
+ Streams []StreamDesc
+ Metadata interface{}
+}
+
+// service consists of the information of the server serving this service and
+// the methods in this service.
+type service struct {
+ server interface{} // the server for service methods
+ md map[string]*MethodDesc
+ sd map[string]*StreamDesc
+ mdata interface{}
+}
+
+// Server is a gRPC server to serve RPC requests.
+type Server struct {
+ opts options
+
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[io.Closer]bool
+ drain bool
+ ctx context.Context
+ cancel context.CancelFunc
+ // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
+ // and all the transport goes away.
+ cv *sync.Cond
+ m map[string]*service // service name -> service info
+ events trace.EventLog
+}
+
+type options struct {
+ creds credentials.TransportCredentials
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ maxMsgSize int
+ unaryInt UnaryServerInterceptor
+ streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
+ maxConcurrentStreams uint32
+ useHandlerImpl bool // use http.Handler-based server
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
+}
+
+var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
+
+// A ServerOption sets options.
+type ServerOption func(*options)
+
+// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+ return func(o *options) {
+ o.keepaliveParams = kp
+ }
+}
+
+// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
+func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
+ return func(o *options) {
+ o.keepalivePolicy = kep
+ }
+}
+
+// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
+func CustomCodec(codec Codec) ServerOption {
+ return func(o *options) {
+ o.codec = codec
+ }
+}
+
+// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
+func RPCCompressor(cp Compressor) ServerOption {
+ return func(o *options) {
+ o.cp = cp
+ }
+}
+
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
+func RPCDecompressor(dc Decompressor) ServerOption {
+ return func(o *options) {
+ o.dc = dc
+ }
+}
+
+// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
+// If this is not set, gRPC uses the default 4MB.
+func MaxMsgSize(m int) ServerOption {
+ return func(o *options) {
+ o.maxMsgSize = m
+ }
+}
+
+// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
+// of concurrent streams to each ServerTransport.
+func MaxConcurrentStreams(n uint32) ServerOption {
+ return func(o *options) {
+ o.maxConcurrentStreams = n
+ }
+}
+
+// Creds returns a ServerOption that sets credentials for server connections.
+func Creds(c credentials.TransportCredentials) ServerOption {
+ return func(o *options) {
+ o.creds = c
+ }
+}
+
+// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
+// server. Only one unary interceptor can be installed. The construction of multiple
+// interceptors (e.g., chaining) can be implemented at the caller.
+func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
+ return func(o *options) {
+ if o.unaryInt != nil {
+ panic("The unary server interceptor has been set.")
+ }
+ o.unaryInt = i
+ }
+}
+
+// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
+// server. Only one stream interceptor can be installed.
+func StreamInterceptor(i StreamServerInterceptor) ServerOption {
+ return func(o *options) {
+ if o.streamInt != nil {
+ panic("The stream server interceptor has been set.")
+ }
+ o.streamInt = i
+ }
+}
+
+// InTapHandle returns a ServerOption that sets the tap handle for all the server
+// transport to be created. Only one can be installed.
+func InTapHandle(h tap.ServerInHandle) ServerOption {
+ return func(o *options) {
+ if o.inTapHandle != nil {
+ panic("The tap handle has been set.")
+ }
+ o.inTapHandle = h
+ }
+}
+
+// StatsHandler returns a ServerOption that sets the stats handler for the server.
+func StatsHandler(h stats.Handler) ServerOption {
+ return func(o *options) {
+ o.statsHandler = h
+ }
+}
+
+// UnknownServiceHandler returns a ServerOption that allows for adding a custom
+// unknown service handler. The provided method is a bidi-streaming RPC service
+// handler that will be invoked instead of returning the the "unimplemented" gRPC
+// error whenever a request is received for an unregistered service or method.
+// The handling function has full access to the Context of the request and the
+// stream, and the invocation passes through interceptors.
+func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
+ return func(o *options) {
+ o.unknownStreamDesc = &StreamDesc{
+ StreamName: "unknown_service_handler",
+ Handler: streamHandler,
+ // We need to assume that the users of the streamHandler will want to use both.
+ ClientStreams: true,
+ ServerStreams: true,
+ }
+ }
+}
+
+// NewServer creates a gRPC server which has no service registered and has not
+// started to accept requests yet.
+func NewServer(opt ...ServerOption) *Server {
+ var opts options
+ opts.maxMsgSize = defaultMaxMsgSize
+ for _, o := range opt {
+ o(&opts)
+ }
+ if opts.codec == nil {
+ // Set the default codec.
+ opts.codec = protoCodec{}
+ }
+ s := &Server{
+ lis: make(map[net.Listener]bool),
+ opts: opts,
+ conns: make(map[io.Closer]bool),
+ m: make(map[string]*service),
+ }
+ s.cv = sync.NewCond(&s.mu)
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ if EnableTracing {
+ _, file, line, _ := runtime.Caller(1)
+ s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
+ }
+ return s
+}
+
+// printf records an event in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) printf(format string, a ...interface{}) {
+ if s.events != nil {
+ s.events.Printf(format, a...)
+ }
+}
+
+// errorf records an error in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) errorf(format string, a ...interface{}) {
+ if s.events != nil {
+ s.events.Errorf(format, a...)
+ }
+}
+
+// RegisterService register a service and its implementation to the gRPC
+// server. Called from the IDL generated code. This must be called before
+// invoking Serve.
+func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
+ ht := reflect.TypeOf(sd.HandlerType).Elem()
+ st := reflect.TypeOf(ss)
+ if !st.Implements(ht) {
+ grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
+ }
+ s.register(sd, ss)
+}
+
+func (s *Server) register(sd *ServiceDesc, ss interface{}) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.printf("RegisterService(%q)", sd.ServiceName)
+ if _, ok := s.m[sd.ServiceName]; ok {
+ grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
+ }
+ srv := &service{
+ server: ss,
+ md: make(map[string]*MethodDesc),
+ sd: make(map[string]*StreamDesc),
+ mdata: sd.Metadata,
+ }
+ for i := range sd.Methods {
+ d := &sd.Methods[i]
+ srv.md[d.MethodName] = d
+ }
+ for i := range sd.Streams {
+ d := &sd.Streams[i]
+ srv.sd[d.StreamName] = d
+ }
+ s.m[sd.ServiceName] = srv
+}
+
+// MethodInfo contains the information of an RPC including its method name and type.
+type MethodInfo struct {
+ // Name is the method name only, without the service name or package name.
+ Name string
+ // IsClientStream indicates whether the RPC is a client streaming RPC.
+ IsClientStream bool
+ // IsServerStream indicates whether the RPC is a server streaming RPC.
+ IsServerStream bool
+}
+
+// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
+type ServiceInfo struct {
+ Methods []MethodInfo
+ // Metadata is the metadata specified in ServiceDesc when registering service.
+ Metadata interface{}
+}
+
+// GetServiceInfo returns a map from service names to ServiceInfo.
+// Service names include the package names, in the form of <package>.<service>.
+func (s *Server) GetServiceInfo() map[string]ServiceInfo {
+ ret := make(map[string]ServiceInfo)
+ for n, srv := range s.m {
+ methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
+ for m := range srv.md {
+ methods = append(methods, MethodInfo{
+ Name: m,
+ IsClientStream: false,
+ IsServerStream: false,
+ })
+ }
+ for m, d := range srv.sd {
+ methods = append(methods, MethodInfo{
+ Name: m,
+ IsClientStream: d.ClientStreams,
+ IsServerStream: d.ServerStreams,
+ })
+ }
+
+ ret[n] = ServiceInfo{
+ Methods: methods,
+ Metadata: srv.mdata,
+ }
+ }
+ return ret
+}
+
+var (
+ // ErrServerStopped indicates that the operation is now illegal because of
+ // the server being stopped.
+ ErrServerStopped = errors.New("grpc: the server has been stopped")
+)
+
+func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
+ if s.opts.creds == nil {
+ return rawConn, nil, nil
+ }
+ return s.opts.creds.ServerHandshake(rawConn)
+}
+
+// Serve accepts incoming connections on the listener lis, creating a new
+// ServerTransport and service goroutine for each. The service goroutines
+// read gRPC requests and then call the registered handlers to reply to them.
+// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
+// this method returns.
+// Serve always returns non-nil error.
+func (s *Server) Serve(lis net.Listener) error {
+ s.mu.Lock()
+ s.printf("serving")
+ if s.lis == nil {
+ s.mu.Unlock()
+ lis.Close()
+ return ErrServerStopped
+ }
+ s.lis[lis] = true
+ s.mu.Unlock()
+ defer func() {
+ s.mu.Lock()
+ if s.lis != nil && s.lis[lis] {
+ lis.Close()
+ delete(s.lis, lis)
+ }
+ s.mu.Unlock()
+ }()
+
+ var tempDelay time.Duration // how long to sleep on accept failure
+
+ for {
+ rawConn, err := lis.Accept()
+ if err != nil {
+ if ne, ok := err.(interface {
+ Temporary() bool
+ }); ok && ne.Temporary() {
+ if tempDelay == 0 {
+ tempDelay = 5 * time.Millisecond
+ } else {
+ tempDelay *= 2
+ }
+ if max := 1 * time.Second; tempDelay > max {
+ tempDelay = max
+ }
+ s.mu.Lock()
+ s.printf("Accept error: %v; retrying in %v", err, tempDelay)
+ s.mu.Unlock()
+ select {
+ case <-time.After(tempDelay):
+ case <-s.ctx.Done():
+ }
+ continue
+ }
+ s.mu.Lock()
+ s.printf("done serving; Accept = %v", err)
+ s.mu.Unlock()
+ return err
+ }
+ tempDelay = 0
+ // Start a new goroutine to deal with rawConn
+ // so we don't stall this Accept loop goroutine.
+ go s.handleRawConn(rawConn)
+ }
+}
+
+// handleRawConn is run in its own goroutine and handles a just-accepted
+// connection that has not had any I/O performed on it yet.
+func (s *Server) handleRawConn(rawConn net.Conn) {
+ conn, authInfo, err := s.useTransportAuthenticator(rawConn)
+ if err != nil {
+ s.mu.Lock()
+ s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+ s.mu.Unlock()
+ grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+ // If serverHandShake returns ErrConnDispatched, keep rawConn open.
+ if err != credentials.ErrConnDispatched {
+ rawConn.Close()
+ }
+ return
+ }
+
+ s.mu.Lock()
+ if s.conns == nil {
+ s.mu.Unlock()
+ conn.Close()
+ return
+ }
+ s.mu.Unlock()
+
+ if s.opts.useHandlerImpl {
+ s.serveUsingHandler(conn)
+ } else {
+ s.serveHTTP2Transport(conn, authInfo)
+ }
+}
+
+// serveHTTP2Transport sets up a http/2 transport (using the
+// gRPC http2 server transport in transport/http2_server.go) and
+// serves streams on it.
+// This is run in its own goroutine (it does network I/O in
+// transport.NewServerTransport).
+func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+ config := &transport.ServerConfig{
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
+ }
+ st, err := transport.NewServerTransport("http2", c, config)
+ if err != nil {
+ s.mu.Lock()
+ s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
+ s.mu.Unlock()
+ c.Close()
+ grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
+ return
+ }
+ if !s.addConn(st) {
+ st.Close()
+ return
+ }
+ s.serveStreams(st)
+}
+
+func (s *Server) serveStreams(st transport.ServerTransport) {
+ defer s.removeConn(st)
+ defer st.Close()
+ var wg sync.WaitGroup
+ st.HandleStreams(func(stream *transport.Stream) {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ }()
+ }, func(ctx context.Context, method string) context.Context {
+ if !EnableTracing {
+ return ctx
+ }
+ tr := trace.New("grpc.Recv."+methodFamily(method), method)
+ return trace.NewContext(ctx, tr)
+ })
+ wg.Wait()
+}
+
+var _ http.Handler = (*Server)(nil)
+
+// serveUsingHandler is called from handleRawConn when s is configured
+// to handle requests via the http.Handler interface. It sets up a
+// net/http.Server to handle the just-accepted conn. The http.Server
+// is configured to route all incoming requests (all HTTP/2 streams)
+// to ServeHTTP, which creates a new ServerTransport for each stream.
+// serveUsingHandler blocks until conn closes.
+//
+// This codepath is only used when Server.TestingUseHandlerImpl has
+// been configured. This lets the end2end tests exercise the ServeHTTP
+// method as one of the environment types.
+//
+// conn is the *tls.Conn that's already been authenticated.
+func (s *Server) serveUsingHandler(conn net.Conn) {
+ if !s.addConn(conn) {
+ conn.Close()
+ return
+ }
+ defer s.removeConn(conn)
+ h2s := &http2.Server{
+ MaxConcurrentStreams: s.opts.maxConcurrentStreams,
+ }
+ h2s.ServeConn(conn, &http2.ServeConnOpts{
+ Handler: s,
+ })
+}
+
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ st, err := transport.NewServerHandlerTransport(w, r)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ if !s.addConn(st) {
+ st.Close()
+ return
+ }
+ defer s.removeConn(st)
+ s.serveStreams(st)
+}
+
+// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
+// If tracing is not enabled, it returns nil.
+func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ tr, ok := trace.FromContext(stream.Context())
+ if !ok {
+ return nil
+ }
+
+ trInfo = &traceInfo{
+ tr: tr,
+ }
+ trInfo.firstLine.client = false
+ trInfo.firstLine.remoteAddr = st.RemoteAddr()
+
+ if dl, ok := stream.Context().Deadline(); ok {
+ trInfo.firstLine.deadline = dl.Sub(time.Now())
+ }
+ return trInfo
+}
+
+func (s *Server) addConn(c io.Closer) bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.conns == nil || s.drain {
+ return false
+ }
+ s.conns[c] = true
+ return true
+}
+
+func (s *Server) removeConn(c io.Closer) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.conns != nil {
+ delete(s.conns, c)
+ s.cv.Broadcast()
+ }
+}
+
+func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
+ var (
+ cbuf *bytes.Buffer
+ outPayload *stats.OutPayload
+ )
+ if cp != nil {
+ cbuf = new(bytes.Buffer)
+ }
+ if s.opts.statsHandler != nil {
+ outPayload = &stats.OutPayload{}
+ }
+ p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
+ if err != nil {
+ // This typically indicates a fatal issue (e.g., memory
+ // corruption or hardware faults) the application program
+ // cannot handle.
+ //
+ // TODO(zhaoq): There exist other options also such as only closing the
+ // faulty stream locally and remotely (Other streams can keep going). Find
+ // the optimal option.
+ grpclog.Fatalf("grpc: Server failed to encode response %v", err)
+ }
+ err = t.Write(stream, p, opts)
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
+ }
+ return err
+}
+
+func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+ sh := s.opts.statsHandler
+ if sh != nil {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ sh.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if sh != nil {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }
+ }()
+ if trInfo != nil {
+ defer trInfo.tr.Finish()
+ trInfo.firstLine.client = false
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ defer func() {
+ if err != nil && err != io.EOF {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ }()
+ }
+ if s.opts.cp != nil {
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ stream.SetSendCompress(s.opts.cp.Type())
+ }
+ p := &parser{r: stream}
+ for { // TODO: delete
+ pf, req, err := p.recvMsg(s.opts.maxMsgSize)
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ if err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
+ }
+ }
+ return err
+ }
+
+ if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ return err
+ }
+ if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ var inPayload *stats.InPayload
+ if sh != nil {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
+ }
+ }
+ df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
+ }
+ if pf == compressionMade {
+ var err error
+ req, err = s.opts.dc.Do(bytes.NewReader(req))
+ if err != nil {
+ return Errorf(codes.Internal, err.Error())
+ }
+ }
+ if len(req) > s.opts.maxMsgSize {
+ // TODO: Revisit the error code. Currently keep it consistent with
+ // java implementation.
+ return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
+ }
+ if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ }
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ sh.HandleRPC(stream.Context(), inPayload)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(convertCode(appErr), appErr.Error())
+ appStatus, _ = status.FromError(appErr)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ trInfo.tr.SetError()
+ }
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ return appErr
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ opts := &transport.Options{
+ Last: true,
+ Delay: false,
+ }
+ if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
+ }
+ }
+ return err
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
+ }
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ return t.WriteStatus(stream, status.New(codes.OK, ""))
+ }
+}
+
+func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+ sh := s.opts.statsHandler
+ if sh != nil {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ sh.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if sh != nil {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }
+ }()
+ if s.opts.cp != nil {
+ stream.SetSendCompress(s.opts.cp.Type())
+ }
+ ss := &serverStream{
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.opts.codec,
+ cp: s.opts.cp,
+ dc: s.opts.dc,
+ maxMsgSize: s.opts.maxMsgSize,
+ trInfo: trInfo,
+ statsHandler: sh,
+ }
+ if ss.cp != nil {
+ ss.cbuf = new(bytes.Buffer)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ defer func() {
+ ss.mu.Lock()
+ if err != nil && err != io.EOF {
+ ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ ss.trInfo.tr.SetError()
+ }
+ ss.trInfo.tr.Finish()
+ ss.trInfo.tr = nil
+ ss.mu.Unlock()
+ }()
+ }
+ var appErr error
+ var server interface{}
+ if srv != nil {
+ server = srv.server
+ }
+ if s.opts.streamInt == nil {
+ appErr = sd.Handler(server, ss)
+ } else {
+ info := &StreamServerInfo{
+ FullMethod: stream.Method(),
+ IsClientStream: sd.ClientStreams,
+ IsServerStream: sd.ServerStreams,
+ }
+ appErr = s.opts.streamInt(server, ss, info, sd.Handler)
+ }
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ switch err := appErr.(type) {
+ case transport.StreamError:
+ appStatus = status.New(err.Code, err.Desc)
+ default:
+ appStatus = status.New(convertCode(appErr), appErr.Error())
+ }
+ appErr = appStatus.Err()
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ ss.trInfo.tr.SetError()
+ ss.mu.Unlock()
+ }
+ t.WriteStatus(ss.s, appStatus)
+ // TODO: Should we log an error from WriteStatus here and below?
+ return appErr
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer("OK"), false)
+ ss.mu.Unlock()
+ }
+ return t.WriteStatus(ss.s, status.New(codes.OK, ""))
+
+}
+
+func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
+ sm := stream.Method()
+ if sm != "" && sm[0] == '/' {
+ sm = sm[1:]
+ }
+ pos := strings.LastIndex(sm, "/")
+ if pos == -1 {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
+ trInfo.tr.SetError()
+ }
+ errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
+ if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ }
+ if trInfo != nil {
+ trInfo.tr.Finish()
+ }
+ return
+ }
+ service := sm[:pos]
+ method := sm[pos+1:]
+ srv, ok := s.m[service]
+ if !ok {
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
+ trInfo.tr.SetError()
+ }
+ errDesc := fmt.Sprintf("unknown service %v", service)
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ }
+ if trInfo != nil {
+ trInfo.tr.Finish()
+ }
+ return
+ }
+ // Unary RPC or Streaming RPC?
+ if md, ok := srv.md[method]; ok {
+ s.processUnaryRPC(t, stream, srv, md, trInfo)
+ return
+ }
+ if sd, ok := srv.sd[method]; ok {
+ s.processStreamingRPC(t, stream, srv, sd, trInfo)
+ return
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
+ trInfo.tr.SetError()
+ }
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
+ errDesc := fmt.Sprintf("unknown method %v", method)
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
+ }
+ grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ }
+ if trInfo != nil {
+ trInfo.tr.Finish()
+ }
+}
+
+// Stop stops the gRPC server. It immediately closes all open
+// connections and listeners.
+// It cancels all active RPCs on the server side and the corresponding
+// pending RPCs on the client side will get notified by connection
+// errors.
+func (s *Server) Stop() {
+ s.mu.Lock()
+ listeners := s.lis
+ s.lis = nil
+ st := s.conns
+ s.conns = nil
+ // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
+ s.cv.Broadcast()
+ s.mu.Unlock()
+
+ for lis := range listeners {
+ lis.Close()
+ }
+ for c := range st {
+ c.Close()
+ }
+
+ s.mu.Lock()
+ s.cancel()
+ if s.events != nil {
+ s.events.Finish()
+ s.events = nil
+ }
+ s.mu.Unlock()
+}
+
+// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
+// connections and RPCs and blocks until all the pending RPCs are finished.
+func (s *Server) GracefulStop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.conns == nil {
+ return
+ }
+ for lis := range s.lis {
+ lis.Close()
+ }
+ s.lis = nil
+ s.cancel()
+ if !s.drain {
+ for c := range s.conns {
+ c.(transport.ServerTransport).Drain()
+ }
+ s.drain = true
+ }
+ for len(s.conns) != 0 {
+ s.cv.Wait()
+ }
+ s.conns = nil
+ if s.events != nil {
+ s.events.Finish()
+ s.events = nil
+ }
+}
+
+func init() {
+ internal.TestingCloseConns = func(arg interface{}) {
+ arg.(*Server).testingCloseConns()
+ }
+ internal.TestingUseHandlerImpl = func(arg interface{}) {
+ arg.(*Server).opts.useHandlerImpl = true
+ }
+}
+
+// testingCloseConns closes all existing transports but keeps s.lis
+// accepting new connections.
+func (s *Server) testingCloseConns() {
+ s.mu.Lock()
+ for c := range s.conns {
+ c.Close()
+ delete(s.conns, c)
+ }
+ s.mu.Unlock()
+}
+
+// SetHeader sets the header metadata.
+// When called multiple times, all the provided metadata will be merged.
+// All the metadata will be sent out when one of the following happens:
+// - grpc.SendHeader() is called;
+// - The first response is sent out;
+// - An RPC status is sent out (error or success).
+func SetHeader(ctx context.Context, md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ stream, ok := transport.StreamFromContext(ctx)
+ if !ok {
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ return stream.SetHeader(md)
+}
+
+// SendHeader sends header metadata. It may be called at most once.
+// The provided md and headers set by SetHeader() will be sent.
+func SendHeader(ctx context.Context, md metadata.MD) error {
+ stream, ok := transport.StreamFromContext(ctx)
+ if !ok {
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ t := stream.ServerTransport()
+ if t == nil {
+ grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
+ }
+ if err := t.WriteHeader(stream, md); err != nil {
+ return toRPCErr(err)
+ }
+ return nil
+}
+
+// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
+// When called more than once, all the provided metadata will be merged.
+func SetTrailer(ctx context.Context, md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ stream, ok := transport.StreamFromContext(ctx)
+ if !ok {
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
+ return stream.SetTrailer(md)
+}