From 684ccab5e72cddacda7a1fc2c1a80f23e1bc3163 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Thu, 11 Sep 2014 21:14:44 +0200 Subject: THRIFT-2696 Unable to stop socket server while there are idle clients Client: Delphi Patch: Severian Duchenko & Jens Geyer The patch contains some additional refactoring, e.g. I consolidated the excessively overloaded CTORs a bit. --- lib/delphi/src/Thrift.Transport.pas | 144 +++++++++++++++--------------------- 1 file changed, 59 insertions(+), 85 deletions(-) (limited to 'lib/delphi/src') diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas index b567aefd3..69d74a306 100644 --- a/lib/delphi/src/Thrift.Transport.pas +++ b/lib/delphi/src/Thrift.Transport.pas @@ -154,6 +154,7 @@ type TTcpSocketStreamImpl = class( TThriftStreamImpl ) private FTcpClient : TCustomIpClient; + FTimeout : Integer; protected procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override; function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override; @@ -164,7 +165,7 @@ type function IsOpen: Boolean; override; function ToArray: TBytes; override; public - constructor Create( const ATcpClient: TCustomIpClient); + constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0); end; IStreamTransport = interface( ITransport ) @@ -226,12 +227,8 @@ type protected function Accept( const fnAccepting: TProc) : ITransport; override; public - constructor Create( const AServer: TTcpServer ); overload; - constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload; - constructor Create( APort: Integer); overload; - constructor Create( APort: Integer; AClientTimeout: Integer); overload; - constructor Create( APort: Integer; AClientTimeout: Integer; - AUseBufferedSockets: Boolean); overload; + constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload; + constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload; destructor Destroy; override; procedure Listen; override; procedure Close; override; @@ -273,9 +270,8 @@ type function GetIsOpen: Boolean; override; public procedure Open; override; - constructor Create( const AClient : TCustomIpClient); overload; - constructor Create( const AHost: string; APort: Integer); overload; - constructor Create( const AHost: string; APort: Integer; ATimeout: Integer); overload; + constructor Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); overload; + constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload; destructor Destroy; override; procedure Close; override; property TcpClient: TCustomIpClient read FClient; @@ -532,16 +528,29 @@ begin FClientTimeout := AClientTimeout; end; -constructor TServerSocketImpl.Create( const AServer: TTcpServer); +constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean); begin - //no inherited; - Create( AServer, 0 ); + inherited Create; + FPort := APort; + FClientTimeout := AClientTimeout; + FUseBufferedSocket := AUseBufferedSockets; + FOwnsServer := True; + FServer := TTcpServer.Create( nil ); + FServer.BlockMode := bmBlocking; +{$IF CompilerVersion >= 21.0} + FServer.LocalPort := AnsiString( IntToStr( FPort)); +{$ELSE} + FServer.LocalPort := IntToStr( FPort); +{$IFEND} end; -constructor TServerSocketImpl.Create(APort: Integer); +destructor TServerSocketImpl.Destroy; begin - //no inherited; - Create( APort, 0 ); + if FOwnsServer then begin + FServer.Free; + FServer := nil; + end; + inherited; end; function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport; @@ -574,7 +583,7 @@ begin Exit; end; - trans := TSocketImpl.Create( client); + trans := TSocketImpl.Create( client, FClientTimeout); if FUseBufferedSocket then result := TBufferedTransportImpl.Create( trans) else result := trans; @@ -587,92 +596,47 @@ begin end; end; -procedure TServerSocketImpl.Close; +procedure TServerSocketImpl.Listen; begin if FServer <> nil then begin try - FServer.Active := False; + FServer.Active := True; except on E: Exception do begin - raise TTransportException.Create('Error on closing socket : ' + E.Message); + raise TTransportException.Create('Could not accept on listening socket: ' + E.Message); end; end; end; end; -constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; - AUseBufferedSockets: Boolean); -begin - inherited Create; - FPort := APort; - FClientTimeout := AClientTimeout; - FUseBufferedSocket := AUseBufferedSockets; - FOwnsServer := True; - FServer := TTcpServer.Create( nil ); - FServer.BlockMode := bmBlocking; -{$IF CompilerVersion >= 21.0} - FServer.LocalPort := AnsiString( IntToStr( FPort)); -{$ELSE} - FServer.LocalPort := IntToStr( FPort); -{$IFEND} -end; - -destructor TServerSocketImpl.Destroy; -begin - if FOwnsServer then - begin - FServer.Free; - end; - inherited; -end; - -procedure TServerSocketImpl.Listen; +procedure TServerSocketImpl.Close; begin if FServer <> nil then begin try - FServer.Active := True; + FServer.Active := False; except on E: Exception do begin - raise TTransportException.Create('Could not accept on listening socket: ' + E.Message); + raise TTransportException.Create('Error on closing socket : ' + E.Message); end; end; end; end; -constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer); -begin - //no inherited; - Create( APort, AClientTimeout, False ); -end; - { TSocket } -constructor TSocketImpl.Create( const AClient : TCustomIpClient); -var - stream : IThriftStream; +constructor TSocketImpl.Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); +var stream : IThriftStream; begin FClient := AClient; - stream := TTcpSocketStreamImpl.Create( FClient); + FTimeout := ATimeout; + stream := TTcpSocketStreamImpl.Create( FClient, FTimeout); inherited Create( stream, stream); end; -constructor TSocketImpl.Create(const AHost: string; APort: Integer); -begin - //no inherited; - Create( AHost, APort, 0); -end; - -procedure TSocketImpl.Close; -begin - inherited Close; - if FClient <> nil - then FreeAndNil( FClient); -end; - constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer); begin inherited Create(nil,nil); @@ -691,6 +655,13 @@ begin inherited; end; +procedure TSocketImpl.Close; +begin + inherited Close; + if FClient <> nil + then FreeAndNil( FClient); +end; + function TSocketImpl.GetIsOpen: Boolean; begin Result := False; @@ -704,21 +675,17 @@ procedure TSocketImpl.InitSocket; var stream : IThriftStream; begin - if FClient <> nil then - begin - if FOwnsClient then - begin - FClient.Free; - FClient := nil; - end; + if (FClient <> nil) and FOwnsClient then begin + FClient.Free; + FClient := nil; end; - FClient := TTcpClient.Create( nil ); + + FClient := TTcpClient.Create( nil); FOwnsClient := True; - stream := TTcpSocketStreamImpl.Create( FClient); + stream := TTcpSocketStreamImpl.Create( FClient, FTimeout); FInputStream := stream; FOutputStream := stream; - end; procedure TSocketImpl.Open; @@ -750,7 +717,7 @@ begin FClient.RemotePort := TSocketPort( IntToStr( Port)); FClient.Connect; - FInputStream := TTcpSocketStreamImpl.Create( FClient); + FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout); FOutputStream := FInputStream; end; @@ -1199,10 +1166,11 @@ begin FTcpClient.Close; end; -constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient); +constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer); begin inherited Create; FTcpClient := ATcpClient; + FTimeout := aTimeout; end; procedure TTcpSocketStreamImpl.Flush; @@ -1224,7 +1192,13 @@ function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer; begin inherited; - Result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count); + + if (FTimeout > 0) then begin + if not FTcpClient.WaitForData(FTimeout) + then Exit(0); + end; + + result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count); end; function TTcpSocketStreamImpl.ToArray: TBytes; -- cgit v1.2.1