diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 177 |
1 files changed, 155 insertions, 22 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8ab23a240e..3097b33da3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,12 +21,18 @@ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; @@ -58,6 +64,27 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack"; + + private final boolean _syncAfterClientAck = + Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); + + /** + * The period to wait while flow controlled before declaring a failure + */ + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Creates a new session on a connection. * @@ -98,8 +125,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return getProtocolHandler().getProtocolVersion(); } - protected void acknowledgeImpl() + protected void acknowledgeImpl() throws JMSException { + boolean syncRequired = false; while (true) { Long tag = getUnacknowledgedMessageTags().poll(); @@ -109,6 +137,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } acknowledgeMessage(tag, false); + syncRequired = true; + } + + try + { + if (syncRequired && _syncAfterClientAck) + { + sync(); + } + } + catch (AMQException a) + { + throw new JMSAMQException("Failed to sync after acknowledge", a); } } @@ -359,9 +400,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } - @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + @Override + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException { @@ -380,27 +421,29 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (nowait) { - protocolHandler.writeFrame(jmsConsume); + getProtocolHandler().writeFrame(jmsConsume); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class); } } - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), - false,false,false,false,null); + durable, autoDelete, internal, false, null); AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -414,7 +457,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe AMQFrame queueDeclare = body.generateFrame(getChannelId()); - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + } + + @Override + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, final boolean passive) throws AMQException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + sendQueueDeclare(amqd, passive); + + return amqd.getAMQQueueName(); + } + }, getAMQConnection()).execute(); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException @@ -440,10 +508,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow, + getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); } @@ -629,12 +695,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { - throw new UnsupportedOperationException("The new addressing based sytanx is " + throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } @@ -662,14 +727,23 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } - + + private AMQProtocolHandler getProtocolHandler() + { + return getAMQConnection().getProtocolHandler(); + } + + public MethodRegistry getMethodRegistry() + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + return methodRegistry; + } public AMQException getLastException() { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for - AMQStateManager manager = getAMQConnection().getProtocolHandler() - .getStateManager(); + AMQStateManager manager = getProtocolHandler().getStateManager(); Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) @@ -693,6 +767,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + public boolean isFlowBlocked() + { + synchronized (_flowControl) + { + return !_flowControl.getFlowControl(); + } + } + + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + if (_logger.isInfoEnabled()) + { + _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + } + } + + void checkFlowControl() throws InterruptedException, JMSException + { + long expiryTime = 0L; + synchronized (_flowControl) + { + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } + } + if(!_flowControl.getFlowControl()) + { + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); + } + } + } + + + public abstract static class DestinationCache<T extends AMQDestination> { private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); @@ -740,6 +857,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl = flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache(); private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache(); |