diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
| commit | 95fc93485ab66966713611a4e1429d917dabde64 (patch) | |
| tree | 09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java | |
| parent | 28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff) | |
| download | qpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz | |
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
26 files changed, 822 insertions, 32 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 5ca94891c1..d3ddaa16dd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -201,6 +201,8 @@ public class AMQChannel private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private boolean _confirmOnPublish; + private long _confirmedMessageCounter; public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) @@ -394,6 +396,11 @@ public class AMQChannel // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { + if(_confirmOnPublish) + { + _confirmedMessageCounter++; + } + try { @@ -421,6 +428,10 @@ public class AMQChannel if(!checkMessageUserId(_currentMessage.getContentHeader())) { + if(_confirmOnPublish) + { + _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false))); + } _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage)); } else @@ -461,6 +472,12 @@ public class AMQChannel } else { + if(_confirmOnPublish) + { + BasicAckBody responseBody = _connection.getMethodRegistry() + .createBasicAckBody(_confirmedMessageCounter, false); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } incrementOutstandingTxnsIfNecessary(); } } @@ -503,7 +520,7 @@ public class AMQChannel description, mandatory, isTransactional(), closeOnNoRoute)); } - if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) + if (mandatory && isTransactional() && !_confirmOnPublish && _connection.isCloseWhenNoRoute()) { _connection.closeConnection(AMQConstant.NO_ROUTE, "No route for message " + currentMessageDescription(), _channelId); @@ -512,6 +529,10 @@ public class AMQChannel { if (mandatory || message.isImmediate()) { + if(_confirmOnPublish) + { + _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false))); + } _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), @@ -2236,8 +2257,6 @@ public class AMQChannel if (requeue) { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. message.decrementDeliveryCount(); requeue(deliveryTag); @@ -2359,6 +2378,85 @@ public class AMQChannel } @Override + public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]"); + } + + Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>(); + _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap); + + for(MessageInstance message : nackedMessageMap.values()) + { + + if (message == null) + { + _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag); + } + else + { + + if (message.getMessage() == null) + { + _logger.warn("Message has already been purged, unable to nack."); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Nack-ing: DT:" + deliveryTag + + "-" + message.getMessage() + + ": Requeue:" + requeue + + + " on channel:" + debugIdentity()); + } + + if (requeue) + { + message.decrementDeliveryCount(); + + requeue(deliveryTag); + } + else + { + message.reject(); + + final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + + maxDeliveryCountEnabled + + " deliveryTag " + + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + + deliveredTooManyTimes + + " deliveryTag " + + deliveryTag); + if (deliveredTooManyTimes) + { + deadLetter(deliveryTag); + } + else + { + message.incrementDeliveryCount(); + } + } + else + { + requeue(deliveryTag); + } + } + } + } + + } + + } + + @Override public void receiveChannelFlow(final boolean active) { if(_logger.isDebugEnabled()) @@ -3355,6 +3453,21 @@ public class AMQChannel resend(); } + @Override + public void receiveConfirmSelect(final boolean nowait) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " + nowait + " ]"); + } + _confirmOnPublish = true; + + if(!nowait) + { + _connection.writeFrame(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE)); + } + } + private void closeChannel(final AMQConstant cause, final String message) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 413cf49eaf..49db24be52 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -85,6 +85,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; @@ -432,6 +433,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, String.valueOf(_closeWhenNoRoute)); serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(_broker.isMessageCompressionEnabled())); + serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString()); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -1119,9 +1121,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _currentClassId, _currentMethodId); - writeFrame(closeBody.generateFrame(0)); + try + { + writeFrame(closeBody.generateFrame(0)); + + _sender.close(); + } + catch(SenderException e) + { + // ignore + } - _sender.close(); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java index bd7b070cd2..198b7fe21b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.Collection; +import java.util.Map; import java.util.Set; import org.apache.qpid.AMQException; @@ -63,7 +64,7 @@ public interface UnacknowledgedMessageMap Set<Long> getDeliveryTags(); Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple); - + void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 176eb5d0c4..bcf0721aab 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -46,6 +46,8 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConfirmSelectBody; +import org.apache.qpid.framing.ConfirmSelectOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; @@ -68,6 +70,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private final AMQConnection _conn; private boolean _messageCompressionSupported; private boolean _addrSyntaxSupported; + private boolean _confirmedPublishSupported; + private boolean _confirmedPublishNonTransactionalSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -94,6 +98,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } + public boolean isConfirmedPublishSupported() + { + return _confirmedPublishSupported; + } + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { if (_logger.isDebugEnabled()) @@ -146,6 +155,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn.setConnected(true); _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); _messageCompressionSupported = checkMessageCompressionSupported(); + _confirmedPublishSupported = checkConfirmedPublishSupported(); + _confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported(); return null; } else @@ -155,6 +166,32 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } + // RabbitMQ supports confirmed publishing, but only on non transactional sessions + private boolean checkConfirmedPublishNonTransactionalSupported() + { + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + if( serverProperties != null + && serverProperties.containsKey("capabilities") + && serverProperties.get("capabilities") instanceof FieldTable) + { + FieldTable capabilities = serverProperties.getFieldTable("capabilities"); + if(capabilities.containsKey("publisher_confirms") + && capabilities.get("publisher_confirms") instanceof Boolean + && capabilities.getBoolean("publisher_confirms")) + { + return true; + } + else + { + return false; + } + } + else + { + return false; + } + } + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) throws JMSException { @@ -266,9 +303,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); - // TODO: Be aware of possible changes to parameter order as versions change. + _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } + boolean useConfirms = (_confirmedPublishSupported || (!transacted && _confirmedPublishNonTransactionalSupported)) + && "all".equals(_conn.getSyncPublish()); + if(useConfirms) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Issuing ConfirmSelect for " + channelId); + } + ConfirmSelectBody body = new ConfirmSelectBody(false); + + _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), ConfirmSelectOkBody.class); + } } public void failoverPrep() @@ -340,7 +389,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } catch (IllegalStateException e) { - if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + if (!(e.getMessage().startsWith("Fail-over interrupted no-op failover support"))) { throw e; } @@ -424,6 +473,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } + private boolean checkConfirmedPublishSupported() + { + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + return serverProperties != null + && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED)); + + } + public boolean isMessageCompressionSupported() { return _messageCompressionSupported; @@ -433,4 +490,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _addrSyntaxSupported; } + + public boolean isConfirmedPublishNonTransactionalSupported() + { + return _confirmedPublishNonTransactionalSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 1d47ce9a07..f72d915c25 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -122,7 +122,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac ? System.getProperty("qpid.default_mandatory") : "false")); - private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; + private PublishMode _publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException @@ -165,16 +165,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac // Support for deprecated option sync_persistence if (syncPub.equals("persistent") || _connection.getSyncPersistence()) { - publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + _publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; } else if (syncPub.equals("all")) { - publishMode = PublishMode.SYNC_PUBLISH_ALL; + _publishMode = PublishMode.SYNC_PUBLISH_ALL; } if (_logger.isDebugEnabled()) { - _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode); + _logger.debug("MessageProducer " + toString() + " using publish mode : " + _publishMode); } } @@ -720,12 +720,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac protected PublishMode getPublishMode() { - return publishMode; + return _publishMode; } protected void setPublishMode(PublishMode publishMode) { - this.publishMode = publishMode; + _publishMode = publishMode; } Logger getLogger() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 69d02566bf..e4c879aca8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -32,14 +32,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicNackBody; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; @@ -211,9 +216,6 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer } - // TODO: This is a hacky way of getting the AMQP class-id for the Basic class - int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(getChannelId(), contentHeaderProperties, size); @@ -232,7 +234,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer frames[0] = publishFrame; frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + final CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); try { @@ -246,7 +248,40 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - getConnection().getProtocolHandler().writeFrame(compositeFrame); + AMQConnectionDelegate_8_0 connectionDelegate80 = (AMQConnectionDelegate_8_0) (getConnection().getDelegate()); + + boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL + && (connectionDelegate80.isConfirmedPublishSupported() + || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported())); + + if(!useConfirms) + { + getConnection().getProtocolHandler().writeFrame(compositeFrame); + } + else + { + final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId()); + try + { + + getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame, + frameListener); + + if(frameListener.isRejected()) + { + throw new JMSException("The message was not accepted by the server (e.g. because the address was no longer valid)"); + } + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted send. Status of the send is uncertain.", e); + + } + } } /** @@ -290,7 +325,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer private int calculateContentBodyFrameCount(ByteBuffer payload) { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int frameCount; if ((payload == null) || (payload.remaining() == 0)) @@ -313,4 +348,42 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer { return (AMQSession_0_8) super.getSession(); } + + private static class PublishConfirmMessageListener extends BlockingMethodFrameListener + { + private boolean _rejected; + + /** + * Creates a new method listener, that filters incoming method to just those that match the specified channel id. + * + * @param channelId The channel id to filter incoming methods with. + */ + public PublishConfirmMessageListener(final int channelId) + { + super(channelId); + } + + @Override + public boolean processMethod(final int channelId, final AMQMethodBody frame) + { + if (frame instanceof BasicAckBody) + { + return true; + } + else if (frame instanceof BasicNackBody) + { + _rejected = true; + return true; + } + else + { + return false; + } + } + + public boolean isRejected() + { + return _rejected; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index a69e808880..da17bedcfd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -68,7 +68,7 @@ public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupp } catch (FailoverException e) { - throw new IllegalStateException("Fail-over interupted no-op failover support. " + throw new IllegalStateException("Fail-over interrupted no-op failover support. " + "No-op support should only be used where the caller is certain fail-over cannot occur.", e); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index ffe0baecd8..74bf9a54fd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQConnection; * <p> * Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want - * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation + * to start whilst waiting for the synchronous reply, the FailoverRetrySupport will detect this and retry the operation * until it succeeds. Synchronous methods are usually coordinated with a * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants * to start and throws a FailoverException in response to this. @@ -53,12 +53,6 @@ import org.apache.qpid.client.AMQConnection; * Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be * started during fail-over, but be delayed until any current fail-over has completed. * <p> - * TODO Another continuation. Could use an interface Continuation (as described in other todos) - * Then have a wrapping continuation (this), which blocks on an arbitrary - * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. - * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type - * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. - * <p> * TODO InterruptedException not handled well. */ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index e6eb2d814f..de2f2f52a9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -53,6 +53,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class); + private static interface DispatcherFactory { public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session); @@ -147,6 +148,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } + @Override + public boolean dispatchConfirmSelectOk(final ConfirmSelectOkBody confirmSelectOkBody, final int channelId) + throws AMQException + { + return false; + } + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); @@ -271,11 +279,19 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } + @Override public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException { - throw new AMQMethodNotImplementedException(body); + return false; } + @Override + public boolean dispatchBasicNack(final BasicNackBody basicNackBody, final int channelId) + { + return false; + } + + public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -400,6 +416,12 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } + @Override + public boolean dispatchConfirmSelect(final ConfirmSelectBody body, final int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index bb98c0abbd..4886eabb90 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -661,7 +661,7 @@ public class AMQProtocolHandler implements ProtocolEngine * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener) throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); @@ -674,7 +674,7 @@ public class AMQProtocolHandler implements ProtocolEngine * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener, long timeout) throws AMQException, FailoverException { try diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java index 5048193cac..9bdc1dd889 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java @@ -209,6 +209,9 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl case 0x003c0048: BasicGetEmptyBody.process(in, channelMethodProcessor); break; + case 0x003c0050: + BasicAckBody.process(in, channelMethodProcessor); + break; case 0x003c0065: if(!channelMethodProcessor.ignoreAllButCloseOk()) { @@ -221,6 +224,18 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl channelMethodProcessor.receiveBasicRecoverSyncOk(); } break; + case 0x003c0078: + BasicNackBody.process(in, channelMethodProcessor); + break; + + // CONFIRM CLASS: + + case 0x0055000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveConfirmSelectOk(); + } + break; // TX_CLASS: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java index 3b138ba278..32a45da60c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java @@ -197,6 +197,15 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se case 0x003c006e: BasicRecoverSyncBody.process(in, channelMethodProcessor); break; + case 0x003c0078: + BasicNackBody.process(in, channelMethodProcessor); + break; + + // CONFIRM CLASS: + + case 0x0055000a: + ConfirmSelectBody.process(in, channelMethodProcessor); + break; // TX_CLASS: @@ -219,6 +228,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se } break; + default: throw newUnknownMethodException(classId, methodId, methodProcessor.getProtocolVersion()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index 68782231fe..01ca6f95f1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -113,7 +113,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB } public static void process(final MarkableDataInput buffer, - final ServerChannelMethodProcessor dispatcher) throws IOException + final ChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java new file mode 100644 index 0000000000..33ccb10f39 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + +public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 60; + public static final int METHOD_ID = 120; + + // Fields declared in specification + private final long _deliveryTag; // [deliveryTag] + private final byte _bitfield0; // [multiple] + + // Constructor + public BasicNackBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException + { + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); + } + + public BasicNackBody( + long deliveryTag, + boolean multiple, + boolean requeue + ) + { + _deliveryTag = deliveryTag; + byte bitfield0 = (byte)0; + if( multiple ) + { + bitfield0 = (byte) (((int) bitfield0) | 1); + + } + if( requeue ) + { + bitfield0 = (byte) (((int) bitfield0) | 2); + } + _bitfield0 = bitfield0; + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + public final long getDeliveryTag() + { + return _deliveryTag; + } + + public final boolean getMultiple() + { + return (((int)(_bitfield0)) & 1) != 0; + } + + public final boolean getRequeue() + { + return (((int)(_bitfield0)) & 2 ) != 0; + } + + protected int getBodySize() + { + int size = 9; + return size; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + writeLong( buffer, _deliveryTag ); + writeBitfield( buffer, _bitfield0 ); + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchBasicNack(this, channelId); + } + + public String toString() + { + StringBuilder buf = new StringBuilder("[BasicNackBodyImpl: "); + buf.append( "deliveryTag=" ); + buf.append( getDeliveryTag() ); + buf.append( ", " ); + buf.append( "multiple=" ); + buf.append( getMultiple() ); + buf.append( ", " ); + buf.append( "requeue=" ); + buf.append( getRequeue() ); + buf.append("]"); + return buf.toString(); + } + + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException + { + + long deliveryTag = buffer.readLong(); + byte bitfield = buffer.readByte(); + boolean multiple = (bitfield & 0x01) != 0; + boolean requeue = (bitfield & 0x02) != 0; + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicNack(deliveryTag, multiple, requeue); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java index 84cd1e13c2..75fbb15629 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java @@ -35,4 +35,8 @@ public interface ChannelMethodProcessor void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); boolean ignoreAllButCloseOk(); + + void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue); + + void receiveBasicAck(long deliveryTag, boolean multiple); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java index bef143e39b..289a284d6b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java @@ -75,4 +75,5 @@ public interface ClientChannelMethodProcessor extends ChannelMethodProcessor void receiveTxRollbackOk(); + void receiveConfirmSelectOk(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java index 97de0ac487..4f1b6b917e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java @@ -72,4 +72,6 @@ public interface ClientMethodDispatcher throws AMQException; boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int channelId) throws AMQException; + + boolean dispatchConfirmSelectOk(ConfirmSelectOkBody confirmSelectOkBody, int channelId) throws AMQException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java new file mode 100644 index 0000000000..7f9e56caa6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + +public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 85; + public static final int METHOD_ID = 10; + + // Fields declared in specification + private final boolean _nowait; // [active] + + // Constructor + public ConfirmSelectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException + { + _nowait = (buffer.readByte() & 0x01) == 0x01; + } + + public ConfirmSelectBody(boolean nowait) + { + _nowait = nowait; + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + public final boolean getNowait() + { + return _nowait; + } + + protected int getBodySize() + { + return 1; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 ); + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchConfirmSelect(this, channelId); + } + + public String toString() + { + StringBuilder buf = new StringBuilder("[ConfirmSelectBody: "); + buf.append( "active=" ); + buf.append( getNowait() ); + buf.append("]"); + return buf.toString(); + } + + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) + throws IOException + { + boolean nowait = (buffer.readByte() & 0x01) == 0x01; + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConfirmSelect(nowait); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java new file mode 100644 index 0000000000..d0e3bd093d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; + +public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 85; + public static final int METHOD_ID = 11; + + public static final ConfirmSelectOkBody INSTANCE = new ConfirmSelectOkBody(); + + private ConfirmSelectOkBody() + { + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + + protected int getBodySize() + { + return 0; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchConfirmSelectOk(this, channelId); + } + + public String toString() + { + return "[ConfirmSelectOkBody]"; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index 19b091a359..0d160a73d5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -358,6 +358,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati } @Override + public void receiveConfirmSelectOk() + { + _processedMethods.add(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE)); + } + + @Override public void receiveAccessRequest(final AMQShortString realm, final boolean exclusive, final boolean passive, @@ -564,6 +570,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati } @Override + public void receiveConfirmSelect(final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new ConfirmSelectBody(nowait))); + } + + @Override public void receiveChannelFlow(final boolean active) { _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active))); @@ -607,5 +619,11 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati { return false; } + + @Override + public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicNackBody(deliveryTag, multiple, requeue))); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java index 03b122a7a7..a485397a5e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java @@ -32,4 +32,6 @@ package org.apache.qpid.framing; public interface MethodDispatcher extends ClientMethodDispatcher, ServerMethodDispatcher { + + boolean dispatchBasicNack(BasicNackBody basicNackBody, int channelId); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java index 89b75c2d2f..6d43accc96 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java @@ -89,4 +89,5 @@ public interface ServerChannelMethodProcessor extends ChannelMethodProcessor void receiveTxRollback(); + void receiveConfirmSelect(boolean nowait); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java index f4ab67dad4..d3961a1a59 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java @@ -68,4 +68,6 @@ public interface ServerMethodDispatcher boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int channelId) throws AMQException; boolean dispatchBasicRecoverSync(BasicRecoverSyncBody basicRecoverSyncBody, int channelId) throws AMQException; + + boolean dispatchConfirmSelect(ConfirmSelectBody confirmSelectBody, int channelId) throws AMQException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index 8f1a1d0be0..4f88fe7071 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -60,6 +60,8 @@ public class ConnectionStartProperties public static final String SESSION_FLOW = "qpid.session_flow"; + public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported"; + public static int _pid; public static final String _platformInfo; diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java new file mode 100644 index 0000000000..2985efe9b5 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class SyncPublishTest extends QpidBrokerTestCase +{ + private Connection _connection; + + @Override + public void setUp() throws Exception + { + + super.setUp(); + Map<String, String> options = new HashMap<>(); + options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"); + _connection = getConnectionWithOptions(options); + } + + @Override + public void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + public void testAnonPublisherUnknownDestination() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + try + { + producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello")); + fail("Send to unknown destination should result in error"); + } + catch (JMSException e) + { + // pass + } + } + + + public void testAnonPublisherUnknownDestinationTransactional() throws Exception + { + Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + try + { + producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello")); + fail("Send to unknown destination should result in error"); + } + catch (JMSException e) + { + // pass + } + try + { + session.commit(); + } + catch (JMSException e) + { + fail("session should commit successfully even though the message was not sent"); + } + + } + + public void testQueueRemovedAfterConsumerCreated() throws JMSException + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + MessageProducer producer = session.createProducer(queue); + try + { + producer.send(session.createTextMessage("hello")); + } + catch (JMSException e) + { + fail("Send to temporary queue should succeed"); + } + + try + { + queue.delete(); + } + catch (JMSException e) + { + fail("temporary queue should be deletable"); + } + + try + { + producer.send(session.createTextMessage("hello")); + fail("Send to deleted temporary queue should not succeed"); + } + catch (JMSException e) + { + // pass + } + + + } +} diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index ce7c5fa151..136bc7918f 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -76,3 +76,5 @@ org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHas org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091 org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction + +org.apache.qpid.client.SyncPublishTest#* |
