summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/Protocol
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
commit7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch)
tree3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/Protocol
parent8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff)
downloadqpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/Protocol')
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs75
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs289
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs269
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs27
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs36
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs109
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs45
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs41
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs78
9 files changed, 969 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
new file mode 100644
index 0000000000..ab40a83b3e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQMethodEvent
+ {
+ private AMQMethodBody _method;
+
+ private ushort _channelId;
+
+ private AMQProtocolSession _protocolSession;
+
+ public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
+ {
+ _channelId = channelId;
+ _method = method;
+ _protocolSession = protocolSession;
+ }
+
+ public AMQMethodBody Method
+ {
+ get
+ {
+ return _method;
+ }
+ }
+
+ public ushort ChannelId
+ {
+ get
+ {
+ return _channelId;
+ }
+ }
+
+ public AMQProtocolSession ProtocolSession
+ {
+ get
+ {
+ return _protocolSession;
+ }
+ }
+
+ public override String ToString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: ");
+ buf.Append("\nChannel id: ").Append(_channelId);
+ buf.Append("\nMethod: ").Append(_method);
+ return buf.ToString();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
new file mode 100644
index 0000000000..7256ab9250
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
@@ -0,0 +1,289 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+using log4net;
+using Qpid.Client.Failover;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQProtocolListener : IProtocolListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
+
+ /**
+ * We create the failover handler when the session is created since it needs a reference to the IoSession in order
+ * to be able to send errors during failover back to the client application. The session won't be available in the
+ * case where we failing over due to a Connection.Redirect message from the broker.
+ */
+ private FailoverHandler _failoverHandler;
+
+ /**
+ * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
+ * attempting failover where it is failing.
+ */
+ internal FailoverState _failoverState = FailoverState.NOT_STARTED;
+
+ internal FailoverState FailoverState
+ {
+ get { return _failoverState; }
+ set { _failoverState = value; }
+ }
+
+ internal ManualResetEvent FailoverLatch;
+
+ AMQConnection _connection;
+ AMQStateManager _stateManager;
+
+ public AMQStateManager StateManager
+ {
+ get { return _stateManager; }
+ set { _stateManager = value; }
+ }
+
+ //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+ private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
+
+ AMQProtocolSession _protocolSession = null; // FIXME
+ public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed?
+
+
+ private readonly Object _lock = new Object();
+
+ public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _failoverHandler = new FailoverHandler(connection);
+ }
+
+ public void OnMessage(IDataBlock message)
+ {
+ // Handle incorrect protocol version.
+ if (message is ProtocolInitiation)
+ {
+ string error = String.Format("Protocol mismatch - {0}", message.ToString());
+ AMQException e = new AMQProtocolHeaderException(error);
+ _log.Error("Closing connection because of protocol mismatch", e);
+ //_protocolSession.CloseProtocolSession();
+ _stateManager.Error(e);
+ return;
+ }
+
+ AMQFrame frame = (AMQFrame)message;
+
+ if (frame.BodyFrame is AMQMethodBody)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Method frame received: " + frame);
+ }
+ AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
+ try
+ {
+ bool wasAnyoneInterested = false;
+ lock (_frameListeners.SyncRoot)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ }
+ }
+ catch (Exception e)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ listener.Error(e);
+ }
+ }
+ }
+ else if (frame.BodyFrame is ContentHeaderBody)
+ {
+ _protocolSession.MessageContentHeaderReceived(frame.Channel,
+ (ContentHeaderBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is ContentBody)
+ {
+ _protocolSession.MessageContentBodyReceived(frame.Channel,
+ (ContentBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is HeartbeatBody)
+ {
+ _log.Debug("HeartBeat received");
+ }
+ //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful?
+ }
+
+ public void OnException(Exception cause)
+ {
+ _log.Warn("Protocol Listener received exception", cause);
+ lock (_lock)
+ {
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ if (!(cause is AMQUndeliveredException))
+ {
+ WhenClosed();
+ }
+ }
+ // We reach this point if failover was attempted and failed therefore we need to let the calling app
+ // know since we cannot recover the situation.
+ else if (_failoverState == FailoverState.FAILED)
+ {
+ // we notify the state manager of the error in case we have any clients waiting on a state
+ // change. Those "waiters" will be interrupted and can handle the exception
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ PropagateExceptionToWaiters(amqe);
+ _connection.ExceptionReceived(cause);
+ }
+ }
+ }
+
+ /**
+ * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
+ * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ *
+ * @param session
+ * @throws Exception
+ */
+ void WhenClosed()
+ {
+ _connection.StopHeartBeatThread();
+
+ // TODO: Server just closes session with no warning if auth fails.
+ if (_connection.Closed)
+ {
+ _log.Info("Channel closed called by client");
+ }
+ else
+ {
+ _log.Info("Channel closed called with failover state currently " + _failoverState);
+
+ // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+
+ if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed)
+ {
+ _log.Info("FAILOVER STARTING");
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+ startFailoverThread();
+ }
+ else
+ {
+ _log.Info("Not starting failover as state currently " + _failoverState);
+ }
+ }
+ else
+ {
+ _log.Info("Failover not allowed by policy.");
+
+ if (_failoverState != FailoverState.IN_PROGRESS)
+ {
+ _log.Info("sessionClose() not allowed to failover");
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
+ }
+ else
+ {
+ _log.Info("sessionClose() failover in progress");
+ }
+ }
+ }
+
+ _log.Info("Protocol Channel [" + this + "] closed");
+ }
+
+ /// <summary>
+ /// There are two cases where we have other threads potentially blocking for events to be handled by this
+ /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
+ /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
+ /// react appropriately.
+ ///
+ /// <param name="e">the exception to propagate</param>
+ /// </summary>
+ public void PropagateExceptionToWaiters(Exception e)
+ {
+ // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
+ _stateManager.Error(e);
+
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ listener.Error(e);
+ }
+ }
+
+ public void AddFrameListener(IAMQMethodListener listener)
+ {
+ _frameListeners.Add(listener);
+ }
+
+ public void RemoveFrameListener(IAMQMethodListener listener)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Removing frame listener: " + listener.ToString());
+ }
+ _frameListeners.Remove(listener);
+ }
+
+ public void BlockUntilNotFailingOver()
+ {
+ if (FailoverLatch != null)
+ {
+ FailoverLatch.WaitOne();
+ }
+ }
+
+ /// <summary>
+ /// "Failover" for redirection.
+ /// </summary>
+ /// <param name="host"></param>
+ /// <param name="port"></param>
+ public void Failover(string host, int port)
+ {
+ _failoverHandler.setHost(host);
+ _failoverHandler.setPort(port);
+ // see javadoc for FailoverHandler to see rationale for separate thread
+ startFailoverThread();
+ }
+
+ private void startFailoverThread()
+ {
+ Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
+ failoverThread.Name = "Failover";
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.IsBackground = false;
+ failoverThread.Start();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
new file mode 100644
index 0000000000..65aca0d942
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Transport;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQProtocolSession
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));
+
+ private readonly IProtocolWriter _protocolWriter;
+ private readonly IConnectionCloser _connectionCloser;
+
+ /**
+ * Counter to ensure unique queue names
+ */
+ private int _queueId = 1;
+ private readonly Object _queueIdLock = new Object();
+
+ /// <summary>
+ /// Maps from the channel id to the AmqChannel that it represents.
+ /// </summary>
+ //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());
+
+ //private ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// Maps from a channel id to an unprocessed message. This is used to tie together the
+ /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ /// </summary>
+ //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());
+
+ private AMQConnection _connection;
+
+ public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
+ {
+ _protocolWriter = protocolWriter;
+ _connectionCloser = connectionCloser;
+ _connection = connection;
+ }
+
+ public void Init()
+ {
+ // start the process of setting up the connection. This is the first place that
+ // data is written to the server.
+ _protocolWriter.Write(new ProtocolInitiation());
+ }
+
+ public string Username
+ {
+ get
+ {
+ return AMQConnection.Username;
+ }
+ }
+
+ public string Password
+ {
+ get
+ {
+ return AMQConnection.Password;
+ }
+ }
+
+ ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.
+
+ public ConnectionTuneParameters ConnectionTuneParameters
+ {
+ get
+ {
+ return _connectionTuneParameters;
+ }
+ set
+ {
+ _connectionTuneParameters = value;
+ AMQConnection con = AMQConnection;
+ con.SetMaximumChannelCount(value.ChannelMax);
+ con.MaximumFrameSize = value.FrameMax;
+ }
+ }
+
+ /// <summary>
+ /// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+ /// This is invoked on the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="message">the unprocessed message</param>
+ /// <exception cname="AMQException">if this was not expected</exception>
+ public void UnprocessedMessageReceived(UnprocessedMessage message)
+ {
+ _channelId2UnprocessedMsgMap[message.ChannelId] = message;
+ }
+
+ public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
+ }
+ if (msg.ContentHeader != null)
+ {
+ throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+ }
+ msg.ContentHeader = contentHeader;
+ if (contentHeader.BodySize == 0)
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
+ }
+ if (msg.ContentHeader == null)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+ }
+ try
+ {
+ msg.ReceiveBody(contentBody);
+ }
+ catch (UnexpectedBodyReceivedException e)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw e;
+ }
+ if (msg.IsAllBodyDataReceived())
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ /// <summary>
+ /// Deliver a message to the appropriate session, removing the unprocessed message
+ /// from our map
+ /// <param name="channelId">the channel id the message should be delivered to</param>
+ /// <param name="msg"> the message</param>
+ private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
+ {
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.MessageReceived(msg);
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ }
+
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session. Equivalent
+ /// to calling getProtocolSession().write().
+ /// </summary>
+ /// <param name="frame">the frame to write</param>
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+
+ public void AddSessionByChannel(ushort channelId, AmqChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new ArgumentNullException("Attempt to register a null channel");
+ }
+ _logger.Debug("Add channel with channel id " + channelId);
+ _channelId2SessionMap[channelId] = channel;
+ }
+
+ public void RemoveSessionByChannel(ushort channelId)
+ {
+ _logger.Debug("Removing session with channelId " + channelId);
+ _channelId2SessionMap.Remove(channelId);
+ }
+
+ /// <summary>
+ /// Starts the process of closing a channel
+ /// </summary>
+ /// <param name="channel" the AmqChannel being closed</param>
+ public void CloseSession(AmqChannel channel)
+ {
+ _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
+ ushort channelId = channel.ChannelId;
+
+ // we need to know when a channel is closing so that we can respond
+ // with a channel.close frame when we receive any other type of frame
+ // on that channel
+ _closingChannels[channelId] = channel;
+
+ }
+
+ /// <summary>
+ /// Called from the ChannelClose handler when a channel close frame is received.
+ /// This method decides whether this is a response or an initiation. The latter
+ /// case causes the AmqChannel to be closed and an exception to be thrown if
+ /// appropriate.
+ /// </summary>
+ /// <param name="channelId">the id of the channel (session)</param>
+ /// <returns>true if the client must respond to the server, i.e. if the server
+ /// initiated the channel close, false if the channel close is just the server
+ /// responding to the client's earlier request to close the channel.</returns>
+ public bool ChannelClosed(ushort channelId, int code, string text)
+ {
+ // if this is not a response to an earlier request to close the channel
+ if (!_closingChannels.ContainsKey(channelId))
+ {
+ _closingChannels.Remove(channelId);
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.Closed(new AMQException(_logger, code, text));
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public AMQConnection AMQConnection
+ {
+ get
+ {
+ return _connection;
+ }
+ }
+
+ public void CloseProtocolSession()
+ {
+ _logger.Debug("Closing protocol session");
+ _connectionCloser.Close();
+ }
+
+ internal string GenerateQueueName()
+ {
+ int id;
+ lock(_queueIdLock)
+ {
+ id = _queueId++;
+ }
+
+ return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
new file mode 100644
index 0000000000..be8a24a9f4
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.Protocol
+{
+ public interface IConnectionCloser
+ {
+ void Close();
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
new file mode 100644
index 0000000000..6ac8a7537e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public interface IProtocolListener
+ {
+ void OnMessage(IDataBlock message);
+ void OnException(Exception e);
+
+ // XXX: .NET way of doing listeners?
+ void AddFrameListener(IAMQMethodListener listener);
+ void RemoveFrameListener(IAMQMethodListener listener);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
new file mode 100644
index 0000000000..99643fe59f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public abstract class BlockingMethodFrameListener : IAMQMethodListener
+ {
+ private ManualResetEvent _resetEvent;
+
+ public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame);
+
+ /// <summary>
+ /// This is set if there is an exception thrown from processCommandFrame and the
+ /// exception is rethrown to the caller of blockForFrame()
+ /// </summary>
+ private volatile Exception _error;
+
+ protected ushort _channelId;
+
+ protected AMQMethodEvent _doneEvt = null;
+
+ public BlockingMethodFrameListener(ushort channelId)
+ {
+ _channelId = channelId;
+ _resetEvent = new ManualResetEvent(false);
+ }
+
+ /// <summary>
+ /// This method is called by the MINA dispatching thread. Note that it could
+ /// be called before BlockForFrame() has been called.
+ /// </summary>
+ /// <param name="evt">the frame event</param>
+ /// <returns>true if the listener has dealt with this frame</returns>
+ /// <exception cref="AMQException"></exception>
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ AMQMethodBody method = evt.Method;
+
+ try
+ {
+ bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method);
+ if (ready)
+ {
+ _doneEvt = evt;
+ _resetEvent.Set();
+ }
+
+ return ready;
+ }
+ catch (AMQException e)
+ {
+ Error(e);
+ // we rethrow the error here, and the code in the frame dispatcher will go round
+ // each listener informing them that an exception has been thrown
+ throw e;
+ }
+ }
+
+ /// <summary>
+ /// This method is called by the thread that wants to wait for a frame.
+ /// </summary>
+ public AMQMethodEvent BlockForFrame()
+ {
+ _resetEvent.WaitOne();
+ //at this point the event will have been signalled. The error field might or might not be set
+ // depending on whether an error occurred
+ if (_error != null)
+ {
+ throw _error;
+ }
+
+ return _doneEvt;
+ }
+
+ /// <summary>
+ /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this
+ /// class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="e">the exception that caused the error</param>
+ public void Error(Exception e)
+ {
+ // set the error so that the thread that is blocking (in BlockForFrame())
+ // can pick up the exception and rethrow to the caller
+ _error = e;
+ _resetEvent.Set();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
new file mode 100644
index 0000000000..db82eb1013
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public interface IAMQMethodListener
+ {
+ /// <summary>
+ /// Invoked when a method frame has been received
+ /// <param name="evt">the event</param>
+ /// <returns>true if the handler has processed the method frame, false otherwise. Note
+ /// that this does not prohibit the method event being delivered to subsequent listeners
+ /// but can be used to determine if nobody has dealt with an incoming method frame.</param>
+ /// <exception cname="AMQException">if an error has occurred. This exception will be delivered
+ /// to all registered listeners using the error() method (see below) allowing them to
+ /// perform cleanup if necessary.</exception>
+ bool MethodReceived(AMQMethodEvent evt);
+
+ /// <summary>
+ /// Callback when an error has occurred. Allows listeners to clean up.
+ /// </summary>
+ /// <param name="e">the exception</param>
+ void Error(Exception e);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
new file mode 100644
index 0000000000..65460a0c2e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public class SpecificMethodFrameListener : BlockingMethodFrameListener
+ {
+ private readonly Type _expectedClass;
+
+ public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId)
+ {
+ _expectedClass = expectedClass;
+ }
+
+ public override bool ProcessMethod(ushort channelId, AMQMethodBody frame)
+ {
+ return _expectedClass.IsInstanceOfType(frame);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
new file mode 100644
index 0000000000..32847f9b9b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Client.Transport;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ /// <summary>
+ /// A convenient interface to writing protocol frames.
+ /// </summary>
+ public class ProtocolWriter
+ {
+ IProtocolWriter _protocolWriter;
+ IProtocolListener _protocolListener;
+
+ public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener)
+ {
+ _protocolWriter = protocolWriter;
+ _protocolListener = protocolListener;
+ }
+
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session and waits for
+ /// a particular response. Equivalent to calling getProtocolSession().write() then
+ /// waiting for the response.
+ /// </summary>
+ /// <param name="frame">the frame</param>
+ /// <param name="listener">the blocking listener. Note the calling thread will block.</param>
+ private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener)
+ {
+ try
+ {
+ _protocolListener.AddFrameListener(listener);
+ _protocolWriter.Write(frame);
+ return listener.BlockForFrame();
+ }
+ finally
+ {
+ _protocolListener.RemoveFrameListener(listener);
+ }
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+
+ public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType)
+ {
+ // TODO: If each frame knew it's response type, then the responseType argument would
+ // TODO: not be neccesary.
+ return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType));
+ }
+ }
+}
+