(* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *) unit Thrift.Transport.STOMP; interface uses Classes,Windows, SysUtils, Thrift, Thrift.Transport, Thrift.Protocol, Thrift.Stream, StompClient, StompTypes; type TStompTransportImpl = class( TStreamTransportImpl) strict private FData : TStringStream; FServer : string; FOutQueue : string; FStompCli : IStompClient; protected function GetIsOpen: Boolean; override; function Peek: Boolean; override; public constructor Create( const aServerAndPort, aOutQueue : string); destructor Destroy; override; procedure Open(); override; procedure Close(); override; procedure Flush; override; end; TStompServerTransportImpl = class( TServerTransportImpl) strict private FServer : string; FInQueue : string; FClient : IStompClient; protected procedure Listen; override; procedure Close; override; function Accept( const fnAccepting: TProc): ITransport; override; public constructor Create( const aServerAndPort, aInQueue : string); destructor Destroy; override; end; const QUEUE_PREFIX = '/queue/'; TOPIC_PREFIX = '/topic/'; EXCHANGE_PREFIX = '/exchange/'; implementation constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string); var adapter : IThriftStream; begin FData := TStringStream.Create; FServer := aServerAndPort; FOutQueue := aOutQueue; adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE); inherited Create( nil, adapter); // output only end; destructor TStompTransportImpl.Destroy; begin inherited Destroy; FreeAndNil( FData); FStompCli := nil; end; function TStompTransportImpl.GetIsOpen: Boolean; begin result := (FStompCli <> nil); end; function TStompTransportImpl.Peek: Boolean; begin result := FALSE; // output only end; procedure TStompTransportImpl.Open; begin if FStompCli <> nil then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open') else FStompCli := StompUtils.NewStomp( FServer); end; procedure TStompTransportImpl.Close; begin FStompCli := nil; FData.Clear; end; procedure TStompTransportImpl.Flush; begin if FStompCli = nil then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open'); FStompCli.Send( FOutQueue, FData.DataString); FData.Clear; end; //--- TStompServerTransportImpl -------------------------------------------- constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string); begin inherited Create; FServer := aServerAndPort; FInQueue := aInQueue; end; destructor TStompServerTransportImpl.Destroy; begin try Close; finally inherited Destroy; end; end; procedure TStompServerTransportImpl.Listen; begin FClient := StompUtils.NewStomp(FServer); FClient.Subscribe( FInQueue); end; procedure TStompServerTransportImpl.Close; begin if FClient <> nil then begin FClient.Unsubscribe( FInQueue); FClient := nil; end; end; function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport; var frame : IStompFrame; adapter : IThriftStream; stream : TStringStream; begin if FClient = nil then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Not connected.'); if Assigned(fnAccepting) then fnAccepting(); try frame := FClient.Receive(MAXINT); if frame = nil then Exit(nil); stream := TStringStream.Create( frame.GetBody); adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE); result := TStreamTransportImpl.Create( adapter, nil); except on E: Exception do raise TTransportException.Create( E.ToString ); end; end; end.