diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
| commit | fabf031b77cbf1784a25039fc79bcb21b820d4b8 (patch) | |
| tree | cb2aed1968622147ffef540857205e5a24d5e604 /dotnet/Qpid.Client/Client/AmqChannel.cs | |
| parent | 383a2c3ba7afb9f3c49b6980b3b64439e9e8e6ae (diff) | |
| download | qpid-python-fabf031b77cbf1784a25039fc79bcb21b820d4b8.tar.gz | |
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has same problem as the Java client in that on fail-over the "transaction" continues but the earlier part of the transaction is forgotten.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480283 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 50 |
1 files changed, 37 insertions, 13 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 6b1ee204b5..b7c8b1857e 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -276,20 +276,23 @@ namespace Qpid.Client CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session - /*Channel.Commit frame = new Channel.Commit(); - frame.channelId = _channelId; - frame.confirmTag = 1;*/ + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeLastDelivered(); + } - // try - // { - // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); - // } - // catch (AMQException e) - // { - // throw new JMSException("Error creating session: " + e); - // } - throw new NotImplementedException(); - //_logger.Info("Transaction commited on channel " + _channelId); + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } } public void Rollback() @@ -978,5 +981,26 @@ namespace Qpid.Client // _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + public void AcknowledgeMessage(long deliveryTag, bool multiple) + { + // XXX: cast to ulong evil? + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } } } |
