From 9bb5ada379722f35f12bce387cc71af1172bc2c5 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 11 May 2012 12:46:43 +0000 Subject: QPID-3993 : NPE on continuation transfers with no deliveryId git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1337130 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/transport/ReceivingLinkEndpoint.java | 14 +++++++++++++ .../qpid/amqp_1_0/transport/SessionEndpoint.java | 24 ++++++++++++++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java index bd9244c858..cf86fc2471 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java @@ -32,6 +32,8 @@ public class ReceivingLinkEndpoint extends LinkEndpoint { + private UnsignedInteger _lastDeliveryId; + private static class TransientState { @@ -435,4 +437,16 @@ public class ReceivingLinkEndpoint extends LinkEndpoint { return _drainLimit; } + + UnsignedInteger getLastDeliveryId() + { + return _lastDeliveryId; + } + + void setLastDeliveryId(UnsignedInteger lastDeliveryId) + { + _lastDeliveryId = lastDeliveryId; + } + + } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 4b8bf82a4a..17f90fef59 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -321,21 +321,32 @@ public class SessionEndpoint } - Delivery delivery = _incomingUnsettled.get(transfer.getDeliveryId()); + UnsignedInteger deliveryId = transfer.getDeliveryId(); + if(deliveryId == null) + { + deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId(); + } + + Delivery delivery = _incomingUnsettled.get(deliveryId); if(delivery == null) { delivery = new Delivery(transfer, endpoint); - _incomingUnsettled.put(transfer.getDeliveryId(),delivery); + _incomingUnsettled.put(deliveryId,delivery); if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())) { /* _availableIncomingCredit++; */ } + + if(Boolean.TRUE.equals(transfer.getMore())) + { + ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId()); + } } else { - if(delivery.getDeliveryId().equals(transfer.getDeliveryId())) + if(delivery.getDeliveryId().equals(deliveryId)) { delivery.addTransfer(transfer); if(delivery.isSettled()) @@ -350,6 +361,11 @@ public class SessionEndpoint _availableIncomingCredit += delivery.getTransfers().size(); */ } + + if(!Boolean.TRUE.equals(transfer.getMore())) + { + ((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null); + } } else { @@ -365,7 +381,7 @@ public class SessionEndpoint if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))) { - _incomingUnsettled.remove(transfer.getDeliveryId()); + _incomingUnsettled.remove(deliveryId); } } } -- cgit v1.2.1