diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/Protocol | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/Protocol')
9 files changed, 969 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs new file mode 100644 index 0000000000..ab40a83b3e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQMethodEvent + { + private AMQMethodBody _method; + + private ushort _channelId; + + private AMQProtocolSession _protocolSession; + + public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession) + { + _channelId = channelId; + _method = method; + _protocolSession = protocolSession; + } + + public AMQMethodBody Method + { + get + { + return _method; + } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQProtocolSession ProtocolSession + { + get + { + return _protocolSession; + } + } + + public override String ToString() + { + StringBuilder buf = new StringBuilder("Method event: "); + buf.Append("\nChannel id: ").Append(_channelId); + buf.Append("\nMethod: ").Append(_method); + return buf.ToString(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs new file mode 100644 index 0000000000..7256ab9250 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Qpid.Client.Failover; +using Qpid.Client.Protocol.Listener; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQProtocolListener : IProtocolListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); + + /** + * We create the failover handler when the session is created since it needs a reference to the IoSession in order + * to be able to send errors during failover back to the client application. The session won't be available in the + * case where we failing over due to a Connection.Redirect message from the broker. + */ + private FailoverHandler _failoverHandler; + + /** + * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly + * attempting failover where it is failing. + */ + internal FailoverState _failoverState = FailoverState.NOT_STARTED; + + internal FailoverState FailoverState + { + get { return _failoverState; } + set { _failoverState = value; } + } + + internal ManualResetEvent FailoverLatch; + + AMQConnection _connection; + AMQStateManager _stateManager; + + public AMQStateManager StateManager + { + get { return _stateManager; } + set { _stateManager = value; } + } + + //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); + + AMQProtocolSession _protocolSession = null; // FIXME + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed? + + + private readonly Object _lock = new Object(); + + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _failoverHandler = new FailoverHandler(connection); + } + + public void OnMessage(IDataBlock message) + { + // Handle incorrect protocol version. + if (message is ProtocolInitiation) + { + string error = String.Format("Protocol mismatch - {0}", message.ToString()); + AMQException e = new AMQProtocolHeaderException(error); + _log.Error("Closing connection because of protocol mismatch", e); + //_protocolSession.CloseProtocolSession(); + _stateManager.Error(e); + return; + } + + AMQFrame frame = (AMQFrame)message; + + if (frame.BodyFrame is AMQMethodBody) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Method frame received: " + frame); + } + AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); + try + { + bool wasAnyoneInterested = false; + lock (_frameListeners.SyncRoot) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (Exception e) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + } + else if (frame.BodyFrame is ContentHeaderBody) + { + _protocolSession.MessageContentHeaderReceived(frame.Channel, + (ContentHeaderBody)frame.BodyFrame); + } + else if (frame.BodyFrame is ContentBody) + { + _protocolSession.MessageContentBodyReceived(frame.Channel, + (ContentBody)frame.BodyFrame); + } + else if (frame.BodyFrame is HeartbeatBody) + { + _log.Debug("HeartBeat received"); + } + //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful? + } + + public void OnException(Exception cause) + { + _log.Warn("Protocol Listener received exception", cause); + lock (_lock) + { + if (_failoverState == FailoverState.NOT_STARTED) + { + if (!(cause is AMQUndeliveredException)) + { + WhenClosed(); + } + } + // We reach this point if failover was attempted and failed therefore we need to let the calling app + // know since we cannot recover the situation. + else if (_failoverState == FailoverState.FAILED) + { + // we notify the state manager of the error in case we have any clients waiting on a state + // change. Those "waiters" will be interrupted and can handle the exception + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); + } + } + } + + /** + * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by + * sessionClosed() depending on whether we were trying to send data at the time of failure. + * + * @param session + * @throws Exception + */ + void WhenClosed() + { + _connection.StopHeartBeatThread(); + + // TODO: Server just closes session with no warning if auth fails. + if (_connection.Closed) + { + _log.Info("Channel closed called by client"); + } + else + { + _log.Info("Channel closed called with failover state currently " + _failoverState); + + // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + + if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed) + { + _log.Info("FAILOVER STARTING"); + if (_failoverState == FailoverState.NOT_STARTED) + { + _failoverState = FailoverState.IN_PROGRESS; + startFailoverThread(); + } + else + { + _log.Info("Not starting failover as state currently " + _failoverState); + } + } + else + { + _log.Info("Failover not allowed by policy."); + + if (_failoverState != FailoverState.IN_PROGRESS) + { + _log.Info("sessionClose() not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Server closed connection and reconnection not permitted.")); + } + else + { + _log.Info("sessionClose() failover in progress"); + } + } + } + + _log.Info("Protocol Channel [" + this + "] closed"); + } + + /// <summary> + /// There are two cases where we have other threads potentially blocking for events to be handled by this + /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a + /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can + /// react appropriately. + /// + /// <param name="e">the exception to propagate</param> + /// </summary> + public void PropagateExceptionToWaiters(Exception e) + { + // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. + _stateManager.Error(e); + + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + + public void AddFrameListener(IAMQMethodListener listener) + { + _frameListeners.Add(listener); + } + + public void RemoveFrameListener(IAMQMethodListener listener) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Removing frame listener: " + listener.ToString()); + } + _frameListeners.Remove(listener); + } + + public void BlockUntilNotFailingOver() + { + if (FailoverLatch != null) + { + FailoverLatch.WaitOne(); + } + } + + /// <summary> + /// "Failover" for redirection. + /// </summary> + /// <param name="host"></param> + /// <param name="port"></param> + public void Failover(string host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + startFailoverThread(); + } + + private void startFailoverThread() + { + Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); + failoverThread.Name = "Failover"; + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.IsBackground = false; + failoverThread.Start(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs new file mode 100644 index 0000000000..65aca0d942 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -0,0 +1,269 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Qpid.Client.Message; +using Qpid.Client.Transport; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQProtocolSession + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession)); + + private readonly IProtocolWriter _protocolWriter; + private readonly IConnectionCloser _connectionCloser; + + /** + * Counter to ensure unique queue names + */ + private int _queueId = 1; + private readonly Object _queueIdLock = new Object(); + + /// <summary> + /// Maps from the channel id to the AmqChannel that it represents. + /// </summary> + //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable()); + + //private ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from a channel id to an unprocessed message. This is used to tie together the + /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + /// </summary> + //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable()); + + private AMQConnection _connection; + + public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection) + { + _protocolWriter = protocolWriter; + _connectionCloser = connectionCloser; + _connection = connection; + } + + public void Init() + { + // start the process of setting up the connection. This is the first place that + // data is written to the server. + _protocolWriter.Write(new ProtocolInitiation()); + } + + public string Username + { + get + { + return AMQConnection.Username; + } + } + + public string Password + { + get + { + return AMQConnection.Password; + } + } + + ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too. + + public ConnectionTuneParameters ConnectionTuneParameters + { + get + { + return _connectionTuneParameters; + } + set + { + _connectionTuneParameters = value; + AMQConnection con = AMQConnection; + con.SetMaximumChannelCount(value.ChannelMax); + con.MaximumFrameSize = value.FrameMax; + } + } + + /// <summary> + /// Callback invoked from the BasicDeliverMethodHandler when a message has been received. + /// This is invoked on the MINA dispatcher thread. + /// </summary> + /// <param name="message">the unprocessed message</param> + /// <exception cname="AMQException">if this was not expected</exception> + public void UnprocessedMessageReceived(UnprocessedMessage message) + { + _channelId2UnprocessedMsgMap[message.ChannelId] = message; + } + + public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content header without having received a JMSDeliver frame first"); + } + if (msg.ContentHeader != null) + { + throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); + } + msg.ContentHeader = contentHeader; + if (contentHeader.BodySize == 0) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content body without having received a BasicDeliver frame first"); + } + if (msg.ContentHeader == null) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw new AMQException("Error: received content body without having received a ContentHeader frame first"); + } + try + { + msg.ReceiveBody(contentBody); + } + catch (UnexpectedBodyReceivedException e) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw e; + } + if (msg.IsAllBodyDataReceived()) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + /// <summary> + /// Deliver a message to the appropriate session, removing the unprocessed message + /// from our map + /// <param name="channelId">the channel id the message should be delivered to</param> + /// <param name="msg"> the message</param> + private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg) + { + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.MessageReceived(msg); + _channelId2UnprocessedMsgMap.Remove(channelId); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session. Equivalent + /// to calling getProtocolSession().write(). + /// </summary> + /// <param name="frame">the frame to write</param> + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + public void AddSessionByChannel(ushort channelId, AmqChannel channel) + { + if (channel == null) + { + throw new ArgumentNullException("Attempt to register a null channel"); + } + _logger.Debug("Add channel with channel id " + channelId); + _channelId2SessionMap[channelId] = channel; + } + + public void RemoveSessionByChannel(ushort channelId) + { + _logger.Debug("Removing session with channelId " + channelId); + _channelId2SessionMap.Remove(channelId); + } + + /// <summary> + /// Starts the process of closing a channel + /// </summary> + /// <param name="channel" the AmqChannel being closed</param> + public void CloseSession(AmqChannel channel) + { + _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId); + ushort channelId = channel.ChannelId; + + // we need to know when a channel is closing so that we can respond + // with a channel.close frame when we receive any other type of frame + // on that channel + _closingChannels[channelId] = channel; + + } + + /// <summary> + /// Called from the ChannelClose handler when a channel close frame is received. + /// This method decides whether this is a response or an initiation. The latter + /// case causes the AmqChannel to be closed and an exception to be thrown if + /// appropriate. + /// </summary> + /// <param name="channelId">the id of the channel (session)</param> + /// <returns>true if the client must respond to the server, i.e. if the server + /// initiated the channel close, false if the channel close is just the server + /// responding to the client's earlier request to close the channel.</returns> + public bool ChannelClosed(ushort channelId, int code, string text) + { + // if this is not a response to an earlier request to close the channel + if (!_closingChannels.ContainsKey(channelId)) + { + _closingChannels.Remove(channelId); + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.Closed(new AMQException(_logger, code, text)); + return true; + } + else + { + return false; + } + } + + public AMQConnection AMQConnection + { + get + { + return _connection; + } + } + + public void CloseProtocolSession() + { + _logger.Debug("Closing protocol session"); + _connectionCloser.Close(); + } + + internal string GenerateQueueName() + { + int id; + lock(_queueIdLock) + { + id = _queueId++; + } + + return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs new file mode 100644 index 0000000000..be8a24a9f4 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.Protocol +{ + public interface IConnectionCloser + { + void Close(); + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs new file mode 100644 index 0000000000..6ac8a7537e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Client.Protocol.Listener; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public interface IProtocolListener + { + void OnMessage(IDataBlock message); + void OnException(Exception e); + + // XXX: .NET way of doing listeners? + void AddFrameListener(IAMQMethodListener listener); + void RemoveFrameListener(IAMQMethodListener listener); + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs new file mode 100644 index 0000000000..99643fe59f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Qpid.Framing; + +namespace Qpid.Client.Protocol.Listener +{ + public abstract class BlockingMethodFrameListener : IAMQMethodListener + { + private ManualResetEvent _resetEvent; + + public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame); + + /// <summary> + /// This is set if there is an exception thrown from processCommandFrame and the + /// exception is rethrown to the caller of blockForFrame() + /// </summary> + private volatile Exception _error; + + protected ushort _channelId; + + protected AMQMethodEvent _doneEvt = null; + + public BlockingMethodFrameListener(ushort channelId) + { + _channelId = channelId; + _resetEvent = new ManualResetEvent(false); + } + + /// <summary> + /// This method is called by the MINA dispatching thread. Note that it could + /// be called before BlockForFrame() has been called. + /// </summary> + /// <param name="evt">the frame event</param> + /// <returns>true if the listener has dealt with this frame</returns> + /// <exception cref="AMQException"></exception> + public bool MethodReceived(AMQMethodEvent evt) + { + AMQMethodBody method = evt.Method; + + try + { + bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method); + if (ready) + { + _doneEvt = evt; + _resetEvent.Set(); + } + + return ready; + } + catch (AMQException e) + { + Error(e); + // we rethrow the error here, and the code in the frame dispatcher will go round + // each listener informing them that an exception has been thrown + throw e; + } + } + + /// <summary> + /// This method is called by the thread that wants to wait for a frame. + /// </summary> + public AMQMethodEvent BlockForFrame() + { + _resetEvent.WaitOne(); + //at this point the event will have been signalled. The error field might or might not be set + // depending on whether an error occurred + if (_error != null) + { + throw _error; + } + + return _doneEvt; + } + + /// <summary> + /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this + /// class to avoid code repetition but again is only called by the MINA dispatcher thread. + /// </summary> + /// <param name="e">the exception that caused the error</param> + public void Error(Exception e) + { + // set the error so that the thread that is blocking (in BlockForFrame()) + // can pick up the exception and rethrow to the caller + _error = e; + _resetEvent.Set(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs new file mode 100644 index 0000000000..db82eb1013 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.Protocol.Listener +{ + public interface IAMQMethodListener + { + /// <summary> + /// Invoked when a method frame has been received + /// <param name="evt">the event</param> + /// <returns>true if the handler has processed the method frame, false otherwise. Note + /// that this does not prohibit the method event being delivered to subsequent listeners + /// but can be used to determine if nobody has dealt with an incoming method frame.</param> + /// <exception cname="AMQException">if an error has occurred. This exception will be delivered + /// to all registered listeners using the error() method (see below) allowing them to + /// perform cleanup if necessary.</exception> + bool MethodReceived(AMQMethodEvent evt); + + /// <summary> + /// Callback when an error has occurred. Allows listeners to clean up. + /// </summary> + /// <param name="e">the exception</param> + void Error(Exception e); + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs new file mode 100644 index 0000000000..65460a0c2e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Framing; + +namespace Qpid.Client.Protocol.Listener +{ + public class SpecificMethodFrameListener : BlockingMethodFrameListener + { + private readonly Type _expectedClass; + + public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId) + { + _expectedClass = expectedClass; + } + + public override bool ProcessMethod(ushort channelId, AMQMethodBody frame) + { + return _expectedClass.IsInstanceOfType(frame); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs new file mode 100644 index 0000000000..32847f9b9b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Client.Protocol.Listener; +using Qpid.Client.Transport; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + /// <summary> + /// A convenient interface to writing protocol frames. + /// </summary> + public class ProtocolWriter + { + IProtocolWriter _protocolWriter; + IProtocolListener _protocolListener; + + public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener) + { + _protocolWriter = protocolWriter; + _protocolListener = protocolListener; + } + + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="listener">the blocking listener. Note the calling thread will block.</param> + private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener) + { + try + { + _protocolListener.AddFrameListener(listener); + _protocolWriter.Write(frame); + return listener.BlockForFrame(); + } + finally + { + _protocolListener.RemoveFrameListener(listener); + } + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType) + { + // TODO: If each frame knew it's response type, then the responseType argument would + // TODO: not be neccesary. + return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType)); + } + } +} + |
