diff options
| author | Robert Gemmell <robbie@apache.org> | 2013-10-13 22:22:23 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2013-10-13 22:22:23 +0000 |
| commit | 35ad470e6d15e4dbab802b58b2d54dd840b248d8 (patch) | |
| tree | effb5c35075a9925066a8ca4b01fa43a310dd56c /qpid/java/client | |
| parent | 3b9b2ece8af8767492c77ea0e3bbd71a36307ce7 (diff) | |
| download | qpid-python-35ad470e6d15e4dbab802b58b2d54dd840b248d8.tar.gz | |
QPID-5224: escape usage of the ExchangeBound AMQP extension during basic consumer setup to enable interop with 0-8/0-9/0-9-1 brokers that lack this Qpid extension to AMQP, such as RabbitMQ.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1531762 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
6 files changed, 64 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index a8fdaeb65c..0329deee03 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -80,4 +80,6 @@ public interface AMQConnectionDelegate boolean isSupportedServerFeature(final String featureName); void setHeartbeatListener(HeartbeatListener listener); + + boolean supportsIsBound(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 69e79d42a0..66590aa0d7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -530,4 +530,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } return true; } + + @Override + public boolean supportsIsBound() + { + //0-10 supports the isBound method + return true; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 67d7c2a78c..340aca70eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -35,13 +34,14 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; @@ -51,11 +51,10 @@ import org.apache.qpid.transport.network.security.SecurityLayerFactory; import javax.jms.JMSException; import javax.jms.XASession; -import javax.net.ssl.SSLContext; + import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; -import java.security.GeneralSecurityException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; @@ -384,4 +383,33 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getProtocolHandler().setHeartbeatListener(listener); } + + @Override + public boolean supportsIsBound() + { + //Rough check whether the 'isBound' AMQP extension method is supported, by trying to determine if we are connected to Qpid. + //As older versions of the Qpid broker did not send properties, the value will be assumed true if no server properties + //are found, or the 'product' entry isn't present, and will only be false if it is present but doesn't match expectation. + boolean connectedToQpid = true; + + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + if(serverProperties != null) + { + if(serverProperties.containsKey(ConnectionStartProperties.PRODUCT)) + { + //String.valueof to ensure it is non-null, then lowercase it + String product = String.valueOf(serverProperties.getString(ConnectionStartProperties.PRODUCT)).toLowerCase(); + + //value is "unknown" when the naming properties file hasn't been found, e.g in IDE. + connectedToQpid = product.contains("qpid") || product.equals("unknown"); + } + } + + if(_logger.isDebugEnabled()) + { + _logger.debug("supportsIsBound: " + connectedToQpid); + } + + return connectedToQpid; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 9a9da62f2a..3ff7416d8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -397,10 +397,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + /** + * Checks if a particular queue is bound to an exchange with a given key. + * + * Returns false if not connected to a Qpid broker which supports the necessary AMQP extension. + */ @Override protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws AMQException { + if(!getAMQConnection().getDelegate().supportsIsBound()) + { + return false; + } AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( new FailoverProtectedOperation<AMQMethodEvent, AMQException>() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 366b5f115e..b0c30f82fa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -168,6 +168,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co ConnectionStartProperties.getPID()); FieldTable serverProperties = body.getServerProperties(); + session.setConnectionStartServerProperties(serverProperties); + ConnectionURL url = getConnectionURL(session); _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index aed10cf15f..67bd8de846 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -22,7 +22,6 @@ package org.apache.qpid.client.protocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -36,6 +35,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -101,6 +101,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; + private FieldTable _connectionStartServerProperties; private SaslClient _saslClient; @@ -529,4 +530,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return _connection; } + + public void setConnectionStartServerProperties(FieldTable serverProperties) + { + _connectionStartServerProperties = serverProperties; + } + + public FieldTable getConnectionStartServerProperties() + { + return _connectionStartServerProperties; + } } |
