diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
commit | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch) | |
tree | 7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/messaging/driver.py | |
parent | 195193dab20a2e7481e470ddc8226cff9102e1fb (diff) | |
download | qpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz |
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 70 |
1 files changed, 53 insertions, 17 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index d0f5b746f3..383845f214 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -18,7 +18,7 @@ # import socket, struct, sys, time -from logging import getLogger +from logging import getLogger, DEBUG from qpid import compat from qpid import sasl from qpid.concurrency import synchronized @@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address -from qpid.messaging.constants import UNLIMITED +from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError -from qpid.messaging.message import get_codec, Message +from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect @@ -435,6 +435,8 @@ class Driver: self._host = (self._host + 1) % len(self._hosts) self.close_engine(e) +DEFAULT_DISPOSITION = Disposition(None) + class Engine: def __init__(self, connection): @@ -915,19 +917,49 @@ class Engine: if ssn.acked: messages = [m for m in ssn.acked if m not in sst.acked] if messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + ids = RangedSet() + + disposed = [(DEFAULT_DISPOSITION, [])] + for m in messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + if m._transfer_id is None: + continue + ids.add(m._transfer_id) + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) + for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - def ack_ack(): - for m in messages: - ssn.acked.remove(m) - if not ssn.transactional: - sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids), ack_ack) - log.debug("SACK[%s]: %s", ssn.log_id, m) + + def ack_acker(msgs): + def ack_ack(): + for m in msgs: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + return ack_ack + + for disp, msgs in disposed: + if not msgs: continue + if disp.type is None: + op = MessageAccept + elif disp.type is RELEASED: + op = MessageRelease + elif disp.type is REJECTED: + op = MessageReject + sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), + **disp.options), + ack_acker(msgs)) + if log.isEnabledFor(DEBUG): + for m in msgs: + log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) + sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -948,7 +980,7 @@ class Engine: for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(MessageRelease(ids, True)) sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): @@ -1055,8 +1087,11 @@ class Engine: if mp.application_headers is None: mp.application_headers = {} mp.application_headers[TO] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent + if msg.durable is not None: + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + else: + dp.delivery_mode = delivery_mode.non_persistent if msg.priority is not None: dp.priority = msg.priority if msg.ttl is not None: @@ -1109,7 +1144,8 @@ class Engine: if mp.reply_to is not None: msg.reply_to = reply_to2addr(mp.reply_to) msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent + if dp.delivery_mode is not None: + msg.durable = dp.delivery_mode == delivery_mode.persistent msg.priority = dp.priority msg.ttl = dp.ttl msg.redelivered = dp.redelivered |