summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2013-10-13 22:22:23 +0000
committerRobert Gemmell <robbie@apache.org>2013-10-13 22:22:23 +0000
commit35ad470e6d15e4dbab802b58b2d54dd840b248d8 (patch)
treeeffb5c35075a9925066a8ca4b01fa43a310dd56c /qpid/java/client
parent3b9b2ece8af8767492c77ea0e3bbd71a36307ce7 (diff)
downloadqpid-python-35ad470e6d15e4dbab802b58b2d54dd840b248d8.tar.gz
QPID-5224: escape usage of the ExchangeBound AMQP extension during basic consumer setup to enable interop with 0-8/0-9/0-9-1 brokers that lack this Qpid extension to AMQP, such as RabbitMQ.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1531762 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java13
6 files changed, 64 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index a8fdaeb65c..0329deee03 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -80,4 +80,6 @@ public interface AMQConnectionDelegate
boolean isSupportedServerFeature(final String featureName);
void setHeartbeatListener(HeartbeatListener listener);
+
+ boolean supportsIsBound();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 69e79d42a0..66590aa0d7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -530,4 +530,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
return true;
}
+
+ @Override
+ public boolean supportsIsBound()
+ {
+ //0-10 supports the isBound method
+ return true;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 67d7c2a78c..340aca70eb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -35,13 +34,14 @@ import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
@@ -51,11 +51,10 @@ import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import javax.jms.JMSException;
import javax.jms.XASession;
-import javax.net.ssl.SSLContext;
+
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
-import java.security.GeneralSecurityException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -384,4 +383,33 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getProtocolHandler().setHeartbeatListener(listener);
}
+
+ @Override
+ public boolean supportsIsBound()
+ {
+ //Rough check whether the 'isBound' AMQP extension method is supported, by trying to determine if we are connected to Qpid.
+ //As older versions of the Qpid broker did not send properties, the value will be assumed true if no server properties
+ //are found, or the 'product' entry isn't present, and will only be false if it is present but doesn't match expectation.
+ boolean connectedToQpid = true;
+
+ FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ if(serverProperties != null)
+ {
+ if(serverProperties.containsKey(ConnectionStartProperties.PRODUCT))
+ {
+ //String.valueof to ensure it is non-null, then lowercase it
+ String product = String.valueOf(serverProperties.getString(ConnectionStartProperties.PRODUCT)).toLowerCase();
+
+ //value is "unknown" when the naming properties file hasn't been found, e.g in IDE.
+ connectedToQpid = product.contains("qpid") || product.equals("unknown");
+ }
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("supportsIsBound: " + connectedToQpid);
+ }
+
+ return connectedToQpid;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 9a9da62f2a..3ff7416d8f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -397,10 +397,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
+ /**
+ * Checks if a particular queue is bound to an exchange with a given key.
+ *
+ * Returns false if not connected to a Qpid broker which supports the necessary AMQP extension.
+ */
@Override
protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws AMQException
{
+ if(!getAMQConnection().getDelegate().supportsIsBound())
+ {
+ return false;
+ }
AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 366b5f115e..b0c30f82fa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -168,6 +168,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
ConnectionStartProperties.getPID());
FieldTable serverProperties = body.getServerProperties();
+ session.setConnectionStartServerProperties(serverProperties);
+
ConnectionURL url = getConnectionURL(session);
_closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index aed10cf15f..67bd8de846 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -22,7 +22,6 @@ package org.apache.qpid.client.protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -36,6 +35,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
@@ -101,6 +101,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private final AMQConnection _connection;
private ConnectionTuneParameters _connectionTuneParameters;
+ private FieldTable _connectionStartServerProperties;
private SaslClient _saslClient;
@@ -529,4 +530,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
return _connection;
}
+
+ public void setConnectionStartServerProperties(FieldTable serverProperties)
+ {
+ _connectionStartServerProperties = serverProperties;
+ }
+
+ public FieldTable getConnectionStartServerProperties()
+ {
+ return _connectionStartServerProperties;
+ }
}