summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-11-26 01:41:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-11-26 01:41:31 +0000
commita1391b5724829e4268faf4de60625b2d05e86288 (patch)
treed124245b02b8ead3be3e37cef3fbac684c606e4e /java/client/src
parent66f97f32c78e0cf5914a441ae8277ee3aa659ce9 (diff)
downloadqpid-python-a1391b5724829e4268faf4de60625b2d05e86288.tar.gz
QPID-567 : Add mutliversion support to Qpid/Java, fixed client support when server returns Protocol header.
Added QueueUnbind Added ability to select protocol version in ConnectionURL or with -Dorg.apache.qpid.amqp_version git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598105 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java138
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQState.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java4
7 files changed, 184 insertions, 82 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 85a5fbf996..38325a1e41 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -32,13 +32,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
@@ -161,6 +155,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
+ private ProtocolVersion _protocolVersion;
/**
* @param broker brokerdetails
@@ -253,6 +248,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
+
+ _protocolVersion = connectionURL.getProtocolVersion();
+
setVirtualHost(connectionURL.getVirtualHost());
if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -393,16 +391,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
+ final Set<AMQState> openOrClosedStates =
+ EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
try
{
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
- _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
- _failoverPolicy.attainedConnection();
- // Again this should be changed to a suitable notify
- _connected = true;
+ //_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+ AMQState state = _protocolHandler.attainState(openOrClosedStates);
+ if(state == AMQState.CONNECTION_OPEN)
+ {
+
+ _failoverPolicy.attainedConnection();
+
+ // Again this should be changed to a suitable notify
+ _connected = true;
+ }
}
catch (AMQException e)
{
@@ -1285,4 +1291,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.get(channelId);
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public void setProtocolVersion(ProtocolVersion protocolVersion)
+ {
+ _protocolVersion = protocolVersion;
+ _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index 24f5ead2d0..64dbabf222 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLHelper;
@@ -52,6 +53,7 @@ public class AMQConnectionURL implements ConnectionURL
private AMQShortString _defaultTopicExchangeName;
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
+ private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion();
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -255,6 +257,15 @@ public class AMQConnectionURL implements ConnectionURL
{
_temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
}
+ if(_options.containsKey(OPTIONS_PROTOCOL_VERSION))
+ {
+ ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION));
+ if(pv != null)
+ {
+ _protocolVersion = pv;
+ }
+ }
+
}
public String getURL()
@@ -377,6 +388,11 @@ public class AMQConnectionURL implements ConnectionURL
return _temporaryTopicExchangeName;
}
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer();
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 56efec4fa2..5ee3fa5407 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -387,94 +388,109 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageReceived(IoSession session, Object message) throws Exception
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
+ if(message instanceof AMQFrame)
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- AMQFrame frame = (AMQFrame) message;
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
- final AMQBody bodyFrame = frame.getBodyFrame();
+ AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ final AMQBody bodyFrame = frame.getBodyFrame();
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
+ switch (bodyFrame.getFrameType())
+ {
+ case AMQMethodBody.TYPE:
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
- try
- {
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
}
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
+
+ exceptionCaught(session, e);
}
- exceptionCaught(session, e);
- }
+ break;
- break;
+ case ContentHeaderBody.TYPE:
- case ContentHeaderBody.TYPE:
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+ break;
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ case ContentBody.TYPE:
- case ContentBody.TYPE:
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+ break;
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
+ case HeartbeatBody.TYPE:
- case HeartbeatBody.TYPE:
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
+ break;
- break;
+ default:
- default:
+ }
+ _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
-
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
private static int _messagesOut;
@@ -515,6 +531,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
getStateManager().attainState(s);
}
+ public AMQState attainState(Set<AMQState> states) throws AMQException
+ {
+ return getStateManager().attainState(states);
+ }
+
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 18c1e85eaa..b48adbdb08 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -121,7 +121,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
- _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+ _protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
stateManager);
_connection = connection;
@@ -133,7 +133,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
index 4996f59345..d32d10542f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
@@ -24,8 +24,22 @@ package org.apache.qpid.client.state;
* States used in the AMQ protocol. Used by the finite state machine to determine
* valid responses.
*/
-public class AMQState
+public enum AMQState
{
+
+ CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"),
+
+ CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"),
+
+ CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"),
+
+ CONNECTION_OPEN(4, "CONNECTION_OPEN"),
+
+ CONNECTION_CLOSING(5, "CONNECTION_CLOSING"),
+
+ CONNECTION_CLOSED(6, "CONNECTION_CLOSED");
+
+
private final int _id;
private final String _name;
@@ -41,16 +55,6 @@ public class AMQState
return "AMQState: id = " + _id + " name: " + _name;
}
- public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
-
- public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
-
- public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");
- public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
- public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
-
- public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index a9473df08c..b6baefe1b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -165,4 +166,41 @@ public class AMQStateManager implements AMQMethodListener
{
return getProtocolSession().getMethodRegistry();
}
+
+ public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ {
+ synchronized (_stateLock)
+ {
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while (!stateSet.contains(_currentState) && (waitTime > 0))
+ {
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet);
+ throw new AMQException("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet);
+ }
+ return _currentState;
+ }
+
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 2d91e290c4..098256c75f 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -21,6 +21,7 @@
package org.apache.qpid.jms;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import java.util.List;
@@ -41,6 +42,7 @@ public interface ConnectionURL
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+ public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion";
String getURL();
@@ -83,4 +85,6 @@ public interface ConnectionURL
AMQShortString getTemporaryQueueExchangeName();
AMQShortString getTemporaryTopicExchangeName();
+
+ ProtocolVersion getProtocolVersion();
}