summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
commit95fc93485ab66966713611a4e1429d917dabde64 (patch)
tree09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java
parent28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff)
downloadqpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java119
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java66
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java85
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java24
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java143
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java105
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java77
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java131
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes2
26 files changed, 822 insertions, 32 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 5ca94891c1..d3ddaa16dd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -201,6 +201,8 @@ public class AMQChannel
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private boolean _confirmOnPublish;
+ private long _confirmedMessageCounter;
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
@@ -394,6 +396,11 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
{
+ if(_confirmOnPublish)
+ {
+ _confirmedMessageCounter++;
+ }
+
try
{
@@ -421,6 +428,10 @@ public class AMQChannel
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
+ if(_confirmOnPublish)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+ }
_transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
}
else
@@ -461,6 +472,12 @@ public class AMQChannel
}
else
{
+ if(_confirmOnPublish)
+ {
+ BasicAckBody responseBody = _connection.getMethodRegistry()
+ .createBasicAckBody(_confirmedMessageCounter, false);
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
incrementOutstandingTxnsIfNecessary();
}
}
@@ -503,7 +520,7 @@ public class AMQChannel
description, mandatory, isTransactional(), closeOnNoRoute));
}
- if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
+ if (mandatory && isTransactional() && !_confirmOnPublish && _connection.isCloseWhenNoRoute())
{
_connection.closeConnection(AMQConstant.NO_ROUTE,
"No route for message " + currentMessageDescription(), _channelId);
@@ -512,6 +529,10 @@ public class AMQChannel
{
if (mandatory || message.isImmediate())
{
+ if(_confirmOnPublish)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+ }
_transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
"No Route for message "
+ currentMessageDescription(),
@@ -2236,8 +2257,6 @@ public class AMQChannel
if (requeue)
{
- //this requeue represents a message rejected from the pre-dispatch queue
- //therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
requeue(deliveryTag);
@@ -2359,6 +2378,85 @@ public class AMQChannel
}
@Override
+ public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
+ }
+
+ Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>();
+ _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
+
+ for(MessageInstance message : nackedMessageMap.values())
+ {
+
+ if (message == null)
+ {
+ _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to nack.");
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Nack-ing: DT:" + deliveryTag
+ + "-" + message.getMessage() +
+ ": Requeue:" + requeue
+ +
+ " on channel:" + debugIdentity());
+ }
+
+ if (requeue)
+ {
+ message.decrementDeliveryCount();
+
+ requeue(deliveryTag);
+ }
+ else
+ {
+ message.reject();
+
+ final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ deadLetter(deliveryTag);
+ }
+ else
+ {
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ requeue(deliveryTag);
+ }
+ }
+ }
+ }
+
+ }
+
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
if(_logger.isDebugEnabled())
@@ -3355,6 +3453,21 @@ public class AMQChannel
resend();
}
+ @Override
+ public void receiveConfirmSelect(final boolean nowait)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " + nowait + " ]");
+ }
+ _confirmOnPublish = true;
+
+ if(!nowait)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE));
+ }
+ }
+
private void closeChannel(final AMQConstant cause, final String message)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 413cf49eaf..49db24be52 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -85,6 +85,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
@@ -432,6 +433,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
String.valueOf(_closeWhenNoRoute));
serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
String.valueOf(_broker.isMessageCompressionEnabled()));
+ serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString());
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -1119,9 +1121,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_currentClassId,
_currentMethodId);
- writeFrame(closeBody.generateFrame(0));
+ try
+ {
+ writeFrame(closeBody.generateFrame(0));
+
+ _sender.close();
+ }
+ catch(SenderException e)
+ {
+ // ignore
+ }
- _sender.close();
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
index bd7b070cd2..198b7fe21b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQException;
@@ -63,7 +64,7 @@ public interface UnacknowledgedMessageMap
Set<Long> getDeliveryTags();
Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
-
+ void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 176eb5d0c4..bcf0721aab 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -46,6 +46,8 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConfirmSelectBody;
+import org.apache.qpid.framing.ConfirmSelectOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
@@ -68,6 +70,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private final AMQConnection _conn;
private boolean _messageCompressionSupported;
private boolean _addrSyntaxSupported;
+ private boolean _confirmedPublishSupported;
+ private boolean _confirmedPublishNonTransactionalSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -94,6 +98,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
+ public boolean isConfirmedPublishSupported()
+ {
+ return _confirmedPublishSupported;
+ }
+
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
if (_logger.isDebugEnabled())
@@ -146,6 +155,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
_messageCompressionSupported = checkMessageCompressionSupported();
+ _confirmedPublishSupported = checkConfirmedPublishSupported();
+ _confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
return null;
}
else
@@ -155,6 +166,32 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
+ // RabbitMQ supports confirmed publishing, but only on non transactional sessions
+ private boolean checkConfirmedPublishNonTransactionalSupported()
+ {
+ FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ if( serverProperties != null
+ && serverProperties.containsKey("capabilities")
+ && serverProperties.get("capabilities") instanceof FieldTable)
+ {
+ FieldTable capabilities = serverProperties.getFieldTable("capabilities");
+ if(capabilities.containsKey("publisher_confirms")
+ && capabilities.get("publisher_confirms") instanceof Boolean
+ && capabilities.getBoolean("publisher_confirms"))
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
throws JMSException
{
@@ -266,9 +303,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
- // TODO: Be aware of possible changes to parameter order as versions change.
+
_conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
+ boolean useConfirms = (_confirmedPublishSupported || (!transacted && _confirmedPublishNonTransactionalSupported))
+ && "all".equals(_conn.getSyncPublish());
+ if(useConfirms)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Issuing ConfirmSelect for " + channelId);
+ }
+ ConfirmSelectBody body = new ConfirmSelectBody(false);
+
+ _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), ConfirmSelectOkBody.class);
+ }
}
public void failoverPrep()
@@ -340,7 +389,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
catch (IllegalStateException e)
{
- if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ if (!(e.getMessage().startsWith("Fail-over interrupted no-op failover support")))
{
throw e;
}
@@ -424,6 +473,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
+ private boolean checkConfirmedPublishSupported()
+ {
+ FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ return serverProperties != null
+ && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED));
+
+ }
+
public boolean isMessageCompressionSupported()
{
return _messageCompressionSupported;
@@ -433,4 +490,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _addrSyntaxSupported;
}
+
+ public boolean isConfirmedPublishNonTransactionalSupported()
+ {
+ return _confirmedPublishNonTransactionalSupported;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 1d47ce9a07..f72d915c25 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -122,7 +122,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
? System.getProperty("qpid.default_mandatory")
: "false"));
- private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+ private PublishMode _publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
@@ -165,16 +165,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
// Support for deprecated option sync_persistence
if (syncPub.equals("persistent") || _connection.getSyncPersistence())
{
- publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+ _publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
}
else if (syncPub.equals("all"))
{
- publishMode = PublishMode.SYNC_PUBLISH_ALL;
+ _publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
if (_logger.isDebugEnabled())
{
- _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode);
+ _logger.debug("MessageProducer " + toString() + " using publish mode : " + _publishMode);
}
}
@@ -720,12 +720,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
protected PublishMode getPublishMode()
{
- return publishMode;
+ return _publishMode;
}
protected void setPublishMode(PublishMode publishMode)
{
- this.publishMode = publishMode;
+ _publishMode = publishMode;
}
Logger getLogger()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 69d02566bf..e4c879aca8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -32,14 +32,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicNackBody;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
@@ -211,9 +216,6 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
}
- // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
- int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
AMQFrame contentHeaderFrame =
ContentHeaderBody.createAMQFrame(getChannelId(),
contentHeaderProperties, size);
@@ -232,7 +234,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ final CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
try
{
@@ -246,7 +248,40 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
throw jmse;
}
- getConnection().getProtocolHandler().writeFrame(compositeFrame);
+ AMQConnectionDelegate_8_0 connectionDelegate80 = (AMQConnectionDelegate_8_0) (getConnection().getDelegate());
+
+ boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL
+ && (connectionDelegate80.isConfirmedPublishSupported()
+ || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
+
+ if(!useConfirms)
+ {
+ getConnection().getProtocolHandler().writeFrame(compositeFrame);
+ }
+ else
+ {
+ final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId());
+ try
+ {
+
+ getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+ frameListener);
+
+ if(frameListener.isRejected())
+ {
+ throw new JMSException("The message was not accepted by the server (e.g. because the address was no longer valid)");
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted send. Status of the send is uncertain.", e);
+
+ }
+ }
}
/**
@@ -290,7 +325,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
private int calculateContentBodyFrameCount(ByteBuffer payload)
{
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int frameCount;
if ((payload == null) || (payload.remaining() == 0))
@@ -313,4 +348,42 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
return (AMQSession_0_8) super.getSession();
}
+
+ private static class PublishConfirmMessageListener extends BlockingMethodFrameListener
+ {
+ private boolean _rejected;
+
+ /**
+ * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
+ *
+ * @param channelId The channel id to filter incoming methods with.
+ */
+ public PublishConfirmMessageListener(final int channelId)
+ {
+ super(channelId);
+ }
+
+ @Override
+ public boolean processMethod(final int channelId, final AMQMethodBody frame)
+ {
+ if (frame instanceof BasicAckBody)
+ {
+ return true;
+ }
+ else if (frame instanceof BasicNackBody)
+ {
+ _rejected = true;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public boolean isRejected()
+ {
+ return _rejected;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
index a69e808880..da17bedcfd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
@@ -68,7 +68,7 @@ public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupp
}
catch (FailoverException e)
{
- throw new IllegalStateException("Fail-over interupted no-op failover support. "
+ throw new IllegalStateException("Fail-over interrupted no-op failover support. "
+ "No-op support should only be used where the caller is certain fail-over cannot occur.", e);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index ffe0baecd8..74bf9a54fd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQConnection;
* <p>
* Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
* started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
- * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
+ * to start whilst waiting for the synchronous reply, the FailoverRetrySupport will detect this and retry the operation
* until it succeeds. Synchronous methods are usually coordinated with a
* {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
* to start and throws a FailoverException in response to this.
@@ -53,12 +53,6 @@ import org.apache.qpid.client.AMQConnection;
* Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
* started during fail-over, but be delayed until any current fail-over has completed.
* <p>
- * TODO Another continuation. Could use an interface Continuation (as described in other todos)
- * Then have a wrapping continuation (this), which blocks on an arbitrary
- * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
- * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
- * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
- * <p>
* TODO InterruptedException not handled well.
*/
public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index e6eb2d814f..de2f2f52a9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -53,6 +53,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
+
private static interface DispatcherFactory
{
public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session);
@@ -147,6 +148,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchConfirmSelectOk(final ConfirmSelectOkBody confirmSelectOkBody, final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
_basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
@@ -271,11 +279,19 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
+ @Override
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- throw new AMQMethodNotImplementedException(body);
+ return false;
}
+ @Override
+ public boolean dispatchBasicNack(final BasicNackBody basicNackBody, final int channelId)
+ {
+ return false;
+ }
+
+
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
@@ -400,6 +416,12 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchConfirmSelect(final ConfirmSelectBody body, final int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
_exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index bb98c0abbd..4886eabb90 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -661,7 +661,7 @@ public class AMQProtocolHandler implements ProtocolEngine
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener)
throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
@@ -674,7 +674,7 @@ public class AMQProtocolHandler implements ProtocolEngine
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener,
long timeout) throws AMQException, FailoverException
{
try
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
index 5048193cac..9bdc1dd889 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
@@ -209,6 +209,9 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
case 0x003c0048:
BasicGetEmptyBody.process(in, channelMethodProcessor);
break;
+ case 0x003c0050:
+ BasicAckBody.process(in, channelMethodProcessor);
+ break;
case 0x003c0065:
if(!channelMethodProcessor.ignoreAllButCloseOk())
{
@@ -221,6 +224,18 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
channelMethodProcessor.receiveBasicRecoverSyncOk();
}
break;
+ case 0x003c0078:
+ BasicNackBody.process(in, channelMethodProcessor);
+ break;
+
+ // CONFIRM CLASS:
+
+ case 0x0055000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveConfirmSelectOk();
+ }
+ break;
// TX_CLASS:
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
index 3b138ba278..32a45da60c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
@@ -197,6 +197,15 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
case 0x003c006e:
BasicRecoverSyncBody.process(in, channelMethodProcessor);
break;
+ case 0x003c0078:
+ BasicNackBody.process(in, channelMethodProcessor);
+ break;
+
+ // CONFIRM CLASS:
+
+ case 0x0055000a:
+ ConfirmSelectBody.process(in, channelMethodProcessor);
+ break;
// TX_CLASS:
@@ -219,6 +228,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
}
break;
+
default:
throw newUnknownMethodException(classId, methodId,
methodProcessor.getProtocolVersion());
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index 68782231fe..01ca6f95f1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -113,7 +113,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
}
public static void process(final MarkableDataInput buffer,
- final ServerChannelMethodProcessor dispatcher) throws IOException
+ final ChannelMethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
new file mode 100644
index 0000000000..33ccb10f39
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 60;
+ public static final int METHOD_ID = 120;
+
+ // Fields declared in specification
+ private final long _deliveryTag; // [deliveryTag]
+ private final byte _bitfield0; // [multiple]
+
+ // Constructor
+ public BasicNackBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
+ {
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
+ }
+
+ public BasicNackBody(
+ long deliveryTag,
+ boolean multiple,
+ boolean requeue
+ )
+ {
+ _deliveryTag = deliveryTag;
+ byte bitfield0 = (byte)0;
+ if( multiple )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | 1);
+
+ }
+ if( requeue )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | 2);
+ }
+ _bitfield0 = bitfield0;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+ public final long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public final boolean getMultiple()
+ {
+ return (((int)(_bitfield0)) & 1) != 0;
+ }
+
+ public final boolean getRequeue()
+ {
+ return (((int)(_bitfield0)) & 2 ) != 0;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 9;
+ return size;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ writeLong( buffer, _deliveryTag );
+ writeBitfield( buffer, _bitfield0 );
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchBasicNack(this, channelId);
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicNackBodyImpl: ");
+ buf.append( "deliveryTag=" );
+ buf.append( getDeliveryTag() );
+ buf.append( ", " );
+ buf.append( "multiple=" );
+ buf.append( getMultiple() );
+ buf.append( ", " );
+ buf.append( "requeue=" );
+ buf.append( getRequeue() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor dispatcher) throws IOException
+ {
+
+ long deliveryTag = buffer.readLong();
+ byte bitfield = buffer.readByte();
+ boolean multiple = (bitfield & 0x01) != 0;
+ boolean requeue = (bitfield & 0x02) != 0;
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicNack(deliveryTag, multiple, requeue);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
index 84cd1e13c2..75fbb15629 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
@@ -35,4 +35,8 @@ public interface ChannelMethodProcessor
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
boolean ignoreAllButCloseOk();
+
+ void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue);
+
+ void receiveBasicAck(long deliveryTag, boolean multiple);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
index bef143e39b..289a284d6b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
@@ -75,4 +75,5 @@ public interface ClientChannelMethodProcessor extends ChannelMethodProcessor
void receiveTxRollbackOk();
+ void receiveConfirmSelectOk();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
index 97de0ac487..4f1b6b917e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
@@ -72,4 +72,6 @@ public interface ClientMethodDispatcher
throws AMQException;
boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int channelId) throws AMQException;
+
+ boolean dispatchConfirmSelectOk(ConfirmSelectOkBody confirmSelectOkBody, int channelId) throws AMQException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
new file mode 100644
index 0000000000..7f9e56caa6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 85;
+ public static final int METHOD_ID = 10;
+
+ // Fields declared in specification
+ private final boolean _nowait; // [active]
+
+ // Constructor
+ public ConfirmSelectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
+ {
+ _nowait = (buffer.readByte() & 0x01) == 0x01;
+ }
+
+ public ConfirmSelectBody(boolean nowait)
+ {
+ _nowait = nowait;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+ public final boolean getNowait()
+ {
+ return _nowait;
+ }
+
+ protected int getBodySize()
+ {
+ return 1;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 );
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchConfirmSelect(this, channelId);
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[ConfirmSelectBody: ");
+ buf.append( "active=" );
+ buf.append( getNowait() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
+ throws IOException
+ {
+ boolean nowait = (buffer.readByte() & 0x01) == 0x01;
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConfirmSelect(nowait);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
new file mode 100644
index 0000000000..d0e3bd093d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+
+public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 85;
+ public static final int METHOD_ID = 11;
+
+ public static final ConfirmSelectOkBody INSTANCE = new ConfirmSelectOkBody();
+
+ private ConfirmSelectOkBody()
+ {
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchConfirmSelectOk(this, channelId);
+ }
+
+ public String toString()
+ {
+ return "[ConfirmSelectOkBody]";
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index 19b091a359..0d160a73d5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -358,6 +358,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
}
@Override
+ public void receiveConfirmSelectOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE));
+ }
+
+ @Override
public void receiveAccessRequest(final AMQShortString realm,
final boolean exclusive,
final boolean passive,
@@ -564,6 +570,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
}
@Override
+ public void receiveConfirmSelect(final boolean nowait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ConfirmSelectBody(nowait)));
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
_processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active)));
@@ -607,5 +619,11 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
{
return false;
}
+
+ @Override
+ public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicNackBody(deliveryTag, multiple, requeue)));
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
index 03b122a7a7..a485397a5e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
@@ -32,4 +32,6 @@ package org.apache.qpid.framing;
public interface MethodDispatcher extends
ClientMethodDispatcher, ServerMethodDispatcher
{
+
+ boolean dispatchBasicNack(BasicNackBody basicNackBody, int channelId);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
index 89b75c2d2f..6d43accc96 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
@@ -89,4 +89,5 @@ public interface ServerChannelMethodProcessor extends ChannelMethodProcessor
void receiveTxRollback();
+ void receiveConfirmSelect(boolean nowait);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
index f4ab67dad4..d3961a1a59 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
@@ -68,4 +68,6 @@ public interface ServerMethodDispatcher
boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int channelId) throws AMQException;
boolean dispatchBasicRecoverSync(BasicRecoverSyncBody basicRecoverSyncBody, int channelId) throws AMQException;
+
+ boolean dispatchConfirmSelect(ConfirmSelectBody confirmSelectBody, int channelId) throws AMQException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
index 8f1a1d0be0..4f88fe7071 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
@@ -60,6 +60,8 @@ public class ConnectionStartProperties
public static final String SESSION_FLOW = "qpid.session_flow";
+ public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
+
public static int _pid;
public static final String _platformInfo;
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
new file mode 100644
index 0000000000..2985efe9b5
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class SyncPublishTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+
+ @Override
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+ Map<String, String> options = new HashMap<>();
+ options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all");
+ _connection = getConnectionWithOptions(options);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+ public void testAnonPublisherUnknownDestination() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+ try
+ {
+ producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello"));
+ fail("Send to unknown destination should result in error");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+ }
+
+
+ public void testAnonPublisherUnknownDestinationTransactional() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ try
+ {
+ producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello"));
+ fail("Send to unknown destination should result in error");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+ try
+ {
+ session.commit();
+ }
+ catch (JMSException e)
+ {
+ fail("session should commit successfully even though the message was not sent");
+ }
+
+ }
+
+ public void testQueueRemovedAfterConsumerCreated() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ MessageProducer producer = session.createProducer(queue);
+ try
+ {
+ producer.send(session.createTextMessage("hello"));
+ }
+ catch (JMSException e)
+ {
+ fail("Send to temporary queue should succeed");
+ }
+
+ try
+ {
+ queue.delete();
+ }
+ catch (JMSException e)
+ {
+ fail("temporary queue should be deletable");
+ }
+
+ try
+ {
+ producer.send(session.createTextMessage("hello"));
+ fail("Send to deleted temporary queue should not succeed");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+
+
+ }
+}
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index ce7c5fa151..136bc7918f 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -76,3 +76,5 @@ org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHas
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
+
+org.apache.qpid.client.SyncPublishTest#*