summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/AmqChannel.cs
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
commit7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch)
tree3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/AmqChannel.cs
parent8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff)
downloadqpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs1071
1 files changed, 1071 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
new file mode 100644
index 0000000000..02818940dd
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -0,0 +1,1071 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text.RegularExpressions;
+using System.Threading;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client
+{
+ public class AmqChannel : Closeable, IChannel
+ {
+ private const int BASIC_CONTENT_TYPE = 60;
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof (AmqChannel));
+
+ private static int _nextSessionNumber = 0;
+
+ private int _sessionNumber;
+
+ // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
+ private int _nextConsumerNumber = 1;
+
+ internal const int DEFAULT_PREFETCH = 5000;
+
+ private AMQConnection _connection;
+
+ private bool _transacted;
+
+ private AcknowledgeMode _acknowledgeMode;
+
+ private ushort _channelId;
+
+ private int _defaultPrefetch = DEFAULT_PREFETCH;
+
+ private BlockingQueue _queue = new LinkedBlockingQueue();
+
+ private Dispatcher _dispatcher;
+
+ private MessageFactoryRegistry _messageFactoryRegistry;
+
+ /// <summary>
+ /// Set of all producers created by this session
+ /// </summary>
+ private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// Maps from consumer tag to JMSMessageConsumer instance
+ /// </summary>
+ private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// The counter of the _next producer id. This id is generated by the session and used only to allow the
+ /// producer to identify itself to the session when deregistering itself.
+ ///
+ /// Access to this id does not require to be synchronized since according to the JMS specification only one
+ /// thread of control is allowed to create producers for any given session instance.
+ /// </summary>
+ private long _nextProducerId;
+
+ /// <summary>
+ /// Responsible for decoding a message fragment and passing it to the appropriate message consumer.
+ /// </summary>
+ private class Dispatcher
+ {
+ private int _stopped = 0;
+
+ private AmqChannel _containingChannel;
+
+ public Dispatcher(AmqChannel containingChannel)
+ {
+ _containingChannel = containingChannel;
+ }
+
+ /// <summary>
+ /// Runs the dispatcher. This is intended to be Run in a separate thread.
+ /// </summary>
+ public void RunDispatcher()
+ {
+ UnprocessedMessage message;
+
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null)
+ {
+ //_queue.size()
+ DispatchMessage(message);
+ }
+
+ _logger.Info("Dispatcher thread terminating for channel " + _containingChannel._channelId);
+ }
+
+// private void DispatchMessage(UnprocessedMessage message)
+ private void DispatchMessage(UnprocessedMessage message)
+ {
+ if (message.DeliverBody != null)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
+
+ if (consumer == null)
+ {
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+ }
+ else
+ {
+ consumer.NotifyMessage(message, _containingChannel.AcknowledgeMode, _containingChannel.ChannelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
+ CreateMessage(0, false, message.ContentHeader, message.Bodies);
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ public void StopDispatcher()
+ {
+ Interlocked.Exchange(ref _stopped, 1);
+ }
+ }
+
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) :
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AmqChannel"/> class.
+ /// </summary>
+ /// <param name="con">The con.</param>
+ /// <param name="channelId">The channel id.</param>
+ /// <param name="transacted">if set to <c>true</c> [transacted].</param>
+ /// <param name="acknowledgeMode">The acknowledge mode.</param>
+ /// <param name="messageFactoryRegistry">The message factory registry.</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
+ if (transacted)
+ {
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ }
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ _channelId = channelId;
+ _messageFactoryRegistry = messageFactoryRegistry;
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public IMessage CreateMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ // TODO: this is supposed to create a message consisting only of message headers
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+
+ try
+ {
+ return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ msg.Text = text;
+ return msg;
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public bool Transacted
+ {
+ get
+ {
+ CheckNotClosed();
+ return _transacted;
+ }
+ }
+
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _acknowledgeMode;
+ }
+ }
+
+ public void Commit()
+ {
+ CheckNotClosed();
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ /*Channel.Commit frame = new Channel.Commit();
+ frame.channelId = _channelId;
+ frame.confirmTag = 1;*/
+
+ // try
+ // {
+ // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
+ // }
+ // catch (AMQException e)
+ // {
+ // throw new JMSException("Error creating session: " + e);
+ // }
+ throw new NotImplementedException();
+ //_logger.Info("Transaction commited on channel " + _channelId);
+ }
+
+ public void Rollback()
+ {
+ CheckNotClosed();
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ /*Channel.Rollback frame = new Channel.Rollback();
+ frame.channelId = _channelId;
+ frame.confirmTag = 1;*/
+
+ // try
+ // {
+ // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
+ // }
+ // catch (AMQException e)
+ // {
+ // throw new JMSException("Error rolling back session: " + e);
+ // }
+ throw new NotImplementedException();
+ //_logger.Info("Transaction rolled back on channel " + _channelId);
+ }
+
+ public override void Close()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session
+
+ lock (_closingLock)
+ {
+ SetClosed();
+
+ // we pass null since this is not an error case
+ CloseProducersAndConsumers(null);
+
+ try
+ {
+ _connection.CloseSession(this);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error closing session: " + e);
+ }
+ finally
+ {
+ _connection.DeregisterSession(_channelId);
+ }
+ }
+ }
+ }
+
+ private void SetClosed()
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+
+ /// <summary>
+ /// Close all producers or consumers. This is called either in the error case or when closing the session normally.
+ /// <param name="amqe">the exception, may be null to indicate no error has occurred</param>
+ ///
+ private void CloseProducersAndConsumers(AMQException amqe)
+ {
+ try
+ {
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ CloseConsumers(amqe);
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+
+ /**
+ * Called when the server initiates the closure of the session
+ * unilaterally.
+ * @param e the exception that caused this session to be closed. Null causes the
+ */
+ public void Closed(Exception e)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ SetClosed();
+ AMQException amqe;
+ if (e is AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+ _connection.DeregisterSession(_channelId);
+ CloseProducersAndConsumers(amqe);
+ }
+ }
+
+ /// <summary>
+ /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
+ /// currently no way of propagating errors to message producers (this is a JMS limitation).
+ /// </summary>
+ private void CloseProducers()
+ {
+ _logger.Info("Closing producers on session " + this);
+ // we need to clone the list of producers since the close() method updates the _producers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedProducers = new ArrayList(_producers.Values);
+
+ foreach (BasicMessageProducer prod in clonedProducers)
+ {
+ _logger.Info("Closing producer " + prod);
+ prod.Close();
+ }
+ // at this point the _producers map is empty
+ }
+
+ /// <summary>
+ /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+ /// <param name="error">not null if this is a result of an error occurring at the connection level</param>
+ ///
+ private void CloseConsumers(Exception error)
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+
+ foreach (BasicMessageConsumer con in clonedConsumers)
+ {
+ if (error != null)
+ {
+ con.NotifyError(error);
+ }
+ else
+ {
+ con.Close();
+ }
+ }
+ // at this point the _consumers map will be empty
+ }
+
+ public void Recover()
+ {
+ CheckNotClosed();
+ CheckNotTransacted(); // throws IllegalOperationException if not a transacted session
+
+ // TODO: This cannot be implemented using 0.8 semantics
+ throw new NotImplementedException();
+ }
+
+ public void Run()
+ {
+ throw new NotImplementedException();
+ }
+
+// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass)
+// {
+// return CreatePublisher(exchangeClass, exchangeName, null);
+// }
+
+// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass, string routingKey)
+// {
+// return CreatePublisherBuilder().withExchangeName(exchangeName)
+// .withRoutingKey(routingKey).Create();
+// }
+
+ public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
+ exchangeName, "none", routingKey));
+ return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
+ timeToLive, immediate, mandatory, priority);
+ }
+
+ // TODO: Create a producer that doesn't require an IDestination.
+// private IMessagePublisher CreateProducerImpl(IDestination destination)
+// {
+// lock (_closingLock)
+// {
+// CheckNotClosed();
+//
+// AMQDestination amqd = (AMQDestination)destination;
+//
+// try
+// {
+// return new BasicMessageProducer(amqd, _transacted, _channelId,
+// this, GetNextProducerId());
+// }
+// catch (AMQException e)
+// {
+// _logger.Error("Error creating message producer: " + e, e);
+// throw new QpidException("Error creating message producer", e);
+// }
+// }
+// }
+
+ public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
+ DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+
+ try
+ {
+ return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId,
+ this, GetNextProducerId(),
+ deliveryMode, timeToLive, immediate, mandatory, priority);
+ }
+ catch (AMQException e)
+ {
+ _logger.Error("Error creating message producer: " + e, e);
+ throw new QpidException("Error creating message producer", e);
+ }
+ }
+ }
+
+ public IMessageConsumer CreateConsumer(string queueName,
+ int prefetch,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
+ {
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}",
+ queueName, prefetch, noLocal, exclusive, durable, subscriptionName));
+ return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName);
+ }
+
+ private IMessageConsumer CreateConsumerImpl(string queueName,
+ int prefetch,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
+ {
+
+ if (durable || subscriptionName != null)
+ {
+ throw new NotImplementedException(); // TODO: durable subscriptions.
+ }
+
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+
+ BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
+ _messageFactoryRegistry, this);
+ try
+ {
+ RegisterConsumer(consumer);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error registering consumer: " + e, e);
+ }
+
+ return consumer;
+ }
+ }
+
+// public IDestination CreateQueue(string queueName)
+// {
+// return new AMQQueue(queueName);
+// }
+//
+// public IDestination CreateTopic(String topicName)
+// {
+// return new AMQTopic(topicName);
+// }
+
+ public IFieldTable CreateFieldTable()
+ {
+ return new FieldTable();
+ }
+
+// public IDestination CreateTemporaryQueue()
+// {
+// return new AMQQueue("TempQueue" + DateTime.Now.Ticks.ToString(), true);
+//
+//// return new AMQTemporaryQueue(); // XXX: port AMQTemporaryQueue and AMQQueue changes.
+// }
+
+// public IDestination CreateTemporaryTopic()
+// {
+// throw new NotImplementedException(); // FIXME
+// }
+
+ public void Unsubscribe(String name)
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ private void CheckTransacted()
+ {
+ if (!Transacted)
+ {
+ throw new InvalidOperationException("Channel is not transacted");
+ }
+ }
+
+ private void CheckNotTransacted()
+ {
+ if (Transacted)
+ {
+ throw new InvalidOperationException("Channel is transacted");
+ }
+ }
+
+ public void MessageReceived(UnprocessedMessage message)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Message received in session with channel id " + _channelId);
+ }
+ _queue.EnqueueBlocking(message);
+ }
+
+ public int DefaultPrefetch
+ {
+ get
+ {
+ return _defaultPrefetch;
+ }
+ set
+ {
+ _defaultPrefetch = value;
+ }
+ }
+
+ public ushort ChannelId
+ {
+ get
+ {
+ return _channelId;
+ }
+ }
+
+ public AMQConnection Connection
+ {
+ get
+ {
+ return _connection;
+ }
+ }
+
+ /// <summary>
+ /// Send an acknowledgement for all messages up to a specified number on this session.
+ /// <param name="messageNbr">the message number up to an including which all messages will be acknowledged.</param>
+ /// </summary>
+ public void SendAcknowledgement(ulong messageNbr)
+ {
+ /*if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Channel Ack being sent for channel id " + _channelId + " and message number " + messageNbr);
+ }*/
+ /*Channel.Ack frame = new Channel.Ack();
+ frame.channelId = _channelId;
+ frame.messageNbr = messageNbr;
+ _connection.getProtocolHandler().writeFrame(frame);*/
+ }
+
+ internal void Start()
+ {
+ _dispatcher = new Dispatcher(this);
+ Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher));
+ dispatcherThread.IsBackground = true;
+ dispatcherThread.Start();
+ }
+
+ internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer)
+ {
+ _consumers[consumerTag] = consumer;
+ }
+
+ /// <summary>
+ /// Called by the MessageConsumer when closing, to deregister the consumer from the
+ /// map from consumerTag to consumer instance.
+ /// </summary>
+ /// <param name="consumerTag">the consumer tag, that was broker-generated</param>
+ internal void DeregisterConsumer(string consumerTag)
+ {
+ _consumers.Remove(consumerTag);
+ }
+
+ internal void RegisterProducer(long producerId, IMessagePublisher publisher)
+ {
+ _producers[producerId] = publisher;
+ }
+
+ internal void DeregisterProducer(long producerId)
+ {
+ _producers.Remove(producerId);
+ }
+
+ private long GetNextProducerId()
+ {
+ return ++_nextProducerId;
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ /**
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
+ * failover when the client has veoted resubscription.
+ *
+ * The caller of this method must already hold the failover mutex.
+ */
+ internal void MarkClosed()
+ {
+ SetClosed();
+ _connection.DeregisterSession(_channelId);
+ MarkClosedProducersAndConsumers();
+ }
+
+ private void MarkClosedProducersAndConsumers()
+ {
+ try
+ {
+ // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ MarkClosedConsumers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+
+ private void MarkClosedConsumers()
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+
+ foreach (BasicMessageConsumer consumer in clonedConsumers)
+ {
+ consumer.MarkClosed();
+ }
+ // at this point the _consumers map will be empty
+ }
+
+ /**
+ * Resubscribes all producers and consumers. This is called when performing failover.
+ * @throws AMQException
+ */
+ internal void Resubscribe()
+ {
+ ResubscribeProducers();
+ ResubscribeConsumers();
+ }
+
+ private void ResubscribeProducers()
+ {
+ // FIXME: This needs to Replay DeclareExchange method calls.
+
+// ArrayList producers = new ArrayList(_producers.Values);
+// _logger.Debug(String.Format("Resubscribing producers = {0} producers.size={1}", producers, producers.Count));
+// foreach (BasicMessageProducer producer in producers)
+// {
+// producer.Resubscribe();
+// }
+ }
+
+ private void ResubscribeConsumers()
+ {
+ ArrayList consumers = new ArrayList(_consumers.Values);
+ _consumers.Clear();
+
+ foreach (BasicMessageConsumer consumer in consumers)
+ {
+ RegisterConsumer(consumer);
+ }
+ }
+
+ /// <summary>
+ /// Callers must hold the failover mutex before calling this method.
+ /// </summary>
+ /// <param name="consumer"></param>
+ void RegisterConsumer(BasicMessageConsumer consumer)
+ {
+ String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal,
+ consumer.Exclusive, consumer.AcknowledgeMode);
+
+ consumer.ConsumerTag = consumerTag;
+ _consumers.Add(consumerTag, consumer);
+ }
+
+ public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
+ {
+ DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
+ }
+
+ public void Bind(string queueName, string exchangeName, string routingKey)
+ {
+ DoBind(queueName, exchangeName, routingKey, new FieldTable());
+ }
+
+ internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
+ {
+ _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
+ queueName, exchangeName, routingKey, args));
+
+ AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, true, args);
+ _connection.ProtocolWriter.Write(queueBind);
+ }
+
+
+// /**
+// * Declare the queue.
+// * @param amqd
+// * @param protocolHandler
+// * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+// * @throws AMQException
+// */
+// private String DeclareQueue(AMQDestination amqd)
+// {
+// // For queues (but not topics) we generate the name in the client rather than the
+// // server. This allows the name to be reused on failover if required. In general,
+// // the destination indicates whether it wants a name generated or not.
+// if (amqd.IsNameRequired)
+// {
+// amqd.QueueName = GenerateUniqueName();
+// }
+//
+// return DoDeclareQueue(amqd);
+// }
+
+ private String ConsumeFromQueue(String queueName, int prefetch,
+ bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ {
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+
+ AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
+ queueName, tag, noLocal,
+ acknowledgeMode == AcknowledgeMode.NoAcknowledge,
+ exclusive, true);
+
+ _connection.ProtocolWriter.Write(basicConsume);
+ return tag;
+ }
+
+ public void DeleteExchange(string exchangeName)
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ public void DeleteQueue()
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ {
+ return new MessageConsumerBuilder(this, queueName);
+ }
+
+ public MessagePublisherBuilder CreatePublisherBuilder()
+ {
+ return new MessagePublisherBuilder(this);
+ }
+
+// public void Publish(string exchangeName, string routingKey, bool mandatory, bool immediate,
+// IMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
+// bool disableTimestamps)
+// {
+// lock (Connection.FailoverMutex)
+// {
+// DoBasicPublish(exchangeName, routingKey, mandatory, immediate, (AbstractQmsMessage)message, deliveryMode, timeToLive, priority, disableTimestamps);
+// }
+// }
+
+ internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
+ AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
+ bool disableTimestamps)
+ {
+ lock (Connection.FailoverMutex)
+ {
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
+ }
+ }
+
+ private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
+ {
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
+ routingKey, mandatory, immediate);
+
+ long currentTime = 0;
+ if (!disableTimestamps)
+ {
+ currentTime = DateTime.UtcNow.Ticks;
+ message.Timestamp = currentTime;
+ }
+ byte[] payload = message.Data;
+ BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
+
+ if (timeToLive > 0)
+ {
+ if (!disableTimestamps)
+ {
+ contentHeaderProperties.Expiration = (uint)currentTime + timeToLive;
+ }
+ }
+ else
+ {
+ contentHeaderProperties.Expiration = 0;
+ }
+ contentHeaderProperties.SetDeliveryMode(deliveryMode);
+ contentHeaderProperties.Priority = (byte)priority;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for (int i = 0; i < contentBodies.Length; i++)
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
+ (uint)payload.Length);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+ }
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(byte[] payload)
+ {
+ if (payload == null)
+ {
+ return null;
+ }
+ else if (payload.Length == 0)
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ long framePayloadMax = Connection.MaximumFrameSize - 1;
+ int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
+ int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
+ ContentBody[] bodies = new ContentBody[frameCount];
+
+ if (frameCount == 1)
+ {
+ bodies[0] = new ContentBody();
+ bodies[0].Payload = payload;
+ }
+ else
+ {
+ long remaining = payload.Length;
+ for (int i = 0; i < bodies.Length; i++)
+ {
+ bodies[i] = new ContentBody();
+ byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
+ Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
+ bodies[i].Payload = framePayload;
+ remaining -= framePayload.Length;
+ }
+ }
+ return bodies;
+ }
+
+ public string GenerateUniqueName()
+ {
+ string result = _connection.ProtocolSession.GenerateQueueName();
+ return Regex.Replace(result, "[^a-z0-9_]", "_");
+ }
+
+ public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
+ }
+
+ private string DoDeclareQueue(AMQDestination amqd)
+ {
+ string queueName = amqd.QueueName;
+ bool isDurable = amqd.IsDurable;
+ bool isExclusive = amqd.IsExclusive;
+
+ DoQueueDeclare(queueName, isDurable, isExclusive, amqd.AutoDelete);
+
+ _logger.Debug("returning amqp.QueueName = " + amqd.QueueName);
+ return amqd.QueueName;
+ }
+
+ private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
+ queueName, isDurable, isExclusive, isAutoDelete));
+
+ AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName,
+ false, isDurable, isExclusive,
+ isAutoDelete, true, null);
+
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
+
+ public void DeclareExchange(String exchangeName, String exchangeClass)
+ {
+ _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
+
+ DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
+ }
+
+ // AMQP-level method.
+ private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
+ string exchangeClass, bool passive, bool durable,
+ bool autoDelete, bool xinternal, bool noWait, FieldTable args)
+ {
+ _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
+ _channelId, exchangeName, exchangeClass));
+
+ AMQFrame exchangeDeclareFrame = ExchangeDeclareBody.CreateAMQFrame(
+ channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args);
+
+// Console.WriteLine(string.Format("XXX AMQP:DeclareExchange frame=[{0}]", exchangeDeclareFrame));
+
+ // FIXME: Probably need to record the exchangeDeclareBody for later replay.
+ ExchangeDeclareBody exchangeDeclareBody = (ExchangeDeclareBody)exchangeDeclareFrame.BodyFrame;
+// Console.WriteLine(string.Format("XXX AMQP:DeclareExchangeBody=[{0}]", exchangeDeclareBody));
+ if (exchangeDeclareBody.Nowait)
+ {
+ _connection.ProtocolWriter.Write(exchangeDeclareFrame);
+ }
+ else
+ {
+ _connection.ConvenientProtocolWriter.SyncWrite(exchangeDeclareFrame, typeof (ExchangeDeclareOkBody));
+ }
+ }
+ }
+}