summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java177
1 files changed, 155 insertions, 22 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 8ab23a240e..3097b33da3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,12 +21,18 @@
package org.apache.qpid.client;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
@@ -58,6 +64,27 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+ public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+ private final boolean _syncAfterClientAck =
+ Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+ DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -98,8 +125,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
+ protected void acknowledgeImpl() throws JMSException
{
+ boolean syncRequired = false;
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -109,6 +137,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
acknowledgeMessage(tag, false);
+ syncRequired = true;
+ }
+
+ try
+ {
+ if (syncRequired && _syncAfterClientAck)
+ {
+ sync();
+ }
+ }
+ catch (AMQException a)
+ {
+ throw new JMSAMQException("Failed to sync after acknowledge", a);
}
}
@@ -359,9 +400,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
- @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ @Override
+ public void sendConsume(BasicMessageConsumer_0_8 consumer,
AMQShortString queueName,
- AMQProtocolHandler protocolHandler,
boolean nowait,
int tag) throws AMQException, FailoverException
{
@@ -380,27 +421,29 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (nowait)
{
- protocolHandler.writeFrame(jmsConsume);
+ getProtocolHandler().writeFrame(jmsConsume);
}
else
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class);
}
}
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
- false,false,false,false,null);
+ durable, autoDelete, internal, false, null);
AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException
+ private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -414,7 +457,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
AMQFrame queueDeclare = body.generateFrame(getChannelId());
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ @Override
+ protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal,
+ final boolean nowait, final boolean passive) throws AMQException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new FailoverNoopSupport<AMQShortString, AMQException>(
+ new FailoverProtectedOperation<AMQShortString, AMQException>()
+ {
+ public AMQShortString execute() throws AMQException, FailoverException
+ {
+ // Generate the queue name if the destination indicates that a client generated name is to be used.
+ if (amqd.isNameRequired())
+ {
+ amqd.setQueueName(protocolHandler.generateQueueName());
+ }
+
+ sendQueueDeclare(amqd, passive);
+
+ return amqd.getAMQQueueName();
+ }
+ }, getAMQConnection()).execute();
}
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
@@ -440,10 +508,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
- getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow,
exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
@@ -629,12 +695,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
- throw new UnsupportedOperationException("The new addressing based sytanx is "
+ throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
@@ -662,14 +727,23 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
-
+
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return getAMQConnection().getProtocolHandler();
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ return methodRegistry;
+ }
public AMQException getLastException()
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = getAMQConnection().getProtocolHandler()
- .getStateManager();
+ AMQStateManager manager = getProtocolHandler().getStateManager();
Exception e = manager.getLastException();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -693,6 +767,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
+ public boolean isFlowBlocked()
+ {
+ synchronized (_flowControl)
+ {
+ return !_flowControl.getFlowControl();
+ }
+ }
+
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ }
+ }
+
+ void checkFlowControl() throws InterruptedException, JMSException
+ {
+ long expiryTime = 0L;
+ synchronized (_flowControl)
+ {
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(_flowControlWaitPeriod);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ }
+ if(!_flowControl.getFlowControl())
+ {
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
+ }
+ }
+ }
+
+
+
public abstract static class DestinationCache<T extends AMQDestination>
{
private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
@@ -740,6 +857,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl = flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();