diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-16 22:10:07 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-16 22:10:07 +0000 |
| commit | 554b5d8aadb7a4ab8d734d284d4b0bf66b381b1c (patch) | |
| tree | 24152a3dd3fcbc159f1ac61c5003831d69ce61ac /java/client | |
| parent | bc84e62cc549ac2d751a45d61a867354c84c60d6 (diff) | |
| download | qpid-python-554b5d8aadb7a4ab8d734d284d4b0bf66b381b1c.tar.gz | |
Created wiring to client RequestManagers and ResponseManagers, refactored all frame write code to use new write mechanisms.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
11 files changed, 240 insertions, 150 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 8eba0160a8..0b5c1fbaca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -59,6 +59,7 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.MessageOkBody; @@ -481,9 +482,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - (byte)0, (byte)9, // AMQP version (major, minor) + _protocolHandler.syncWrite(channelId, + ChannelOpenBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) null), // outOfBand ChannelOpenOkBody.class); @@ -491,9 +491,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - MessageQosBody.createAMQFrame(channelId, - (byte)0, (byte)9, // AMQP version (major, minor) + _protocolHandler.syncWrite(channelId, + MessageQosBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) false, // global prefetchHigh, // prefetchCount 0), // prefetchSize @@ -508,7 +507,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)0, (byte)9), TxSelectOkBody.class); + _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody((byte)0, (byte)9), TxSelectOkBody.class); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b1f19bc191..f1a9823e38 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,6 +68,7 @@ import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; @@ -523,7 +524,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)0, (byte)9), TxCommitOkBody.class); + _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody((byte)0, (byte)9), TxCommitOkBody.class); } catch (AMQException e) { @@ -541,8 +542,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte)0, (byte)9), TxRollbackOkBody.class); + _connection.getProtocolHandler().syncWrite(_channelId, + TxRollbackBody.createMethodBody((byte)0, (byte)9), TxRollbackOkBody.class); } catch (AMQException e) { @@ -568,13 +569,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), + final AMQMethodBody methodBody = ChannelCloseBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode "JMS client closing channel"); // replyText - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); + _connection.getProtocolHandler().syncWrite(getChannelId(), methodBody, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -766,9 +767,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().writeFrame(MessageRecoverBody.createAMQFrame(_channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - false)); // requeue + _connection.getProtocolHandler().writeRequest(_channelId, + MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -1103,7 +1104,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) null, // arguments false, // autoDelete @@ -1114,7 +1115,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // passive 0, // ticket type); // type - _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class); } private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) @@ -1127,7 +1128,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) null, // arguments false, // autoDelete @@ -1138,7 +1139,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // passive 0, // ticket type); // type - protocolHandler.writeFrame(exchangeDeclare); + protocolHandler.writeRequest(_channelId, methodBody); } /** @@ -1162,7 +1163,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = QueueDeclareBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) null, // arguments amqd.isAutoDelete(), // autoDelete @@ -1173,7 +1174,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.getQueueName(), // queue 0); // ticket - protocolHandler.writeFrame(queueDeclare); + protocolHandler.writeRequest(_channelId, methodBody); return amqd.getQueueName(); } @@ -1182,7 +1183,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = QueueBindBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) ft, // arguments amqd.getExchangeName(), // exchange @@ -1191,7 +1192,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.getRoutingKey(), // routingKey 0); // ticket - protocolHandler.writeFrame(queueBind); + protocolHandler.writeRequest(_channelId, methodBody); } /** @@ -1241,7 +1242,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi queueName, // queue 0); // ticket */ - AMQFrame jmsConsume = MessageConsumeBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = MessageConsumeBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) tag, // consumerTag consumer.isExclusive(), // exclusive @@ -1260,7 +1261,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); }*/ - protocolHandler.syncWrite(jmsConsume,MessageOkBody.class); + protocolHandler.syncWrite(_channelId, methodBody, MessageOkBody.class); } catch (AMQException e) { @@ -1432,14 +1433,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = QueueDeleteBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) false, // ifEmpty false, // ifUnused true, // nowait queueName, // queue 0); // ticket - _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + _connection.getProtocolHandler().syncWrite(_channelId, methodBody, QueueDeleteOkBody.class); } catch (AMQException e) { @@ -1527,7 +1528,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = ExchangeBoundBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange queueName, // queue @@ -1535,7 +1536,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQMethodEvent response = null; try { - response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + response = _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeBoundOkBody.class); } catch (AMQException e) { @@ -1586,19 +1587,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) + public void acknowledgeMessage(long requestId, boolean multiple) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = MessageOkBody.createAMQFrame(_channelId,(byte)0, (byte)9); // AMQP version (major, minor) + final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); // AMQP version (major, minor) //deliveryTag, // deliveryTag //multiple); // multiple if (_logger.isDebugEnabled()) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + _logger.debug("Sending ack for request ID " + requestId + " on channel " + _channelId); } - _connection.getProtocolHandler().writeFrame(ackFrame); + _connection.getProtocolHandler().writeResponse(_channelId, requestId, methodBody); } public int getDefaultPrefetch() @@ -1755,10 +1756,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = ChannelFlowBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) false); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + _connection.getProtocolHandler().writeRequest(_channelId, methodBody); } private void unsuspendChannel() @@ -1767,10 +1768,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + AMQMethodBody methodBody = ChannelFlowBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) true); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + _connection.getProtocolHandler().writeRequest(_channelId, methodBody); } public void confirmConsumerCancelled(String consumerTag) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b13b22316a..55cb28be42 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.framing.MessageOkBody; @@ -452,13 +453,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = MessageCancelBody.createAMQFrame(_channelId, + final AMQMethodBody cancelBody = MessageCancelBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) _consumerTag); // consumerTag try { - _protocolHandler.syncWrite(cancelFrame, MessageOkBody.class); + _protocolHandler.syncWrite(_channelId, cancelBody, MessageOkBody.class); } catch (AMQException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 3d1cd8ee60..0125cc3854 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -137,7 +137,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, + AMQPMethodBody methodBody = ExchangeDeclareBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) null, // arguments false, // autoDelete @@ -148,7 +148,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j false, // passive 0, // ticket destination.getExchangeClass()); // type - _protocolHandler.writeFrame(declare); + _protocolHandler.writeRequest(_channelId, methodBody); } public void setDisableMessageID(boolean b) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index eb24f1fa74..0007ee8b5f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; @@ -61,8 +62,8 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(frame); + AMQMethodBody methodBody = ChannelCloseOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) { _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 36e9c947f3..25acd2cb6b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -63,7 +63,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9)); + protocolSession.writeResponse(0, evt.getRequestId(), ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9)); if (errorCode != 200) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index 87a8bbd529..94cf8b16c6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -58,10 +59,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) + AMQMethodBody methodBody = ConnectionSecureOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) response); // response - protocolSession.writeFrame(responseFrame); + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } catch (SaslException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index a60c298bd2..e58499f116 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -128,12 +128,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - clientProperties, // clientProperties - selectedLocale, // locale - mechanism, // mechanism - saslResponse)); // response + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), + ConnectionStartOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) + clientProperties, // clientProperties + selectedLocale, // locale + mechanism, // mechanism + saslResponse)); // response } catch (UnsupportedEncodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index e4e74be684..f2524cfbc9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionTuneMethodHandler implements StateAwareMethodListener @@ -65,29 +66,29 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener protocolSession.setConnectionTuneParameters(params); stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params)); - protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true)); + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), createTuneOkMethodBody(params)); + protocolSession.writeRequest(evt.getChannelId(), + createConnectionOpenMethodBody(protocolSession.getAMQConnection().getVirtualHost(), null, true), + protocolSession.getStateManager()); } - protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - return ConnectionOpenBody.createAMQFrame(channel, - (byte)0, (byte)9, // AMQP version (major, minor) + return ConnectionOpenBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) capabilities, // capabilities insist, // insist path); // virtualHost } - protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params) + protected AMQMethodBody createTuneOkMethodBody(ConnectionTuneParameters params) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - return ConnectionTuneOkBody.createAMQFrame(channel, - (byte)0, (byte)9, // AMQP version (major, minor) + return ConnectionTuneOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) params.getChannelMax(), // channelMax params.getFrameMax(), // frameMax params.getHeartbeat()); // heartbeat diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 12ace3c705..735af586c7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -40,10 +40,10 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodListener; @@ -308,48 +308,56 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody); - if (frame.bodyFrame instanceof AMQMethodBody) + if (frame.bodyFrame instanceof AMQRequestBody) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Method frame received: " + frame); - } - - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame); - try - { - boolean wasAnyoneInterested = false; - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); - } - } - catch (AMQException e) - { - it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - exceptionCaught(session, e); - } + _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame); } - else if (frame.bodyFrame instanceof ContentHeaderBody) + else if (frame.bodyFrame instanceof AMQResponseBody) { - _protocolSession.messageContentHeaderReceived(frame.channel, - (ContentHeaderBody) frame.bodyFrame); - } - else if (frame.bodyFrame instanceof ContentBody) - { - _protocolSession.messageContentBodyReceived(frame.channel, - (ContentBody) frame.bodyFrame); + _protocolSession.messageResponseBodyReceived(frame.channel, (AMQResponseBody)frame.bodyFrame); } +// if (frame.bodyFrame instanceof AMQMethodBody) +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Method frame received: " + frame); +// } +// +// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame); +// try +// { +// boolean wasAnyoneInterested = false; +// while (it.hasNext()) +// { +// final AMQMethodListener listener = (AMQMethodListener) it.next(); +// wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; +// } +// if (!wasAnyoneInterested) +// { +// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); +// } +// } +// catch (AMQException e) +// { +// it = _frameListeners.iterator(); +// while (it.hasNext()) +// { +// final AMQMethodListener listener = (AMQMethodListener) it.next(); +// listener.error(e); +// } +// exceptionCaught(session, e); +// } +// } +// else if (frame.bodyFrame instanceof ContentHeaderBody) +// { +// _protocolSession.messageContentHeaderReceived(frame.channel, +// (ContentHeaderBody) frame.bodyFrame); +// } +// else if (frame.bodyFrame instanceof ContentBody) +// { +// _protocolSession.messageContentBodyReceived(frame.channel, +// (ContentBody) frame.bodyFrame); +// } else if (frame.bodyFrame instanceof HeartbeatBody) { _logger.debug("Received heartbeat"); @@ -402,23 +410,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolSession.writeFrame(frame, wait); } + + public long writeRequest(int channelNum, AMQMethodBody methodBody) + { + return _protocolSession.writeRequest(channelNum, methodBody, _protocolSession.getStateManager()); + } + + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) + { + _protocolSession.writeResponse(channelNum, requestId, methodBody); + } /** * Convenience method that writes a frame to the protocol session and waits for * a particular response. Equivalent to calling getProtocolSession().write() then * waiting for the response. * - * @param frame + * @param methodBody * @param listener the blocking listener. Note the calling thread will block. */ - private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, + private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody, BlockingMethodFrameListener listener) throws AMQException { try { _frameListeners.add(listener); - _protocolSession.writeFrame(frame); + _protocolSession.writeRequest(channelNum, methodBody, listener); AMQMethodEvent e = listener.blockForFrame(); return e; @@ -436,10 +454,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException + public AMQMethodEvent syncWrite(int channelNum, AMQMethodBody methodBody, Class responseClass) throws AMQException { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.channel, responseClass)); + return writeCommandFrameAndWaitForReply(channelNum, methodBody, + new SpecificMethodFrameListener(channelNum, responseClass)); } /** @@ -477,13 +495,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, + AMQMethodBody methodBody = ConnectionCloseBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode "JMS client is closing the connection."); // replyText - syncWrite(frame, ConnectionCloseOkBody.class); + + syncWrite(0, methodBody, ConnectionCloseOkBody.class); _protocolSession.closeProtocolSession(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 440eef54a6..d1c86dde45 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -33,10 +33,15 @@ import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.RequestManager; +import org.apache.qpid.framing.ResponseManager; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.commons.lang.StringUtils; @@ -91,6 +96,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis */ protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); + protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); + /** * Counter to ensure unique queue names */ @@ -234,52 +242,76 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { _channelId2UnprocessedMsgMap.put(message.channelId, message); } - - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) - throws AMQException + + public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); - if (msg == null) - { - throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); - } - if (msg.contentHeader != null) - { - throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); - } - msg.contentHeader = contentHeader; - if (contentHeader.bodySize == 0) + if (_logger.isDebugEnabled()) { - deliverMessageToAMQSession(channelId, msg); + _logger.debug("Request frame received: " + requestBody); } + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId); + if (responseManager == null) + throw new AMQException("Unable to find ResponseManager for channel " + channelId); + responseManager.requestReceived(requestBody); } - - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException + + public void messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); - if (msg == null) - { - throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); - } - if (msg.contentHeader == null) - { - _channelId2UnprocessedMsgMap.remove(channelId); - throw new AMQException("Error: received content body without having received a ContentHeader frame first"); - } - try - { - msg.receiveBody(contentBody); - } - catch (UnexpectedBodyReceivedException e) + if (_logger.isDebugEnabled()) { - _channelId2UnprocessedMsgMap.remove(channelId); - throw e; + _logger.debug("Response frame received: " + responseBody); } - if (msg.isAllBodyDataReceived()) - { - deliverMessageToAMQSession(channelId, msg); - } - } + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId); + if (requestManager == null) + throw new AMQException("Unable to find RequestManager for channel " + channelId); + requestManager.responseReceived(responseBody); + } + +// public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) +// throws AMQException +// { +// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); +// if (msg == null) +// { +// throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); +// } +// if (msg.contentHeader != null) +// { +// throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); +// } +// msg.contentHeader = contentHeader; +// if (contentHeader.bodySize == 0) +// { +// deliverMessageToAMQSession(channelId, msg); +// } +// } +// +// public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException +// { +// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); +// if (msg == null) +// { +// throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); +// } +// if (msg.contentHeader == null) +// { +// _channelId2UnprocessedMsgMap.remove(channelId); +// throw new AMQException("Error: received content body without having received a ContentHeader frame first"); +// } +// try +// { +// msg.receiveBody(contentBody); +// } +// catch (UnexpectedBodyReceivedException e) +// { +// _channelId2UnprocessedMsgMap.remove(channelId); +// throw e; +// } +// if (msg.isAllBodyDataReceived()) +// { +// deliverMessageToAMQSession(channelId, msg); +// } +// } /** * Deliver a message to the appropriate session, removing the unprocessed message @@ -293,6 +325,31 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis session.messageReceived(msg); _channelId2UnprocessedMsgMap.remove(channelId); } + + public long writeRequest(int channelNum, AMQMethodBody methodBody, + AMQMethodListener methodListener) + throws AMQException + { + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelNum); + if (requestManager == null) + throw new AMQException("Unable to find RequestManager for channel " + channelNum); + requestManager.sendRequest(methodBody, methodListener); + } + + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) + throws AMQException + { + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelNum); + if (responseManager == null) + throw new AMQException("Unable to find ResponseManager for channel " + channelNum); + responseManager.sendResponse(requestId, methodBody); + } + + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) + throws AMQException + { + writeResponse(evt.getChannelId(), evt.getRequestId(), response); + } /** * Convenience method that writes a frame to the protocol session. Equivalent @@ -330,6 +387,17 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis } _logger.debug("Add session with channel id " + channelId); _channelId2SessionMap.put(channelId, session); + + // Add request and response handlers, one per channel, if they do not already exist + if (_channelId2RequestMgrMap.get(channelId) == null) + { + _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this)); + } + if (_channelId2ResponseMgrMap.get(channelId) == null) + { + + _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this)); + } } public void removeSessionByChannel(int channelId) |
