From 5fb71f78117c9bc9d50be8cf1ccc1534bdadefc6 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 23 Feb 2007 11:36:44 +0000 Subject: Implementation of queue.unbind & message.get git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510912 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 10 ++- .../qpid/server/handler/MessageGetHandler.java | 13 ++-- .../qpid/server/handler/QueueUnbindHandler.java | 84 ++++++++++++++++++++++ .../org/apache/qpid/server/queue/AMQMessage.java | 9 +-- .../org/apache/qpid/server/queue/AMQQueue.java | 11 ++- .../queue/ConcurrentSelectorDeliveryManager.java | 7 +- .../apache/qpid/server/queue/DeliveryManager.java | 4 +- .../apache/qpid/server/queue/ExchangeBindings.java | 13 +++- .../apache/qpid/server/state/AMQStateManager.java | 1 + .../org/apache/qpid/protocol/RequestToken.java | 69 ++++++++++++++++++ 10 files changed, 199 insertions(+), 22 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java create mode 100644 java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3f0fb26a65..baacc01da8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,7 +22,6 @@ package org.apache.qpid.server; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.MessageOkBody; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; @@ -32,12 +31,15 @@ import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.protocol.RequestToken; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -351,6 +353,12 @@ public class AMQChannel _session.writeResponse(_channelId, msg.getRequestId(), ok); } + public void deliverGet(RequestToken request, long deliveryTag, AMQMessage msg) + { + request.respond(MessageOkBody.createMethodBody(request.getMajor(), request.getMinor())); + deliver(msg, request.getRequest().destination, deliveryTag); + } + public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag) { AMQMethodListener listener = new AMQMethodListener() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java index 7ebcd6ab38..d0fb2a43fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.MessageEmptyBody; import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.RequestToken; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -89,13 +90,11 @@ public class MessageGetHandler implements StateAwareMethodListener request = + new RequestToken(session, evt, + session.getProtocolMajorVersion(), + session.getProtocolMinorVersion()); + if(!queue.performGet(request, channel)) { session.writeResponse(evt, MessageEmptyBody.createMethodBody( session.getProtocolMajorVersion(), // AMQP major version diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java new file mode 100644 index 0000000000..2464a05e57 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import org.apache.log4j.Logger; + +public class QueueUnbindHandler implements StateAwareMethodListener +{ + private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); + + private static final QueueUnbindHandler _instance = new QueueUnbindHandler(); + + public static QueueUnbindHandler getInstance() + { + return _instance; + } + + private QueueUnbindHandler() {} + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + final QueueUnbindBody body = evt.getMethod(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + + final AMQQueue queue = queueRegistry.getQueue(body.queue); + if (queue == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist."); + } + final Exchange exch = exchangeRegistry.getExchange(body.exchange); + if (exch == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist."); + } + + queue.unbind(body.routingKey, exch);//TODO: this should take the args as well + if (_log.isInfoEnabled()) + { + _log.info("Unbinding queue " + queue + " from exchange " + exch + " with routing key " + body.routingKey); + } + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody response = QueueUnbindOkBody.createMethodBody( + session.getProtocolMajorVersion(), // AMQP major version + session.getProtocolMinorVersion()); // AMQP minor version + session.writeResponse(evt, response); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 35b779da25..711e045516 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -626,14 +626,7 @@ public class AMQMessage { throw new Error("XXX"); } - public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException - { - throw new Error("XXX"); - } - private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize) - { - throw new Error("XXX"); - } + // Robert Godfrey added these in r503604 public long getArrivalTime() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2f128a3e3e..e11edb47b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -24,6 +24,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.protocol.RequestToken; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -349,6 +351,11 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } + public void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException + { + _bindings.unbind(routingKey, exchange); + } + public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException @@ -598,9 +605,9 @@ public class AMQQueue implements Managable, Comparable } } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException + public boolean performGet(RequestToken request, AMQChannel channel) throws AMQException { - return _deliveryMgr.performGet(session, channel, acks); + return _deliveryMgr.performGet(request, channel); } public QueueRegistry getQueueRegistry() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index fcbcc2f09a..2ec8f80611 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.protocol.RequestToken; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.AMQChannel; @@ -185,8 +187,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException + public boolean performGet(RequestToken request, AMQChannel channel) throws AMQException { + final boolean acks = !request.getRequest().noAck; AMQMessage msg = getNextMessage(); if(msg == null) { @@ -223,7 +226,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); } - msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + channel.deliverGet(request, deliveryTag, msg); _totalMessageSize.addAndGet(-msg.getSize()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index c7c1643fb8..f4188c54c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.protocol.RequestToken; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -81,7 +83,7 @@ interface DeliveryManager void populatePreDeliveryQueue(Subscription subscription); - boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; + boolean performGet(RequestToken request, AMQChannel channel) throws AMQException; long getTotalMessageSize(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index 656549e025..aca50fc007 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -44,6 +44,7 @@ class ExchangeBindings { this.routingKey = routingKey; this.exchange = exchange; + if(exchange == null) throw new NullPointerException("Can't create binding for null exchange"); } void unbind(AMQQueue queue) throws AMQException @@ -70,7 +71,8 @@ class ExchangeBindings { if (!(o instanceof ExchangeBinding)) return false; ExchangeBinding eb = (ExchangeBinding) o; - return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey); + return exchange.equals(eb.exchange) + && (routingKey == null ? eb.routingKey == null : routingKey.equals(eb.routingKey)); } } @@ -93,6 +95,15 @@ class ExchangeBindings _bindings.add(new ExchangeBinding(routingKey, exchange)); } + void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException + { + ExchangeBinding b = new ExchangeBinding(routingKey, exchange); + if (_bindings.remove(b)) + { + b.unbind(_queue); + } + } + /** * Deregisters this queue from any exchange it has been bound to */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index b80ed8b408..ca50b5859c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -130,6 +130,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(MessageResumeBody.class, MessageResumeHandler.getInstance()); frame2handlerMap.put(MessageTransferBody.class, MessageTransferHandler.getInstance()); frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); + frame2handlerMap.put(QueueUnbindBody.class, QueueUnbindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance()); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java b/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java new file mode 100644 index 0000000000..ee274f9f66 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * Allows the context for a request to be passed around, simplying the + * task of responding to it. + */ +public class RequestToken +{ + private final AMQProtocolWriter _session; + private final AMQMethodEvent _request; + private final byte _major; + private final byte _minor; + + public RequestToken(AMQProtocolWriter session, AMQMethodEvent request, byte major, byte minor) + { + _session = session; + _request = request; + _major = major; + _minor = minor; + } + + /** + * Sends a response to the request this token represents. + */ + public void respond(AMQMethodBody response) + { + _session.writeResponse(_request.getChannelId(), _request.getRequestId(), response); + } + + /** + * Provides access to the original request + */ + public M getRequest() + { + return _request.getMethod(); + } + + public byte getMajor() + { + return _major; + } + + public byte getMinor() + { + return _minor; + } +} -- cgit v1.2.1