From c8d206abb661ade5e113ba7950e5d8b90a0b29ef Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 17 Apr 2012 11:15:25 +0000 Subject: QPID-3953 : Fix durable subscriber unsubscribe to explicitly set durability of link to NONE git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327044 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 45 ++++++++++++++-------- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 7 +++- 2 files changed, 35 insertions(+), 17 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 08a6ac6f20..e321245a0e 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -18,6 +18,24 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.UUID; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageEOFException; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.qpid.amqp_1_0.client.Connection; import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.client.Receiver; @@ -32,25 +50,10 @@ import org.apache.qpid.amqp_1_0.jms.TopicPublisher; import org.apache.qpid.amqp_1_0.jms.TopicSession; import org.apache.qpid.amqp_1_0.jms.TopicSubscriber; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import javax.jms.*; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.MapMessage; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.List; -import java.util.UUID; - public class SessionImpl implements Session, QueueSession, TopicSession { private ConnectionImpl _connection; @@ -519,6 +522,16 @@ public class SessionImpl implements Session, QueueSession, TopicSession { Receiver receiver = new Receiver(getClientSession(), s, target, null, org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false); + + final org.apache.qpid.amqp_1_0.type.Source receiverSource = receiver.getSource(); + if(receiverSource instanceof Source) + { + Source source = (Source) receiverSource; + receiver.close(); + receiver = new Receiver(getClientSession(), s, target, source, + org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false); + + } receiver.close(); } catch(AmqpErrorException e) diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index a9a6ade642..ad390fd498 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -85,7 +85,7 @@ public class Receiver implements DeliveryStateHandler source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); } - else + else if(source != null) { source.setDurable(TerminusDurability.NONE); source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); @@ -542,6 +542,11 @@ public class Receiver implements DeliveryStateHandler return _session; } + public org.apache.qpid.amqp_1_0.type.Source getSource() + { + return _endpoint.getSource(); + } + public static interface SettledAction { public void onSettled(Binary deliveryTag); -- cgit v1.2.1