diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-09 19:47:02 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-09 19:47:02 +0000 |
| commit | 62c921bce20ab6d193602e31ccf75074efdf495d (patch) | |
| tree | 0d7087d6d7e05b5a56918b13ccb4e8d5ebbdb245 /java | |
| parent | 40fc953252097301bdc6cb8eff8b65b259e68e06 (diff) | |
| download | qpid-python-62c921bce20ab6d193602e31ccf75074efdf495d.tar.gz | |
New broker Messafe class handelrs added, removed old Basic* handlers; New framing Request and Response classes added
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@494541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
26 files changed, 768 insertions, 473 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java deleted file mode 100644 index b2b7a21296..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.AMQChannel; - -public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> -{ - private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler(); - - public static BasicAckMethodHandler getInstance() - { - return _instance; - } - - private BasicAckMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicAckBody> evt) throws AMQException - { - BasicAckBody body = evt.getMethod(); - final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.deliveryTag, body.multiple); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java deleted file mode 100644 index 041d8b3e6d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.AMQException; - -public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> -{ - private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler(); - - public static BasicCancelMethodHandler getInstance() - { - return _instance; - } - - private BasicCancelMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicCancelBody> evt) throws AMQException - { - final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - final BasicCancelBody body = evt.getMethod(); - channel.unsubscribeConsumer(protocolSession, body.consumerTag); - if(!body.nowait) - { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - body.consumerTag); // consumerTag - protocolSession.writeFrame(responseFrame); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java deleted file mode 100644 index 1e727f88b3..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidSelectorException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.ConsumerTagNotUniqueException; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.log4j.Logger; - -public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody> -{ - private static final Logger _log = Logger.getLogger(BasicConsumeMethodHandler.class); - - private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler(); - - public static BasicConsumeMethodHandler getInstance() - { - return _instance; - } - - private BasicConsumeMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<BasicConsumeBody> evt) throws AMQException - { - BasicConsumeBody body = evt.getMethod(); - final int channelId = evt.getChannelId(); - - AMQChannel channel = session.getChannel(channelId); - if (channel == null) - { - _log.error("Channel " + channelId + " not found"); - // TODO: either alert or error that the - } - else - { - AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); - - if (queue == null) - { - _log.info("No queue for '" + body.queue + "'"); - } - try - { - String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal); - if (!body.nowait) - { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - consumerTag)); // consumerTag - } - - //now allow queue to start async processing of any backlog of messages - queue.deliverAsync(); - } - catch (AMQInvalidSelectorException ise) - { - _log.info("Closing connection due to invalid selector"); - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)0, (byte)9), // classId - BasicConsumeBody.getMethod((byte)0, (byte)9), // methodId - AMQConstant.INVALID_SELECTOR.getCode(), // replyCode - ise.getMessage())); // replyText - } - catch (ConsumerTagNotUniqueException e) - { - String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)0, (byte)9), // classId - BasicConsumeBody.getMethod((byte)0, (byte)9), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText - } - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java deleted file mode 100644 index 5003ce4dad..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; - -public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> -{ - private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); - - public static BasicPublishMethodHandler getInstance() - { - return _instance; - } - - private BasicPublishMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicPublishBody> evt) throws AMQException - { - final BasicPublishBody body = evt.getMethod(); - - // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - if (body.exchange == null) - { - body.exchange = "amq.direct"; - } - Exchange e = exchangeRegistry.getExchange(body.exchange); - // if the exchange does not exist we raise a channel exception - if (e == null) - { - protocolSession.closeChannel(evt.getChannelId()); - // TODO: modify code gen to make getClazz and getMethod public methods rather than protected - // then we can remove the hardcoded 0,0 - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - ChannelCloseBody.getClazz((byte)0, (byte)9), // classId - ChannelCloseBody.getMethod((byte)0, (byte)9), // methodId - 500, // replyCode - "Unknown exchange name"); // replyText - protocolSession.writeFrame(cf); - } - else - { - // The partially populated BasicDeliver frame plus the received route body - // is stored in the channel. Once the final body frame has been received - // it is routed to the exchange. - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - channel.setPublishFrame(body, protocolSession); - } - } -} - diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java deleted file mode 100644 index 3d06121e6b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.AMQException; - -public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> -{ - private static final BasicQosHandler _instance = new BasicQosHandler(); - - public static BasicQosHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException - { - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)0, (byte)9))); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java deleted file mode 100644 index 85e802d10d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.handler; - -import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody> -{ - private static final Logger _logger = Logger.getLogger(BasicRecoverMethodHandler.class); - - private static final BasicRecoverMethodHandler _instance = new BasicRecoverMethodHandler(); - - public static BasicRecoverMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicRecoverBody> evt) throws AMQException - { - _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - if (channel == null) - { - throw new AMQException("Unknown channel " + evt.getChannelId()); - } - channel.resend(protocolSession); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java new file mode 100644 index 0000000000..91b5493f32 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody> +{ + private static MessageAppendHandler _instance = new MessageAppendHandler(); + + public static MessageAppendHandler getInstance() + { + return _instance; + } + + private MessageAppendHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageAppendBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java new file mode 100644 index 0000000000..a84cbfb88e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody> +{ + private static MessageCancelHandler _instance = new MessageCancelHandler(); + + public static MessageCancelHandler getInstance() + { + return _instance; + } + + private MessageCancelHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageCancelBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java new file mode 100644 index 0000000000..9dd9a6b18e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody> +{ + private static MessageCheckpointHandler _instance = new MessageCheckpointHandler(); + + public static MessageCheckpointHandler getInstance() + { + return _instance; + } + + private MessageCheckpointHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageCheckpointBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java new file mode 100644 index 0000000000..5e21c1ee6c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody> +{ + private static MessageCloseHandler _instance = new MessageCloseHandler(); + + public static MessageCloseHandler getInstance() + { + return _instance; + } + + private MessageCloseHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageCloseBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java new file mode 100644 index 0000000000..a2c5662703 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> +{ + private static MessageConsumeHandler _instance = new MessageConsumeHandler(); + + public static MessageConsumeHandler getInstance() + { + return _instance; + } + + private MessageConsumeHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageConsumeBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java new file mode 100644 index 0000000000..37d39a517a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody> +{ + private static MessageEmptyHandler _instance = new MessageEmptyHandler(); + + public static MessageEmptyHandler getInstance() + { + return _instance; + } + + private MessageEmptyHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageEmptyBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java new file mode 100644 index 0000000000..15d769e295 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody> +{ + private static MessageGetHandler _instance = new MessageGetHandler(); + + public static MessageGetHandler getInstance() + { + return _instance; + } + + private MessageGetHandler() {} + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageGetBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java new file mode 100644 index 0000000000..7ac075c8d4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody> +{ + private static MessageOffsetHandler _instance = new MessageOffsetHandler(); + + public static MessageOffsetHandler getInstance() + { + return _instance; + } + + private MessageOffsetHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOffsetBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java new file mode 100644 index 0000000000..4b2a1543cd --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody> +{ + private static MessageOkHandler _instance = new MessageOkHandler(); + + public static MessageOkHandler getInstance() + { + return _instance; + } + + private MessageOkHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOkBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java new file mode 100644 index 0000000000..79679713ba --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody> +{ + private static MessageOpenHandler _instance = new MessageOpenHandler(); + + public static MessageOpenHandler getInstance() + { + return _instance; + } + + private MessageOpenHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOpenBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java new file mode 100644 index 0000000000..baa01df602 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody> +{ + private static MessageQosHandler _instance = new MessageQosHandler(); + + public static MessageQosHandler getInstance() + { + return _instance; + } + + private MessageQosHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageQosBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java new file mode 100644 index 0000000000..e178c60b27 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody> +{ + private static MessageRecoverHandler _instance = new MessageRecoverHandler(); + + public static MessageRecoverHandler getInstance() + { + return _instance; + } + + private MessageRecoverHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageRecoverBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java new file mode 100644 index 0000000000..401b399fa0 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody> +{ + private static MessageRejectHandler _instance = new MessageRejectHandler(); + + public static MessageRejectHandler getInstance() + { + return _instance; + } + + private MessageRejectHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageRejectBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java new file mode 100644 index 0000000000..429514cc5b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody> +{ + private static MessageResumeHandler _instance = new MessageResumeHandler(); + + public static MessageResumeHandler getInstance() + { + return _instance; + } + + private MessageResumeHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageResumeBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java new file mode 100644 index 0000000000..18027fdc2b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody> +{ + private static MessageTransferHandler _instance = new MessageTransferHandler(); + + public static MessageTransferHandler getInstance() + { + return _instance; + } + + private MessageTransferHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageTransferBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 7bb8109449..525978e348 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -195,7 +195,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { AMQFrame frame = (AMQFrame) message; - if (frame.bodyFrame instanceof AMQMethodBody) + if (frame.bodyFrame instanceof AMQRequest) + { + requestFrameReceived(frame); + } + else if (frame.bodyFrame instanceof AMQResponse) + { + responseFrameReceived(frame); + } + else if (frame.bodyFrame instanceof AMQMethodBody) { methodFrameReceived(frame); } @@ -215,6 +223,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } } + + private void requestFrameReceived(AMQFrame frame) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + frame); + } + } + + private void responseFrameReceived(AMQFrame frame) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Response frame received: " + frame); + } + } private void methodFrameReceived(AMQFrame frame) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 4e9deeb8db..509d7761dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -110,12 +110,21 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance()); frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance()); frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance()); - frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); - frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); - frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); - frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance()); - frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); - frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance()); + frame2handlerMap.put(MessageAppendBody.class, MessageAppendHandler.getInstance()); + frame2handlerMap.put(MessageCancelBody.class, MessageCancelHandler.getInstance()); + frame2handlerMap.put(MessageCheckpointBody.class, MessageCheckpointHandler.getInstance()); + frame2handlerMap.put(MessageCloseBody.class, MessageCloseHandler.getInstance()); + frame2handlerMap.put(MessageConsumeBody.class, MessageConsumeHandler.getInstance()); + frame2handlerMap.put(MessageEmptyBody.class, MessageEmptyHandler.getInstance()); + frame2handlerMap.put(MessageGetBody.class, MessageGetHandler.getInstance()); + frame2handlerMap.put(MessageOffsetBody.class, MessageOffsetHandler.getInstance()); + frame2handlerMap.put(MessageOkBody.class, MessageOkHandler.getInstance()); + frame2handlerMap.put(MessageOpenBody.class, MessageOpenHandler.getInstance()); + frame2handlerMap.put(MessageQosBody.class, MessageQosHandler.getInstance()); + frame2handlerMap.put(MessageRecoverBody.class, MessageRecoverHandler.getInstance()); + frame2handlerMap.put(MessageRejectBody.class, MessageRejectHandler.getInstance()); + frame2handlerMap.put(MessageResumeBody.class, MessageResumeHandler.getInstance()); + frame2handlerMap.put(MessageTransferBody.class, MessageTransferHandler.getInstance()); frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml index ed56e87703..711efc4481 100644 --- a/java/common/protocol-version.xml +++ b/java/common/protocol-version.xml @@ -27,8 +27,8 @@ <property name="generated.dir" location="${generated.path}/${generated.package}" /> <property name="generated.timestamp" location="${generated.dir}/timestamp" /> <property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" /> - <property name="xml.spec.deps" value="amqp.0-9.xml cluster.0-9.xml amqp-nogen.0-9.xml" /> - <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/cluster.0-9.xml ${xml.spec.dir}/amqp-nogen.0-9.xml" /> + <property name="xml.spec.deps" value="amqp.0-9.xml cluster.0-9.xml exchange-bound.0-9.xml amqp-nogen.0-9.xml" /> + <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/cluster.0-9.xml ${xml.spec.dir}/exchange-bound.0-9.xml ${xml.spec.dir}/amqp-nogen.0-9.xml" /> <target name="generate" depends="compile_generator,check_generate_deps" unless="generation.notRequired"> <mkdir dir="${generated.dir}"/> diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequest.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 821b5d177d..a60db0fa3f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequest.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class AMQRequest extends AMQBody +public class AMQRequestBody extends AMQBody { public static final byte TYPE = (byte)AmqpConstants.frameRequestAsInt(); @@ -33,7 +33,7 @@ public class AMQRequest extends AMQBody // Constructor - public AMQRequest() {} + public AMQRequestBody() {} // Field methods @@ -72,7 +72,7 @@ public class AMQRequest extends AMQBody public static AMQFrame createAMQFrame(int channelId, long requestId, long responseMark, AMQMethodBody methodPayload) { - AMQResponse responseFrame = new AMQResponse(); + AMQResponseBody responseFrame = new AMQResponseBody(); responseFrame.requestId = requestId; responseFrame.responseMark = responseMark; responseFrame.methodPayload = methodPayload; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponse.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index bf4bec8cda..c86bcbe2ad 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponse.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -22,36 +22,20 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class AMQResponse extends AMQBody +public class AMQResponseBody extends AMQRequestBody { public static final byte TYPE = (byte)AmqpConstants.frameResponseAsInt(); // Fields declared in specification - public long requestId; - public long responseMark; public int batchOffset; - public AMQMethodBody methodPayload; // Constructor - public AMQResponse() {} + public AMQResponseBody() {} // Field methods - public long getRequestId() { return requestId; } - public long getResponseMark() { return responseMark; } public int getBatchOffset() { return batchOffset; } - public AMQMethodBody getMethodPayload() { return methodPayload; } - - protected byte getFrameType() - { - return TYPE; - } - - protected int getSize() - { - return 8 + 8 + 4 + methodPayload.getBodySize(); - } - + protected void writePayload(ByteBuffer buffer) { EncodingUtils.writeLong(buffer, requestId); @@ -72,7 +56,7 @@ public class AMQResponse extends AMQBody public static AMQFrame createAMQFrame(int channelId, long requestId, long responseMark, int batchOffset, AMQMethodBody methodPayload) { - AMQResponse responseFrame = new AMQResponse(); + AMQResponseBody responseFrame = new AMQResponseBody(); responseFrame.requestId = requestId; responseFrame.responseMark = responseMark; responseFrame.batchOffset = batchOffset; |
