diff options
author | Jens Geyer <jensg@apache.org> | 2021-11-13 23:21:02 +0100 |
---|---|---|
committer | Jens Geyer <Jens-G@users.noreply.github.com> | 2021-11-14 12:35:30 +0100 |
commit | ea1e8ff1407112342ed17cb95087bbda4e9b2cc0 (patch) | |
tree | 9e465f2ef6fd0b57676662811cfa2fd512832df5 | |
parent | 7156940c1da6f7e0c4e8b830cea1e37f770db173 (diff) | |
download | thrift-ea1e8ff1407112342ed17cb95087bbda4e9b2cc0.tar.gz |
THRIFT-5481 consolidate netstd server implementation details into one common model
Client: netstd
Patch: JensG
-rw-r--r-- | lib/netstd/Thrift/Server/TSimpleAsyncServer.cs | 153 | ||||
-rw-r--r-- | lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs | 8 |
2 files changed, 81 insertions, 80 deletions
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs index 45e55139c..d46d58a75 100644 --- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs +++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs @@ -17,21 +17,24 @@ using System; using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using Thrift.Protocol; -using Thrift.Processor; using Thrift.Transport; +using Thrift.Processor; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +#pragma warning disable IDE0079 // remove unnecessary pragmas +#pragma warning disable IDE0063 // using can be simplified, we don't namespace Thrift.Server { - //TODO: unhandled exceptions, etc. // ReSharper disable once InconsistentNaming public class TSimpleAsyncServer : TServer { - private readonly int _clientWaitingDelay; - private volatile Task _serverTask; + private volatile bool stop = false; + + private CancellationToken ServerCancellationToken; public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, TServerTransport serverTransport, @@ -39,8 +42,7 @@ namespace Thrift.Server TTransportFactory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, - ILogger logger, - int clientWaitingDelay = 10) + ILogger logger) : base(itProcessorFactory, serverTransport, inputTransportFactory, @@ -49,7 +51,6 @@ namespace Thrift.Server outputProtocolFactory, logger) { - _clientWaitingDelay = clientWaitingDelay; } public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, @@ -58,16 +59,14 @@ namespace Thrift.Server TTransportFactory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, - ILoggerFactory loggerFactory, - int clientWaitingDelay = 10) + ILoggerFactory loggerFactory) : this(itProcessorFactory, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory, - loggerFactory.CreateLogger<TSimpleAsyncServer>(), - clientWaitingDelay) + loggerFactory.CreateLogger<TSimpleAsyncServer>()) { } @@ -75,87 +74,87 @@ namespace Thrift.Server TServerTransport serverTransport, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, - ILoggerFactory loggerFactory, - int clientWaitingDelay = 10) + ILoggerFactory loggerFactory) : this(new TSingletonProcessorFactory(processor), serverTransport, null, // defaults to TTransportFactory() null, // defaults to TTransportFactory() inputProtocolFactory, outputProtocolFactory, - loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)), - clientWaitingDelay) + loggerFactory.CreateLogger(nameof(TSimpleAsyncServer))) { } public override async Task ServeAsync(CancellationToken cancellationToken) { + ServerCancellationToken = cancellationToken; try { - // cancelation token - _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning); - await _serverTask; - } - catch (Exception ex) - { - Logger.LogError(ex.ToString()); - } - } - - private async Task StartListening(CancellationToken cancellationToken) - { - ServerTransport.Listen(); - - Logger.LogTrace("Started listening at server"); + try + { + ServerTransport.Listen(); + } + catch (TTransportException ttx) + { + LogError("Error, could not listen on ServerTransport: " + ttx); + return; + } - if (ServerEventHandler != null) - { - await ServerEventHandler.PreServeAsync(cancellationToken); - } + //Fire the preServe server event when server is up but before any client connections + if (ServerEventHandler != null) + await ServerEventHandler.PreServeAsync(cancellationToken); - while (!cancellationToken.IsCancellationRequested) - { - if (ServerTransport.IsClientPending()) + while (!(stop || ServerCancellationToken.IsCancellationRequested)) { - Logger.LogTrace("Waiting for client connection"); - try { - var client = await ServerTransport.AcceptAsync(cancellationToken); - await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken); + using (TTransport client = await ServerTransport.AcceptAsync(cancellationToken)) + { + await ExecuteAsync(client); + } + } + catch (TaskCanceledException) + { + stop = true; } catch (TTransportException ttx) { - Logger.LogTrace($"Transport exception: {ttx}"); - - if (ttx.Type != TTransportException.ExceptionType.Interrupted) + if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted) { - Logger.LogError(ttx.ToString()); + LogError(ttx.ToString()); } + } } - else + + if (stop) { try { - await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken); + ServerTransport.Close(); + } + catch (TTransportException ttx) + { + LogError("TServerTransport failed on close: " + ttx.Message); } - catch (TaskCanceledException) { } + stop = false; } - } - ServerTransport.Close(); - - Logger.LogTrace("Completed listening at server"); - } - - public override void Stop() - { + } + finally + { + ServerCancellationToken = default; + } } - private async Task Execute(TTransport client, CancellationToken cancellationToken) + /// <summary> + /// Loops on processing a client forever + /// client will be a TTransport instance + /// </summary> + /// <param name="client"></param> + private async Task ExecuteAsync(TTransport client) { - Logger.LogTrace("Started client request processing"); + var cancellationToken = ServerCancellationToken; var processor = ProcessorFactory.GetAsyncProcessor(client, this); @@ -164,7 +163,6 @@ namespace Thrift.Server TProtocol inputProtocol = null; TProtocol outputProtocol = null; object connectionContext = null; - try { try @@ -174,42 +172,41 @@ namespace Thrift.Server inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + //Recover event handler (if any) and fire createContext server event when a client connects if (ServerEventHandler != null) - { connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken); - } - while (!cancellationToken.IsCancellationRequested) + //Process client requests until client disconnects + while (!(stop || cancellationToken.IsCancellationRequested)) { if (!await inputTransport.PeekAsync(cancellationToken)) - { break; - } + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. if (ServerEventHandler != null) - { await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken); - } + //Process client request (blocks until transport is readable) if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken)) - { break; - } } } - catch (TTransportException ttx) + catch (TTransportException) { - Logger.LogTrace($"Transport exception: {ttx}"); + //Usually a client disconnect, expected } catch (Exception x) { - Logger.LogError($"Error: {x}"); + //Unexpected + LogError("Error: " + x); } + //Fire deleteContext server event after client disconnects if (ServerEventHandler != null) - { await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken); - } } finally @@ -224,8 +221,12 @@ namespace Thrift.Server inputTransport?.Dispose(); outputTransport?.Dispose(); } + } - Logger.LogTrace("Completed client request processing"); + public override void Stop() + { + stop = true; + ServerTransport?.Close(); } } } diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs index 46cc9d4a8..ba1834cde 100644 --- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs +++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs @@ -105,7 +105,7 @@ namespace Thrift.Server TTransportFactory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, - int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null) + int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null) : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory, new Configuration(minThreadPoolThreads, maxThreadPoolThreads), @@ -245,9 +245,9 @@ namespace Thrift.Server connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken); //Process client requests until client disconnects - while (!stop) + while (!(stop || cancellationToken.IsCancellationRequested)) { - if (! await inputTransport.PeekAsync(cancellationToken)) + if (!await inputTransport.PeekAsync(cancellationToken)) break; //Fire processContext server event @@ -258,7 +258,7 @@ namespace Thrift.Server await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken); //Process client request (blocks until transport is readable) - if (! await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken)) + if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken)) break; } } |