summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJens Geyer <jensg@apache.org>2021-11-13 23:21:02 +0100
committerJens Geyer <Jens-G@users.noreply.github.com>2021-11-14 12:35:30 +0100
commitea1e8ff1407112342ed17cb95087bbda4e9b2cc0 (patch)
tree9e465f2ef6fd0b57676662811cfa2fd512832df5
parent7156940c1da6f7e0c4e8b830cea1e37f770db173 (diff)
downloadthrift-ea1e8ff1407112342ed17cb95087bbda4e9b2cc0.tar.gz
THRIFT-5481 consolidate netstd server implementation details into one common model
Client: netstd Patch: JensG
-rw-r--r--lib/netstd/Thrift/Server/TSimpleAsyncServer.cs153
-rw-r--r--lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs8
2 files changed, 81 insertions, 80 deletions
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
index 45e55139c..d46d58a75 100644
--- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -17,21 +17,24 @@
using System;
using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
using Thrift.Protocol;
-using Thrift.Processor;
using Thrift.Transport;
+using Thrift.Processor;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+#pragma warning disable IDE0079 // remove unnecessary pragmas
+#pragma warning disable IDE0063 // using can be simplified, we don't
namespace Thrift.Server
{
- //TODO: unhandled exceptions, etc.
// ReSharper disable once InconsistentNaming
public class TSimpleAsyncServer : TServer
{
- private readonly int _clientWaitingDelay;
- private volatile Task _serverTask;
+ private volatile bool stop = false;
+
+ private CancellationToken ServerCancellationToken;
public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
TServerTransport serverTransport,
@@ -39,8 +42,7 @@ namespace Thrift.Server
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILogger logger,
- int clientWaitingDelay = 10)
+ ILogger logger)
: base(itProcessorFactory,
serverTransport,
inputTransportFactory,
@@ -49,7 +51,6 @@ namespace Thrift.Server
outputProtocolFactory,
logger)
{
- _clientWaitingDelay = clientWaitingDelay;
}
public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
@@ -58,16 +59,14 @@ namespace Thrift.Server
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILoggerFactory loggerFactory,
- int clientWaitingDelay = 10)
+ ILoggerFactory loggerFactory)
: this(itProcessorFactory,
serverTransport,
inputTransportFactory,
outputTransportFactory,
inputProtocolFactory,
outputProtocolFactory,
- loggerFactory.CreateLogger<TSimpleAsyncServer>(),
- clientWaitingDelay)
+ loggerFactory.CreateLogger<TSimpleAsyncServer>())
{
}
@@ -75,87 +74,87 @@ namespace Thrift.Server
TServerTransport serverTransport,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILoggerFactory loggerFactory,
- int clientWaitingDelay = 10)
+ ILoggerFactory loggerFactory)
: this(new TSingletonProcessorFactory(processor),
serverTransport,
null, // defaults to TTransportFactory()
null, // defaults to TTransportFactory()
inputProtocolFactory,
outputProtocolFactory,
- loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
- clientWaitingDelay)
+ loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)))
{
}
public override async Task ServeAsync(CancellationToken cancellationToken)
{
+ ServerCancellationToken = cancellationToken;
try
{
- // cancelation token
- _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
- await _serverTask;
- }
- catch (Exception ex)
- {
- Logger.LogError(ex.ToString());
- }
- }
-
- private async Task StartListening(CancellationToken cancellationToken)
- {
- ServerTransport.Listen();
-
- Logger.LogTrace("Started listening at server");
+ try
+ {
+ ServerTransport.Listen();
+ }
+ catch (TTransportException ttx)
+ {
+ LogError("Error, could not listen on ServerTransport: " + ttx);
+ return;
+ }
- if (ServerEventHandler != null)
- {
- await ServerEventHandler.PreServeAsync(cancellationToken);
- }
+ //Fire the preServe server event when server is up but before any client connections
+ if (ServerEventHandler != null)
+ await ServerEventHandler.PreServeAsync(cancellationToken);
- while (!cancellationToken.IsCancellationRequested)
- {
- if (ServerTransport.IsClientPending())
+ while (!(stop || ServerCancellationToken.IsCancellationRequested))
{
- Logger.LogTrace("Waiting for client connection");
-
try
{
- var client = await ServerTransport.AcceptAsync(cancellationToken);
- await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+ using (TTransport client = await ServerTransport.AcceptAsync(cancellationToken))
+ {
+ await ExecuteAsync(client);
+ }
+ }
+ catch (TaskCanceledException)
+ {
+ stop = true;
}
catch (TTransportException ttx)
{
- Logger.LogTrace($"Transport exception: {ttx}");
-
- if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+ if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
{
- Logger.LogError(ttx.ToString());
+ LogError(ttx.ToString());
}
+
}
}
- else
+
+ if (stop)
{
try
{
- await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
+ ServerTransport.Close();
+ }
+ catch (TTransportException ttx)
+ {
+ LogError("TServerTransport failed on close: " + ttx.Message);
}
- catch (TaskCanceledException) { }
+ stop = false;
}
- }
- ServerTransport.Close();
-
- Logger.LogTrace("Completed listening at server");
- }
-
- public override void Stop()
- {
+ }
+ finally
+ {
+ ServerCancellationToken = default;
+ }
}
- private async Task Execute(TTransport client, CancellationToken cancellationToken)
+ /// <summary>
+ /// Loops on processing a client forever
+ /// client will be a TTransport instance
+ /// </summary>
+ /// <param name="client"></param>
+ private async Task ExecuteAsync(TTransport client)
{
- Logger.LogTrace("Started client request processing");
+ var cancellationToken = ServerCancellationToken;
var processor = ProcessorFactory.GetAsyncProcessor(client, this);
@@ -164,7 +163,6 @@ namespace Thrift.Server
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
object connectionContext = null;
-
try
{
try
@@ -174,42 +172,41 @@ namespace Thrift.Server
inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+ //Recover event handler (if any) and fire createContext server event when a client connects
if (ServerEventHandler != null)
- {
connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
- }
- while (!cancellationToken.IsCancellationRequested)
+ //Process client requests until client disconnects
+ while (!(stop || cancellationToken.IsCancellationRequested))
{
if (!await inputTransport.PeekAsync(cancellationToken))
- {
break;
- }
+ //Fire processContext server event
+ //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+ //That is to say it may be many minutes between the event firing and the client request
+ //actually arriving or the client may hang up without ever makeing a request.
if (ServerEventHandler != null)
- {
await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
- }
+ //Process client request (blocks until transport is readable)
if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
- {
break;
- }
}
}
- catch (TTransportException ttx)
+ catch (TTransportException)
{
- Logger.LogTrace($"Transport exception: {ttx}");
+ //Usually a client disconnect, expected
}
catch (Exception x)
{
- Logger.LogError($"Error: {x}");
+ //Unexpected
+ LogError("Error: " + x);
}
+ //Fire deleteContext server event after client disconnects
if (ServerEventHandler != null)
- {
await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
- }
}
finally
@@ -224,8 +221,12 @@ namespace Thrift.Server
inputTransport?.Dispose();
outputTransport?.Dispose();
}
+ }
- Logger.LogTrace("Completed client request processing");
+ public override void Stop()
+ {
+ stop = true;
+ ServerTransport?.Close();
}
}
}
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
index 46cc9d4a8..ba1834cde 100644
--- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -105,7 +105,7 @@ namespace Thrift.Server
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+ int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
: this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
@@ -245,9 +245,9 @@ namespace Thrift.Server
connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
//Process client requests until client disconnects
- while (!stop)
+ while (!(stop || cancellationToken.IsCancellationRequested))
{
- if (! await inputTransport.PeekAsync(cancellationToken))
+ if (!await inputTransport.PeekAsync(cancellationToken))
break;
//Fire processContext server event
@@ -258,7 +258,7 @@ namespace Thrift.Server
await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
//Process client request (blocks until transport is readable)
- if (! await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+ if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
break;
}
}