diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
commit | d964eae817b538c532996af0b41993d128fa5a5c (patch) | |
tree | b0f9a56bc8a7691bd4cf009cbc83cf0fc8aa3ffc /java/broker/src/main/java | |
parent | 757d86d81e811f105f72fdfce5bc18d83aaa08d4 (diff) | |
download | qpid-python-d964eae817b538c532996af0b41993d128fa5a5c.tar.gz |
QPID-832 : Fix eol-style
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651325 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java')
19 files changed, 2622 insertions, 2622 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index e7c887f306..f1b383eac9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,240 +1,240 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import java.util.List;
-import java.util.Map;
-import java.util.ArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-public class FanoutExchange extends AbstractExchange
-{
- private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
-
- /**
- * Maps from queue name to queue instances
- */
- private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>();
-
- /**
- * MBean class implementing the management interfaces.
- */
- @MBeanDescription("Management Bean for Fanout Exchange")
- private final class FanoutExchangeMBean extends ExchangeMBean
- {
- @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
- {
- super();
- _exchangeType = "fanout";
- init();
- }
-
- public TabularData bindings() throws OpenDataException
- {
-
- _bindingList = new TabularDataSupport(_bindinglistDataType);
-
- for (AMQQueue queue : _queues)
- {
- String queueName = queue.getName().toString();
-
- Object[] bindingItemValues = {queueName, new String[]{queueName}};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
- _bindingList.put(bindingData);
- }
-
- return _bindingList;
- }
-
- public void createNewBinding(String queueName, String binding) throws JMException
- {
- AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (queue == null)
- {
- throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
- }
-
- try
- {
- queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex);
- }
- }
-
- } // End of MBean class
-
- protected ExchangeMBean createMBean() throws AMQException
- {
- try
- {
- return new FanoutExchange.FanoutExchangeMBean();
- }
- catch (JMException ex)
- {
- _logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
- }
- }
-
- public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
- {
-
- public AMQShortString getName()
- {
- return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
- }
-
- public Class<FanoutExchange> getExchangeClass()
- {
- return FanoutExchange.class;
- }
-
- public FanoutExchange newInstance(VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
- {
- FanoutExchange exch = new FanoutExchange();
- exch.initialise(host, name, durable, ticket, autoDelete);
- return exch;
- }
-
- public AMQShortString getDefaultExchangeName()
- {
- return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
- }
- };
-
- public Map<AMQShortString, List<AMQQueue>> getBindings()
- {
- return null;
- }
-
- public AMQShortString getType()
- {
- return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
- }
-
- public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- assert queue != null;
-
- if (_queues.contains(queue))
- {
- _logger.debug("Queue " + queue + " is already registered");
- }
- else
- {
- _queues.add(queue);
- _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
- }
- }
-
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- assert queue != null;
-
- if (!_queues.remove(queue))
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
- }
- }
-
- public void route(AMQMessage payload) throws AMQException
- {
- final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
- final AMQShortString routingKey = publishInfo.getRoutingKey();
- if ((_queues == null) || _queues.isEmpty())
- {
- String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory() || publishInfo.isImmediate())
- {
- throw new NoRouteException(msg, payload);
- }
- else
- {
- _logger.warn(msg);
- }
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + _queues);
- }
-
- payload.enqueue(new ArrayList(_queues));
-
- }
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey, queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return _queues.contains(queue);
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
-
- return (_queues != null) && !_queues.isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
-
- return _queues.contains(queue);
- }
-
- public boolean hasBindings()
- {
- return !_queues.isEmpty();
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class FanoutExchange extends AbstractExchange +{ + private static final Logger _logger = Logger.getLogger(FanoutExchange.class); + + /** + * Maps from queue name to queue instances + */ + private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>(); + + /** + * MBean class implementing the management interfaces. + */ + @MBeanDescription("Management Bean for Fanout Exchange") + private final class FanoutExchangeMBean extends ExchangeMBean + { + @MBeanConstructor("Creates an MBean for AMQ fanout exchange") + public FanoutExchangeMBean() throws JMException + { + super(); + _exchangeType = "fanout"; + init(); + } + + public TabularData bindings() throws OpenDataException + { + + _bindingList = new TabularDataSupport(_bindinglistDataType); + + for (AMQQueue queue : _queues) + { + String queueName = queue.getName().toString(); + + Object[] bindingItemValues = {queueName, new String[]{queueName}}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + queue.bind(new AMQShortString(binding), null, FanoutExchange.this); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + } // End of MBean class + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new FanoutExchange.FanoutExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + } + } + + public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>() + { + + public AMQShortString getName() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public Class<FanoutExchange> getExchangeClass() + { + return FanoutExchange.class; + } + + public FanoutExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + int ticket, + boolean autoDelete) throws AMQException + { + FanoutExchange exch = new FanoutExchange(); + exch.initialise(host, name, durable, ticket, autoDelete); + return exch; + } + + public AMQShortString getDefaultExchangeName() + { + return ExchangeDefaults.FANOUT_EXCHANGE_NAME; + } + }; + + public Map<AMQShortString, List<AMQQueue>> getBindings() + { + return null; + } + + public AMQShortString getType() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (_queues.contains(queue)) + { + _logger.debug("Queue " + queue + " is already registered"); + } + else + { + _queues.add(queue); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + } + } + + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (!_queues.remove(queue)) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". "); + } + } + + public void route(AMQMessage payload) throws AMQException + { + final MessagePublishInfo publishInfo = payload.getMessagePublishInfo(); + final AMQShortString routingKey = publishInfo.getRoutingKey(); + if ((_queues == null) || _queues.isEmpty()) + { + String msg = "No queues bound to " + this; + if (publishInfo.isMandatory() || publishInfo.isImmediate()) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Publishing message to queue " + _queues); + } + + payload.enqueue(new ArrayList(_queues)); + + } + } + + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return isBound(routingKey, queue); + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return _queues.contains(queue); + } + + public boolean isBound(AMQShortString routingKey) + { + + return (_queues != null) && !_queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) + { + + return _queues.contains(queue); + } + + public boolean hasBindings() + { + return !_queues.isEmpty(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index dd712a404c..133c97a146 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -1,65 +1,65 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.AMQException;
-
-/**
- * @author Apache Software Foundation
- *
- *
- */
-public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
-{
- private static final AccessRequestHandler _instance = new AccessRequestHandler();
-
-
- public static AccessRequestHandler getInstance()
- {
- return _instance;
- }
-
- private AccessRequestHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- MethodRegistry methodRegistry = session.getMethodRegistry();
-
- // We don't implement access control class, but to keep clients happy that expect it
- // always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
-
- session.writeFrame(response.generateFrame(channelId));
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.AMQException; + +/** + * @author Apache Software Foundation + * + * + */ +public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody> +{ + private static final AccessRequestHandler _instance = new AccessRequestHandler(); + + + public static AccessRequestHandler getInstance() + { + return _instance; + } + + private AccessRequestHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + MethodRegistry methodRegistry = session.getMethodRegistry(); + + // We don't implement access control class, but to keep clients happy that expect it + // always use the "0" ticket. + AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); + + session.writeFrame(response.generateFrame(channelId)); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index f8f9127809..3731116cba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -1,101 +1,101 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicGetEmptyBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.Permission;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
-{
- private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
-
- private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
-
- public static BasicGetMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicGetMethodHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
-
-
- VirtualHost vHost = session.getVirtualHost();
-
- AMQChannel channel = session.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
- else
- {
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
-
- if (queue == null)
- {
- _log.info("No queue for '" + body.getQueue() + "'");
- if(body.getQueue()!=null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.getQueue()+ "'");
- }
- else
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.");
- }
- }
- else
- {
-
- //Perform ACLs
- vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
-
- if (!queue.performGet(session, channel, !body.getNoAck()))
- {
- MethodRegistry methodRegistry = session.getMethodRegistry();
- // TODO - set clusterId
- BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
-
-
- session.writeFrame(responseBody.generateFrame(channelId));
- }
- }
- }
- }
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicGetBody; +import org.apache.qpid.framing.BasicGetEmptyBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> +{ + private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); + + private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); + + public static BasicGetMethodHandler getInstance() + { + return _instance; + } + + private BasicGetMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + + VirtualHost vHost = session.getVirtualHost(); + + AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + else + { + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); + + if (queue == null) + { + _log.info("No queue for '" + body.getQueue() + "'"); + if(body.getQueue()!=null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "No such queue, '" + body.getQueue()+ "'"); + } + else + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "No queue name provided, no default queue defined."); + } + } + else + { + + //Perform ACLs + vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); + + if (!queue.performGet(session, channel, !body.getNoAck())) + { + MethodRegistry methodRegistry = session.getMethodRegistry(); + // TODO - set clusterId + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + + + session.writeFrame(responseBody.generateFrame(channelId)); + } + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java index 3e2cab2e53..bca35be535 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java @@ -1,75 +1,75 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.AMQException;
-
-public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
-
- private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
-
- public static BasicRecoverSyncMethodHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
-
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
- channel.resend(body.getRequeue());
-
- // Qpid 0-8 hacks a synchronous -ok onto recover.
- // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
- {
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
-
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; + +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.AMQException; + +public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody> +{ + private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class); + + private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler(); + + public static BasicRecoverSyncMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); + AMQChannel channel = session.getChannel(channelId); + + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + channel.resend(body.getRequeue()); + + // Qpid 0-8 hacks a synchronous -ok onto recover. + // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant + if(session.getProtocolVersion().equals(ProtocolVersion.v0_9)) + { + MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); + session.writeFrame(recoverOk.generateFrame(channelId)); + + } + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index cce49f13c7..a854c97f60 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -1,121 +1,121 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
-
-public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
-{
- private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
-
- public static QueuePurgeHandler getInstance()
- {
- return _instance;
- }
-
- private final boolean _failIfNotFound;
-
- public QueuePurgeHandler()
- {
- this(true);
- }
-
- public QueuePurgeHandler(boolean failIfNotFound)
- {
- _failIfNotFound = failIfNotFound;
- }
-
- public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- AMQChannel channel = session.getChannel(channelId);
-
-
- AMQQueue queue;
- if(body.getQueue() == null)
- {
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
- //get the default queue on the channel:
- queue = channel.getDefaultQueue();
-
- if(queue == null)
- {
- if(_failIfNotFound)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
- }
- }
- }
- else
- {
- queue = queueRegistry.getQueue(body.getQueue());
- }
-
- if(queue == null)
- {
- if(_failIfNotFound)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
- }
- }
- else
- {
-
- //Perform ACLs
- virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue);
-
- long purged = queue.clearQueue(channel.getStoreContext());
-
-
- if(!body.getNowait())
- {
-
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
-
- }
- }
- }
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.security.access.Permission; + +public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody> +{ + private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); + + public static QueuePurgeHandler getInstance() + { + return _instance; + } + + private final boolean _failIfNotFound; + + public QueuePurgeHandler() + { + this(true); + } + + public QueuePurgeHandler(boolean failIfNotFound) + { + _failIfNotFound = failIfNotFound; + } + + public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + AMQChannel channel = session.getChannel(channelId); + + + AMQQueue queue; + if(body.getQueue() == null) + { + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified."); + } + } + } + else + { + queue = queueRegistry.getQueue(body.getQueue()); + } + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + } + } + else + { + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue); + + long purged = queue.clearQueue(channel.getStoreContext()); + + + if(!body.getNowait()) + { + + MethodRegistry methodRegistry = session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); + session.writeFrame(responseBody.generateFrame(channelId)); + + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java index 9475b83c8f..d24e4f6ffa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java @@ -1,566 +1,566 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.handler;
-
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.AMQException;
-
-public class ServerMethodDispatcherImpl implements MethodDispatcher
-{
- private final AMQStateManager _stateManager;
-
- private static interface DispatcherFactory
- {
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ServerMethodDispatcherImpl_8_0(stateManager);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ServerMethodDispatcherImpl_0_9(stateManager);
- }
- });
-
- }
-
-
- private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
- private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
- private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
- private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
- private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
- private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
- private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
- private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
- private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
- private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
- private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
- private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
- private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
- private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
- private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
- private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
- private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
- private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
- private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
- private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
- private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
- private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
- private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
- private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
- private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
- private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
- private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
-
-
-
- public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
- {
- return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
- }
-
-
- public ServerMethodDispatcherImpl(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- }
-
-
- protected AMQStateManager getStateManager()
- {
- return _stateManager;
- }
-
-
-
- public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
- {
- _accessRequestHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
- {
- _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
- {
- _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
- {
- _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
- {
- _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
- {
- _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
- {
- _basicQosHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
- {
- _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
- {
- _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
- {
- _channelOpenHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
- public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
- {
- _channelCloseHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
- {
- _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
- public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
- {
- _channelFlowHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
- {
- _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
- public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
- {
- _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
- {
- _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
-
- public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
- {
- _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
- {
- _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
- {
- _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
- {
- _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
- {
- _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
- {
- _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
- {
- _queueBindHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
- {
- _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
- {
- _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
- {
- _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
- {
- _txCommitHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
- {
- _txRollbackHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
- public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
- {
- _txSelectHandler.methodReceived(_stateManager, body, channelId);
- return true;
- }
-
-
-
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.framing.*; +import org.apache.qpid.AMQException; + +public class ServerMethodDispatcherImpl implements MethodDispatcher +{ + private final AMQStateManager _stateManager; + + private static interface DispatcherFactory + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager); + } + + private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = + new HashMap<ProtocolVersion, DispatcherFactory>(); + + + static + { + _dispatcherFactories.put(ProtocolVersion.v8_0, + new DispatcherFactory() + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + { + return new ServerMethodDispatcherImpl_8_0(stateManager); + } + }); + + _dispatcherFactories.put(ProtocolVersion.v0_9, + new DispatcherFactory() + { + public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + { + return new ServerMethodDispatcherImpl_0_9(stateManager); + } + }); + + } + + + private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance(); + private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance(); + private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance(); + private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance(); + private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance(); + private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance(); + private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance(); + private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance(); + private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance(); + private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance(); + private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance(); + private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance(); + private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance(); + private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance(); + private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance(); + private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance(); + private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance(); + private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance(); + private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance(); + private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance(); + private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance(); + private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance(); + private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance(); + private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance(); + private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance(); + private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance(); + private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance(); + + + + public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion) + { + return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager); + } + + + public ServerMethodDispatcherImpl(AMQStateManager stateManager) + { + _stateManager = stateManager; + } + + + protected AMQStateManager getStateManager() + { + return _stateManager; + } + + + + public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + { + _accessRequestHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException + { + _basicAckMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException + { + _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + { + _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + { + _basicGetMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException + { + _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + { + _basicQosHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + { + _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + { + _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + { + _channelOpenHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + { + _channelCloseHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + { + _channelCloseOkHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException + { + _channelFlowHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + + public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + { + _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + { + _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + { + _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + + public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + { + _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + { + _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + { + _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + { + _exchangeBoundHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException + { + _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + { + _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + { + _queueBindHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException + { + _queueDeclareHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException + { + _queueDeleteHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException + { + _queuePurgeHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException + { + _txCommitHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException + { + _txRollbackHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + { + _txSelectHandler.methodReceived(_stateManager, body, channelId); + return true; + } + + + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java index 8b1dca77ba..382a85347b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java @@ -1,164 +1,164 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.handler;
-
-
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.AMQException;
-
-
-
-public class ServerMethodDispatcherImpl_0_9
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher_0_9
-
-{
-
- private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
- BasicRecoverSyncMethodHandler.getInstance();
- private static final QueueUnbindHandler _queueUnbindHandler =
- QueueUnbindHandler.getInstance();
-
-
- public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
- return true;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + + +import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.AMQException; + + + +public class ServerMethodDispatcherImpl_0_9 + extends ServerMethodDispatcherImpl + implements MethodDispatcher_0_9 + +{ + + private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler = + BasicRecoverSyncMethodHandler.getInstance(); + private static final QueueUnbindHandler _queueUnbindHandler = + QueueUnbindHandler.getInstance(); + + + public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + { + _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId); + return true; + } + + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + { + _queueUnbindHandler.methodReceived(getStateManager(),body,channelId); + return true; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java index d599ca3d4e..22f64cf7d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java @@ -1,86 +1,86 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.handler;
-
-import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.AMQException;
-
-public class ServerMethodDispatcherImpl_8_0
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher_8_0
-{
- public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.AMQException; + +public class ServerMethodDispatcherImpl_8_0 + extends ServerMethodDispatcherImpl + implements MethodDispatcher_8_0 +{ + public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + { + throw new UnexpectedMethodException(body); + } + + public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + { + return false; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java b/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java index fb18519fe1..0abb3cdd7d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java @@ -1,33 +1,33 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.handler;
-
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.AMQException;
-
-public class UnexpectedMethodException extends AMQException
-{
- public UnexpectedMethodException(AMQMethodBody body)
- {
- super("Unexpected method recevied: " + body.getClass().getName());
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; + +public class UnexpectedMethodException extends AMQException +{ + public UnexpectedMethodException(AMQMethodBody body) + { + super("Unexpected method recevied: " + body.getClass().getName()); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java index e01c5aabbf..576d577b40 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java @@ -1,57 +1,57 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.AMQException;
-
-public interface ProtocolOutputConverter
-{
- void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
-
- interface Factory
- {
- ProtocolOutputConverter newInstance(AMQProtocolSession session);
- }
-
- void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException;
-
- void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
-
- byte getProtocolMinorVersion();
-
- byte getProtocolMajorVersion();
-
- void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException;
-
- void writeFrame(AMQDataBlock block);
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.AMQException; + +public interface ProtocolOutputConverter +{ + void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag); + + interface Factory + { + ProtocolOutputConverter newInstance(AMQProtocolSession session); + } + + void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException; + + void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException; + + byte getProtocolMinorVersion(); + + byte getProtocolMajorVersion(); + + void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException; + + void writeFrame(AMQDataBlock block); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 36e7e88fd6..02fb1429c0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -1,61 +1,61 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output;
-
-import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.ProtocolVersion;
-
-import java.util.Map;
-import java.util.HashMap;
-
-public class ProtocolOutputConverterRegistry
-{
-
- private static final Map<ProtocolVersion, Factory> _registry =
- new HashMap<ProtocolVersion, Factory>();
-
-
- static
- {
- register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
- register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
-
- }
-
- private static void register(ProtocolVersion version, Factory converter)
- {
-
- _registry.put(version,converter);
- }
-
-
- public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
- {
- return _registry.get(session.getProtocolVersion()).newInstance(session);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output; + +import org.apache.qpid.server.output.ProtocolOutputConverter.Factory; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.framing.ProtocolVersion; + +import java.util.Map; +import java.util.HashMap; + +public class ProtocolOutputConverterRegistry +{ + + private static final Map<ProtocolVersion, Factory> _registry = + new HashMap<ProtocolVersion, Factory>(); + + + static + { + register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory()); + register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory()); + + } + + private static void register(ProtocolVersion version, Factory converter) + { + + _registry.put(version,converter); + } + + + public static ProtocolOutputConverter getConverter(AMQProtocolSession session) + { + return _registry.get(session.getProtocolVersion()).newInstance(session); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index d7a879180a..d4cb7c878f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -1,285 +1,285 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
- final Long messageId = message.getMessageId();
-
- final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-
- if(bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
-
- writeFrame(compositeBlock);
- }
- else
- {
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
-
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
- final long messageId = message.getMessageId();
-
- AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
-
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
-
- final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
- if(bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- writeFrame(compositeBlock);
- }
- else
- {
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
-
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
- }
-
- private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
- queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
- replyText,
- message.getMessagePublishInfo().getExchange(),
- message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
- }
-
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
- writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP versions: + * 8-0 + */ +package org.apache.qpid.server.output.amqp0_8; + +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Iterator; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final Long messageId = message.getMessageId(); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext,messageId, i); + writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + + + } + + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final long messageId = message.getMessageId(); + + AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); + + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext, messageId, i); + writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + + + } + + + private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicDeliverBody deliverBody = + methodRegistry.createBasicDeliverBody(consumerTag, + deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey()); + AMQFrame deliverFrame = deliverBody.generateFrame(channelId); + + + return deliverFrame; + } + + private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicGetOkBody getOkBody = + methodRegistry.createBasicGetOkBody(deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey(), + queueSize); + AMQFrame getOkFrame = getOkBody.generateFrame(channelId); + + return getOkFrame; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicReturnBody basicReturnBody = + methodRegistry.createBasicReturnBody(replyCode, + replyText, + message.getMessagePublishInfo().getExchange(), + message.getMessagePublishInfo().getRoutingKey()); + AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); + + return returnFrame; + } + + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + message.getContentHeaderBody()); + + Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); + + writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + writeFrame(bodyFrameIterator.next()); + } + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); + BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 48d2ca9bc9..f87d3bcae1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -1,397 +1,397 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.output.amqp0_9;
-
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
- final Long messageId = message.getMessageId();
-
- final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-
- if(bodyCount == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
- AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
-
- CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
- final long messageId = message.getMessageId();
-
- AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
-
-
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
-
- final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
- if(bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- writeFrame(compositeBlock);
- }
- else
- {
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
-
- private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
- throws AMQException
- {
-
-
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
- final AMQBody returnBlock = new AMQBody()
- {
-
-
-
- private final boolean _isRedelivered = messageHandle.isRedelivered();
- private final AMQShortString _exchangeName = pb.getExchange();
- private final AMQShortString _routingKey = pb.getRoutingKey();
-
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- _isRedelivered,
- _exchangeName,
- _routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
- queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- message.getMessagePublishInfo().getExchange(),
- message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
- }
-
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
- writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.output.amqp0_9; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Iterator; + +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class ProtocolOutputConverterImpl implements ProtocolOutputConverter +{ + private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); + + + public static Factory getInstanceFactory() + { + return new Factory() + { + + public ProtocolOutputConverter newInstance(AMQProtocolSession session) + { + return new ProtocolOutputConverterImpl(session); + } + }; + } + + private final AMQProtocolSession _protocolSession; + + private ProtocolOutputConverterImpl(AMQProtocolSession session) + { + _protocolSession = session; + } + + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); + final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody(); + + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final Long messageId = message.getMessageId(); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + + if(bodyCount == 0) + { + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb); + + CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext,messageId, i); + writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); + } + + + } + + + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + + final AMQMessageHandle messageHandle = message.getMessageHandle(); + final StoreContext storeContext = message.getStoreContext(); + final long messageId = message.getMessageId(); + + AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); + + + AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); + + final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)); + AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = messageHandle.getContentChunk(storeContext, messageId, i); + writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb))); + } + + + } + + + } + + + private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag) + throws AMQException + { + + + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + + final AMQBody returnBlock = new AMQBody() + { + + + + private final boolean _isRedelivered = messageHandle.isRedelivered(); + private final AMQShortString _exchangeName = pb.getExchange(); + private final AMQShortString _routingKey = pb.getRoutingKey(); + + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + _isRedelivered, + _exchangeName, + _routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + final MessagePublishInfo pb = message.getMessagePublishInfo(); + final AMQMessageHandle messageHandle = message.getMessageHandle(); + + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + messageHandle.isRedelivered(), + pb.getExchange(), + pb.getRoutingKey(), + queueSize); + AMQFrame getOkFrame = getOkBody.generateFrame(channelId); + + return getOkFrame; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + message.getMessagePublishInfo().getExchange(), + message.getMessagePublishInfo().getRoutingKey()); + AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); + + return returnFrame; + } + + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody()); + + Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); + + writeFrame(compositeBlock); + } + + // + // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded + // + while (bodyFrameIterator.hasNext()) + { + writeFrame(bodyFrameIterator.next()); + } + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java index a7599a3e0d..92f951ce39 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -1,46 +1,46 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-
-/**
- * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to handle an AMQP method.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a
- * Runtime.
- */
-public class AMQNoMethodHandlerException extends AMQException
-{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
- {
- super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents failure to handle an AMQP method. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a + * Runtime. + */ +public class AMQNoMethodHandlerException extends AMQException +{ + public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt) + { + super("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java index 6e72aa062f..bb2db8d506 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -1,46 +1,46 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-
-/**
- * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to cast a frame to its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
- * first.
- */
-public class UnknnownMessageTypeException extends AMQException
-{
- public UnknnownMessageTypeException(AMQDataBlock message)
- {
- super("Unknown message type: " + message.getClass().getName() + ": " + message);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +/** + * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents failure to cast a frame to its expected type. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would + * be better just to leave that as a ClassCastException. However, check the framing layer catches this error + * first. + */ +public class UnknnownMessageTypeException extends AMQException +{ + public UnknnownMessageTypeException(AMQDataBlock message) + { + super("Unknown message type: " + message.getClass().getName() + ": " + message); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6f9efd3200..65115e4103 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -1,138 +1,138 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
-
- if (messageSize >= maximumMessageSize)
- {
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +public enum NotificationCheck +{ + + MESSAGE_COUNT_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + int msgCount; + final long maximumMessageCount = queue.getMaximumMessageCount(); + if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount) + { + listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); + return true; + } + return false; + } + }, + MESSAGE_SIZE_ALERT(true) + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + final long maximumMessageSize = queue.getMaximumMessageSize(); + if(maximumMessageSize != 0) + { + // Check for threshold message size + long messageSize; + try + { + messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; + } + catch (AMQException e) + { + messageSize = 0; + } + + + if (messageSize >= maximumMessageSize) + { + listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); + return true; + } + } + return false; + } + + }, + QUEUE_DEPTH_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + // Check for threshold queue depth in bytes + final long maximumQueueDepth = queue.getMaximumQueueDepth(); + + if(maximumQueueDepth != 0) + { + final long queueDepth = queue.getQueueDepth(); + + if (queueDepth >= maximumQueueDepth) + { + listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached."); + return true; + } + } + return false; + } + + }, + MESSAGE_AGE_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + + final long maxMessageAge = queue.getMaximumMessageAge(); + if(maxMessageAge != 0) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - maxMessageAge; + final long firstArrivalTime = queue.getOldestMessageArrivalTime(); + + if(firstArrivalTime < thresholdTime) + { + long oldestAge = currentTime - firstArrivalTime; + listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."); + + return true; + } + } + return false; + + } + + } + ; + + private final boolean _messageSpecific; + + NotificationCheck() + { + this(false); + } + + NotificationCheck(boolean messageSpecific) + { + _messageSpecific = messageSpecific; + } + + public boolean isMessageSpecific() + { + return _messageSpecific; + } + + abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java index 959ca03c80..f1e7c98387 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java @@ -1,27 +1,27 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
- void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + + +public interface QueueNotificationListener +{ + void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java index 85d804457e..70a76dd8c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java @@ -1,44 +1,44 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.io.IOException;
-
-import org.apache.qpid.server.management.MBeanAttribute;
-
-/**
- * The management interface exposed to allow management of an Exchange.
- * @version 0.1
- */
-public interface ManagedVirtualHost
-{
- static final String TYPE = "VirtualHost";
-
- /**
- * Returns the name of the managed virtualHost.
- * @return the name of the exchange.
- * @throws java.io.IOException
- */
- @MBeanAttribute(name="Name", description= TYPE + " Name")
- String getName() throws IOException;
-
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.io.IOException; + +import org.apache.qpid.server.management.MBeanAttribute; + +/** + * The management interface exposed to allow management of an Exchange. + * @version 0.1 + */ +public interface ManagedVirtualHost +{ + static final String TYPE = "VirtualHost"; + + /** + * Returns the name of the managed virtualHost. + * @return the name of the exchange. + * @throws java.io.IOException + */ + @MBeanAttribute(name="Name", description= TYPE + " Name") + String getName() throws IOException; + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 27917fac8a..9b1619c609 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -1,70 +1,70 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry
-{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
-
-
- private String _defaultVirtualHostName;
-
- public synchronized void registerVirtualHost(VirtualHost host) throws Exception
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- }
-
- public VirtualHost getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 )
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHost> getVirtualHosts()
- {
- return new ArrayList<VirtualHost>(_registry.values());
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +public class VirtualHostRegistry +{ + private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>(); + + + private String _defaultVirtualHostName; + + public synchronized void registerVirtualHost(VirtualHost host) throws Exception + { + if(_registry.containsKey(host.getName())) + { + throw new Exception("Virtual Host with name " + host.getName() + " already registered."); + } + _registry.put(host.getName(),host); + } + + public VirtualHost getVirtualHost(String name) + { + if(name == null || name.trim().length() == 0 ) + { + name = getDefaultVirtualHostName(); + } + + return _registry.get(name); + } + + private String getDefaultVirtualHostName() + { + return _defaultVirtualHostName; + } + + public void setDefaultVirtualHostName(String defaultVirtualHostName) + { + _defaultVirtualHostName = defaultVirtualHostName; + } + + + public Collection<VirtualHost> getVirtualHosts() + { + return new ArrayList<VirtualHost>(_registry.values()); + } +} |