diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 225 |
1 files changed, 54 insertions, 171 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 1468e90c4e..91a6389214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +35,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.client.message.AMQPEncodedListMessage; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -49,14 +45,13 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ListMessage; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; @@ -122,27 +117,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - /** - * The period to wait while flow controlled before sending a log message confirming that the session is still - * waiting on flow control being revoked - */ - private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, - DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); - - /** - * The period to wait while flow controlled before declaring a failure - */ - private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, - DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - private final boolean _delareQueues = - Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); private final boolean _declareExchanges = - Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); + Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true")); private final boolean _useAMQPEncodedMapMessage; + private final boolean _useAMQPEncodedStreamMessage; + /** * Flag indicating to start dispatcher as a daemon thread */ @@ -265,11 +249,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Has failover occured on this session with outstanding actions to commit? */ private boolean _failedOverDirty; - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - - - /** Holds the highest received delivery tag. */ protected AtomicLong getHighestDeliveryTag() { @@ -408,22 +387,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - private static final class FlowControlIndicator - { - private volatile boolean _flowControl = true; - - public synchronized void setFlowControl(boolean flowControl) - { - _flowControl = flowControl; - notify(); - } - - public boolean getFlowControl() - { - return _flowControl; - } - } - /** * Creates a new session on a connection. * @@ -439,6 +402,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat(); + _useAMQPEncodedStreamMessage = con == null ? false : !con.isUseLegacyStreamMessageFormat(); _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); @@ -649,12 +613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); - public MethodRegistry getMethodRegistry() - { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - return methodRegistry; - } - /** * Binds the named queue, with the specified routing key, to the named exchange. * @@ -1041,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,noLocal,true); + resolveAddress(dest,false,noLocal); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); } - dest.getSourceNode().setDurable(true); } catch(AMQException e) { @@ -1158,6 +1115,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public ListMessage createListMessage() throws JMSException + { + checkNotClosed(); + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + public MapMessage createMapMessage() throws JMSException { checkNotClosed(); @@ -1400,17 +1365,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public StreamMessage createStreamMessage() throws JMSException { - // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived - // calls through connection.closeAllSessions which is also called by the public connection.close() - // with a null cause - // When we are closing the Session due to a protocol session error we simply create a new AMQException - // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. - // We need to determin here if the connection should be - - synchronized (getFailoverMutex()) + checkNotClosed(); + if (_useAMQPEncodedStreamMessage) + { + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + else { - checkNotClosed(); - JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory()); msg.setAMQSession(this); return msg; @@ -1550,7 +1513,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler(), nowait); + declareExchange(name, type, nowait, false, false, false); } abstract public void sync() throws AMQException; @@ -1690,8 +1653,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws AMQException { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, false); AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -2582,11 +2544,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Register to consume from the queue. - * * @param queueName */ - private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2603,7 +2563,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendConsume(consumer, queueName, protocolHandler, nowait, tagId); + sendConsume(consumer, queueName, nowait, tagId); } catch (AMQException e) { @@ -2614,7 +2574,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; + boolean nowait, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException @@ -2648,9 +2608,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, final Boolean immediate, final long producerId) throws JMSException; - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(), + amqd.isExchangeAutoDelete(), amqd.isExchangeInternal()); } /** @@ -2707,33 +2668,29 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * @param name The name of the exchange to declare. * @param type The type of the exchange to declare. - * @param protocolHandler The protocol handler to process the communication through. * @param nowait - * + * @param durable + * @param autoDelete + * @param internal * @throws AMQException If the exchange cannot be declared for any reason. * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final boolean nowait, final boolean durable, + final boolean autoDelete, final boolean internal) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendExchangeDeclare(name, type, protocolHandler, nowait); + sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); return null; } }, _connection).execute(); } - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - - - void declareQueuePassive(AMQDestination queue) throws AMQException - { - declareQueue(queue,false,false,true); - } + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2768,31 +2725,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return declareQueue(amqd, noLocal, nowait, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait, final boolean passive) - throws AMQException - { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - sendQueueDeclare(amqd, protocolHandler, nowait, passive); - - return amqd.getAMQQueueName(); - } - }, _connection).execute(); - } - - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) throws AMQException, FailoverException; + protected abstract AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; /** * Undeclares the specified queue. @@ -2845,21 +2779,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return ++_nextProducerId; } - protected AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - protected boolean hasMessageListeners() { return _hasMessageListeners; @@ -2918,17 +2837,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); + resolveAddress(amqd,true,consumer.isNoLocal()); } else { if (_declareExchanges) { - declareExchange(amqd, protocolHandler, nowait); + declareExchange(amqd, nowait); } if (_delareQueues || amqd.isNameRequired()) @@ -2973,7 +2890,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait); + consumeFromQueue(consumer, queueName, nowait); } catch (FailoverException e) { @@ -2981,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - public abstract void handleAddressBasedDestination(AMQDestination dest, + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException; + boolean noLocal) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { @@ -3141,47 +3057,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } - public boolean isFlowBlocked() - { - synchronized (_flowControl) - { - return !_flowControl.getFlowControl(); - } - } - - public void setFlowControl(final boolean active) - { - _flowControl.setFlowControl(active); - if (_logger.isInfoEnabled()) - { - _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); - } - } - - public void checkFlowControl() throws InterruptedException, JMSException - { - long expiryTime = 0L; - synchronized (_flowControl) - { - while (!_flowControl.getFlowControl() && - (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) - : expiryTime) >= System.currentTimeMillis() ) - { - - _flowControl.wait(_flowControlWaitPeriod); - if (_logger.isInfoEnabled()) - { - _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); - } - } - if(!_flowControl.getFlowControl()) - { - _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); - } - } + /** + * Tests whether flow to this session is blocked. + * + * @return true if flow is blocked or false otherwise. + */ + public abstract boolean isFlowBlocked(); - } + public abstract void setFlowControl(final boolean active); public interface Dispatchable { |