summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java225
1 files changed, 54 insertions, 171 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 1468e90c4e..91a6389214 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client;
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +35,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AMQPEncodedListMessage;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -49,14 +45,13 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ListMessage;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
@@ -122,27 +117,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- /**
- * The period to wait while flow controlled before sending a log message confirming that the session is still
- * waiting on flow control being revoked
- */
- private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
- DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
-
- /**
- * The period to wait while flow controlled before declaring a failure
- */
- private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
- DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
private final boolean _delareQueues =
- Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+ Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
private final boolean _declareExchanges =
- Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
+ Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true"));
private final boolean _useAMQPEncodedMapMessage;
+ private final boolean _useAMQPEncodedStreamMessage;
+
/**
* Flag indicating to start dispatcher as a daemon thread
*/
@@ -265,11 +249,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
- /** Flow control */
- private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
-
-
/** Holds the highest received delivery tag. */
protected AtomicLong getHighestDeliveryTag()
{
@@ -408,22 +387,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- private static final class FlowControlIndicator
- {
- private volatile boolean _flowControl = true;
-
- public synchronized void setFlowControl(boolean flowControl)
- {
- _flowControl = flowControl;
- notify();
- }
-
- public boolean getFlowControl()
- {
- return _flowControl;
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -439,6 +402,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
+ _useAMQPEncodedStreamMessage = con == null ? false : !con.isUseLegacyStreamMessageFormat();
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -649,12 +613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
- public MethodRegistry getMethodRegistry()
- {
- MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
- return methodRegistry;
- }
-
/**
* Binds the named queue, with the specified routing key, to the named exchange.
*
@@ -1041,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
try
{
- handleAddressBasedDestination(dest,false,noLocal,true);
+ resolveAddress(dest,false,noLocal);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
}
- dest.getSourceNode().setDurable(true);
}
catch(AMQException e)
{
@@ -1158,6 +1115,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public ListMessage createListMessage() throws JMSException
+ {
+ checkNotClosed();
+ AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
+ }
+
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
@@ -1400,17 +1365,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public StreamMessage createStreamMessage() throws JMSException
{
- // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
- // calls through connection.closeAllSessions which is also called by the public connection.close()
- // with a null cause
- // When we are closing the Session due to a protocol session error we simply create a new AMQException
- // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
- // We need to determin here if the connection should be
-
- synchronized (getFailoverMutex())
+ checkNotClosed();
+ if (_useAMQPEncodedStreamMessage)
+ {
+ AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
+ }
+ else
{
- checkNotClosed();
-
JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
msg.setAMQSession(this);
return msg;
@@ -1550,7 +1513,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- declareExchange(name, type, getProtocolHandler(), nowait);
+ declareExchange(name, type, nowait, false, false, false);
}
abstract public void sync() throws AMQException;
@@ -1690,8 +1653,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws
AMQException
{
- AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, false);
AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -2582,11 +2544,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Register to consume from the queue.
- *
* @param queueName
*/
- private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+ private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2603,7 +2563,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
+ sendConsume(consumer, queueName, nowait, tagId);
}
catch (AMQException e)
{
@@ -2614,7 +2574,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
+ boolean nowait, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
@@ -2648,9 +2608,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
final Boolean immediate, final long producerId) throws JMSException;
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+ private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException
{
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
+ amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
}
/**
@@ -2707,33 +2668,29 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* @param name The name of the exchange to declare.
* @param type The type of the exchange to declare.
- * @param protocolHandler The protocol handler to process the communication through.
* @param nowait
- *
+ * @param durable
+ * @param autoDelete
+ * @param internal
* @throws AMQException If the exchange cannot be declared for any reason.
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final boolean internal) throws AMQException
{
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendExchangeDeclare(name, type, protocolHandler, nowait);
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal);
return null;
}
}, _connection).execute();
}
- public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
-
-
- void declareQueuePassive(AMQDestination queue) throws AMQException
- {
- declareQueue(queue,false,false,true);
- }
+ public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
/**
* Declares a queue for a JMS destination.
@@ -2768,31 +2725,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return declareQueue(amqd, noLocal, nowait, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd,
- final boolean noLocal, final boolean nowait, final boolean passive)
- throws AMQException
- {
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new FailoverNoopSupport<AMQShortString, AMQException>(
- new FailoverProtectedOperation<AMQShortString, AMQException>()
- {
- public AMQShortString execute() throws AMQException, FailoverException
- {
- // Generate the queue name if the destination indicates that a client generated name is to be used.
- if (amqd.isNameRequired())
- {
- amqd.setQueueName(protocolHandler.generateQueueName());
- }
-
- sendQueueDeclare(amqd, protocolHandler, nowait, passive);
-
- return amqd.getAMQQueueName();
- }
- }, _connection).execute();
- }
-
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait, boolean passive) throws AMQException, FailoverException;
+ protected abstract AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException;
/**
* Undeclares the specified queue.
@@ -2845,21 +2779,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return ++_nextProducerId;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _connection.getProtocolHandler();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolHandler().getProtocolMinorVersion();
- }
-
protected boolean hasMessageListeners()
{
return _hasMessageListeners;
@@ -2918,17 +2837,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
+ resolveAddress(amqd,true,consumer.isNoLocal());
}
else
{
if (_declareExchanges)
{
- declareExchange(amqd, protocolHandler, nowait);
+ declareExchange(amqd, nowait);
}
if (_delareQueues || amqd.isNameRequired())
@@ -2973,7 +2890,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ consumeFromQueue(consumer, queueName, nowait);
}
catch (FailoverException e)
{
@@ -2981,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- public abstract void handleAddressBasedDestination(AMQDestination dest,
+ public abstract void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException;
+ boolean noLocal) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
{
@@ -3141,47 +3057,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_ticket = ticket;
}
- public boolean isFlowBlocked()
- {
- synchronized (_flowControl)
- {
- return !_flowControl.getFlowControl();
- }
- }
-
- public void setFlowControl(final boolean active)
- {
- _flowControl.setFlowControl(active);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
- }
- }
-
- public void checkFlowControl() throws InterruptedException, JMSException
- {
- long expiryTime = 0L;
- synchronized (_flowControl)
- {
- while (!_flowControl.getFlowControl() &&
- (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
- : expiryTime) >= System.currentTimeMillis() )
- {
-
- _flowControl.wait(_flowControlWaitPeriod);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
- }
- }
- if(!_flowControl.getFlowControl())
- {
- _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
- throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
- }
- }
+ /**
+ * Tests whether flow to this session is blocked.
+ *
+ * @return true if flow is blocked or false otherwise.
+ */
+ public abstract boolean isFlowBlocked();
- }
+ public abstract void setFlowControl(final boolean active);
public interface Dispatchable
{