diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-11-26 01:41:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-11-26 01:41:31 +0000 |
| commit | a1391b5724829e4268faf4de60625b2d05e86288 (patch) | |
| tree | d124245b02b8ead3be3e37cef3fbac684c606e4e /java/client/src | |
| parent | 66f97f32c78e0cf5914a441ae8277ee3aa659ce9 (diff) | |
| download | qpid-python-a1391b5724829e4268faf4de60625b2d05e86288.tar.gz | |
QPID-567 : Add mutliversion support to Qpid/Java, fixed client support when server returns Protocol header.
Added QueueUnbind
Added ability to select protocol version in ConnectionURL or with -Dorg.apache.qpid.amqp_version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598105 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
7 files changed, 184 insertions, 82 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 85a5fbf996..38325a1e41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,13 +32,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Connection; @@ -161,6 +155,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; + private ProtocolVersion _protocolVersion; /** * @param broker brokerdetails @@ -253,6 +248,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _clientName = connectionURL.getClientName(); _username = connectionURL.getUsername(); _password = connectionURL.getPassword(); + + _protocolVersion = connectionURL.getProtocolVersion(); + setVirtualHost(connectionURL.getVirtualHost()); if (connectionURL.getDefaultQueueExchangeName() != null) @@ -393,16 +391,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { + final Set<AMQState> openOrClosedStates = + EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); try { TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up - _protocolHandler.attainState(AMQState.CONNECTION_OPEN); - _failoverPolicy.attainedConnection(); - // Again this should be changed to a suitable notify - _connected = true; + //_protocolHandler.attainState(AMQState.CONNECTION_OPEN); + AMQState state = _protocolHandler.attainState(openOrClosedStates); + if(state == AMQState.CONNECTION_OPEN) + { + + _failoverPolicy.attainedConnection(); + + // Again this should be changed to a suitable notify + _connected = true; + } } catch (AMQException e) { @@ -1285,4 +1291,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public void setProtocolVersion(ProtocolVersion protocolVersion) + { + _protocolVersion = protocolVersion; + _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index 24f5ead2d0..64dbabf222 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.URLHelper; @@ -52,6 +53,7 @@ public class AMQConnectionURL implements ConnectionURL private AMQShortString _defaultTopicExchangeName; private AMQShortString _temporaryTopicExchangeName; private AMQShortString _temporaryQueueExchangeName; + private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion(); public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -255,6 +257,15 @@ public class AMQConnectionURL implements ConnectionURL { _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)); } + if(_options.containsKey(OPTIONS_PROTOCOL_VERSION)) + { + ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION)); + if(pv != null) + { + _protocolVersion = pv; + } + } + } public String getURL() @@ -377,6 +388,11 @@ public class AMQConnectionURL implements ConnectionURL return _temporaryTopicExchangeName; } + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + public String toString() { StringBuffer sb = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 56efec4fa2..5ee3fa5407 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -52,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -387,94 +388,109 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && ((msgNumber % 1000) == 0)) + if(message instanceof AMQFrame) { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - AMQFrame frame = (AMQFrame) message; + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } - final AMQBody bodyFrame = frame.getBodyFrame(); + AMQFrame frame = (AMQFrame) message; - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + final AMQBody bodyFrame = frame.getBodyFrame(); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + switch (bodyFrame.getFrameType()) + { + case AMQMethodBody.TYPE: - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - try - { + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + try { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + catch (AMQException e) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + getStateManager().error(e); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } + + exceptionCaught(session, e); } - exceptionCaught(session, e); - } + break; - break; + case ContentHeaderBody.TYPE: - case ContentHeaderBody.TYPE: + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + case ContentBody.TYPE: - case ContentBody.TYPE: + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; + case HeartbeatBody.TYPE: - case HeartbeatBody.TYPE: + if (debug) + { + _logger.debug("Received heartbeat"); + } - if (debug) - { - _logger.debug("Received heartbeat"); - } + break; - break; + default: - default: + } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); } - - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } private static int _messagesOut; @@ -515,6 +531,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter getStateManager().attainState(s); } + public AMQState attainState(Set<AMQState> states) throws AMQException + { + return getStateManager().attainState(states); + } + + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 18c1e85eaa..b48adbdb08 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -121,7 +121,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); - _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), stateManager); _connection = connection; @@ -133,7 +133,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java index 4996f59345..d32d10542f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java @@ -24,8 +24,22 @@ package org.apache.qpid.client.state; * States used in the AMQ protocol. Used by the finite state machine to determine * valid responses. */ -public class AMQState +public enum AMQState { + + CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"), + + CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"), + + CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"), + + CONNECTION_OPEN(4, "CONNECTION_OPEN"), + + CONNECTION_CLOSING(5, "CONNECTION_CLOSING"), + + CONNECTION_CLOSED(6, "CONNECTION_CLOSED"); + + private final int _id; private final String _name; @@ -41,16 +55,6 @@ public class AMQState return "AMQState: id = " + _id + " name: " + _name; } - public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED"); - - public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED"); - - public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED"); - public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN"); - public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING"); - - public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED"); - } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index a9473df08c..b6baefe1b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -165,4 +166,41 @@ public class AMQStateManager implements AMQMethodListener { return getProtocolSession().getMethodRegistry(); } + + public AMQState attainState(Set<AMQState> stateSet) throws AMQException + { + synchronized (_stateLock) + { + final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; + long waitTime = MAXIMUM_STATE_WAIT_TIME; + + while (!stateSet.contains(_currentState) && (waitTime > 0)) + { + try + { + _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); + } + catch (InterruptedException e) + { + _logger.warn("Thread interrupted"); + } + + if (!stateSet.contains(_currentState)) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + + if (!stateSet.contains(_currentState)) + { + _logger.warn("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet); + throw new AMQException("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet); + } + return _currentState; + } + + + } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 2d91e290c4..098256c75f 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -21,6 +21,7 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import java.util.List; @@ -41,6 +42,7 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion"; String getURL(); @@ -83,4 +85,6 @@ public interface ConnectionURL AMQShortString getTemporaryQueueExchangeName(); AMQShortString getTemporaryTopicExchangeName(); + + ProtocolVersion getProtocolVersion(); } |
