summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
committerRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
commit88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch)
tree7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/messaging/driver.py
parent195193dab20a2e7481e470ddc8226cff9102e1fb (diff)
downloadqpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r--python/qpid/messaging/driver.py70
1 files changed, 53 insertions, 17 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index d0f5b746f3..383845f214 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -18,7 +18,7 @@
#
import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
@@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
@@ -435,6 +435,8 @@ class Driver:
self._host = (self._host + 1) % len(self._hosts)
self.close_engine(e)
+DEFAULT_DISPOSITION = Disposition(None)
+
class Engine:
def __init__(self, connection):
@@ -915,19 +917,49 @@ class Engine:
if ssn.acked:
messages = [m for m in ssn.acked if m not in sst.acked]
if messages:
- # XXX: we're ignoring acks that get lost when disconnected,
- # could we deal this via some message-id based purge?
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ ids = RangedSet()
+
+ disposed = [(DEFAULT_DISPOSITION, [])]
+ for m in messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ if m._transfer_id is None:
+ continue
+ ids.add(m._transfer_id)
+ disp = m._disposition or DEFAULT_DISPOSITION
+ last, msgs = disposed[-1]
+ if disp.type is last.type and disp.options == last.options:
+ msgs.append(m)
+ else:
+ disposed.append((disp, [m]))
+
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- def ack_ack():
- for m in messages:
- ssn.acked.remove(m)
- if not ssn.transactional:
- sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids), ack_ack)
- log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+ def ack_acker(msgs):
+ def ack_ack():
+ for m in msgs:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ return ack_ack
+
+ for disp, msgs in disposed:
+ if not msgs: continue
+ if disp.type is None:
+ op = MessageAccept
+ elif disp.type is RELEASED:
+ op = MessageRelease
+ elif disp.type is REJECTED:
+ op = MessageReject
+ sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+ **disp.options),
+ ack_acker(msgs))
+ if log.isEnabledFor(DEBUG):
+ for m in msgs:
+ log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -948,7 +980,7 @@ class Engine:
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(MessageRelease(ids, True))
sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
@@ -1055,8 +1087,11 @@ class Engine:
if mp.application_headers is None:
mp.application_headers = {}
mp.application_headers[TO] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
+ if msg.durable is not None:
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ else:
+ dp.delivery_mode = delivery_mode.non_persistent
if msg.priority is not None:
dp.priority = msg.priority
if msg.ttl is not None:
@@ -1109,7 +1144,8 @@ class Engine:
if mp.reply_to is not None:
msg.reply_to = reply_to2addr(mp.reply_to)
msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
+ if dp.delivery_mode is not None:
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.priority = dp.priority
msg.ttl = dp.ttl
msg.redelivered = dp.redelivered