From 5b060554268c763cb883a102b04be21741551161 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Mon, 28 Jan 2008 16:48:00 +0000 Subject: Merged revisions 608477,609961,610475,610479,610806,611146 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r608477 | rgodfrey | 2008-01-03 13:23:04 +0000 (Thu, 03 Jan 2008) | 1 line QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages ........ r609961 | ritchiem | 2008-01-08 12:59:01 +0000 (Tue, 08 Jan 2008) | 2 lines QPID-499 : Patch to update the queue size statistics when the Active TTL process runs Removed old single commented out code line from AMQSession. ........ r610475 | ritchiem | 2008-01-09 17:32:43 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-723 Added exec to qpid.start ........ r610479 | ritchiem | 2008-01-09 17:39:54 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-690 : Provide configurable delay between re-connecion attempts. ........ r610806 | ritchiem | 2008-01-10 14:41:37 +0000 (Thu, 10 Jan 2008) | 1 line QPID-690 : Relaxed the timings on failover as Thread.sleep is accurate to 10ms so may finish the sleep 10ms early. Resulting in erratic failures as 9.9s < 10s. ........ r611146 | ritchiem | 2008-01-11 11:33:31 +0000 (Fri, 11 Jan 2008) | 1 line Patch by Aidan Skinner to make third constructor public. This is done so that the BDBMessageStore tests can still run with the addition of the VirtualHost reaper thread. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@615943 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 65 ++++++++++++++++++++++ .../qpid/client/transport/TransportConnection.java | 5 +- 2 files changed, 69 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 42f07f97f9..deaa435d8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; @@ -2148,6 +2149,70 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } + + /** + * Returns the number of messages currently queued for the given destination. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to be checked + * + * @return the number of queued messages. + * + * @throws AMQException If the queue cannot be declared for any reason. + */ + public long getQueueDepth(final AMQDestination amqd) + throws AMQException + { + + class QueueDeclareOkHandler extends SpecificMethodFrameListener + { + + private long _messageCount; + private long _consumerCount; + + public QueueDeclareOkHandler() + { + super(getChannelId(), QueueDeclareOkBody.class); + } + + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + { + boolean matches = super.processMethod(channelId, frame); + QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; + _messageCount = declareOk.getMessageCount(); + _consumerCount = declareOk.getConsumerCount(); + return matches; + } + + } + + return new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Long execute() throws AMQException, FailoverException + { + + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(_channelId); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + + return okHandler._messageCount; + } + }, _connection).execute(); + + } + + + /** * Declares the named exchange and type of exchange. * diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index e8a220f5e9..c1116ca01e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -283,7 +283,10 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); - _acceptor.unbindAll(); + if (_acceptor != null) + { + _acceptor.unbindAll(); + } synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); -- cgit v1.2.1 From cebb9bb85aa61402762e2567be0d6a714b65f8d2 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 8 Feb 2008 12:52:54 +0000 Subject: QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@619868 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 8 ++++---- .../apache/qpid/client/BasicMessageConsumer.java | 24 +++++++++++----------- .../qpid/test/unit/basic/PropertyValueTest.java | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index deaa435d8c..64895fe16c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1394,9 +1394,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(UnprocessedMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag()); + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag()); } rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue); @@ -1404,9 +1404,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 610e0109b1..488d22c4bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -551,12 +551,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (!_closed.getAndSet(true)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -621,14 +621,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + _logger.debug(_consumerTag + " markClosed():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -845,14 +845,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // synchronized (_closed) { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + _logger.debug(_consumerTag + " notifyError():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously" + _closedStack.toString()); } else { @@ -982,9 +982,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (tag != null) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); + _logger.debug("Rejecting tag from _receivedDTs:" + tag); } _session.rejectMessage(tag, true); @@ -1025,9 +1025,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _session.rejectMessage(((AbstractJMSMessage) o), true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); + _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } iterator.remove(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index d2a7ba301b..8cceba6ffd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -173,7 +173,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.trace("Message:" + m); + _logger.debug("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); -- cgit v1.2.1 From d203e3c2b52e59b8b5faed91b57be4603840b9a7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 12 Feb 2008 11:29:19 +0000 Subject: QPID-784 : Added ability to provide existing Socket to Qpid Client Libraries to use as for connection. AMQBrokerDetails.java, BrokerDetails.java And ConnectionURLTest.java augmented to allow new transport type 'socket' New ExistingSocketConnector, which utises a given Socket() rather than creating its own from a SocketChannel. This code was taken from the Mina library v1.0.0. Changes to AMQConnection.java, SocketTransportConnection.java were required to allow the new Socket object to be passed through to the ExistingSocketConnector. The TransportConnection.java was updated to return an ExistingSocketConnector when the 'socket' transport is used. AMQConnection.makeBrokerConnection was changed when the 'socket' transport is being used. This allows the set Socket to be passed down to the ExistingSocketConnector for the transport to be run over. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@620767 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/ExistingSocketConnectorDemo.java | 161 +++++++ .../socket/nio/ExistingSocketConnector.java | 478 +++++++++++++++++++++ .../org/apache/qpid/client/AMQBrokerDetails.java | 3 +- .../java/org/apache/qpid/client/AMQConnection.java | 56 ++- .../transport/SocketTransportConnection.java | 36 +- .../qpid/client/transport/TransportConnection.java | 17 +- .../java/org/apache/qpid/jms/BrokerDetails.java | 1 + .../client/connectionurl/ConnectionURLTest.java | 17 + 8 files changed, 757 insertions(+), 12 deletions(-) create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java create mode 100644 java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java (limited to 'java/client') diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java new file mode 100644 index 0000000000..0979c9c6b8 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example.transport; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.SocketChannel; + +/** + * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as + * the transport for the Client API. + * + * The Demo here runs twice: + * 1. Just to show a simple publish and receive. + * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism. + */ +public class ExistingSocketConnectorDemo implements ConnectionListener +{ + private static boolean DEMO_FAILOVER = false; + + public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException + { + System.out.println("Testing socket connection to localhost:5672."); + + new ExistingSocketConnectorDemo(); + + System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673."); + + DEMO_FAILOVER = true; + + new ExistingSocketConnectorDemo(); + } + + Connection _connection; + MessageProducer _producer; + Session _session; + + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ + public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'"; + + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException + { + + Socket socket = SocketChannel.open().socket(); + socket.connect(new InetSocketAddress("localhost", 5672)); + + _connection = new AMQConnection(CONNECTION, socket); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue")); + + _producer = _session.createProducer(_session.createQueue("Queue")); + + _connection.start(); + + if (!DEMO_FAILOVER) + { + _producer.send(_session.createTextMessage("Simple Test")); + } + else + { + // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover + ((AMQConnection) _connection).setConnectionListener(this); + + System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672."); + } + + //We do a blocking receive here so that we can demonstrate failover. + Message message = consumer.receive(); + + System.out.println("Recevied :" + message); + + _connection.close(); + } + + // ConnectionListener Interface + + public void bytesSent(long count) + { + //not used in this example + } + public void bytesReceived(long count) + { + //not used in this example + } + + public boolean preFailover(boolean redirect) + { + /** + * This method is called before the underlying client library starts to reconnect. This gives us the opportunity + * to set a new socket for the failover to occur on. + */ + try + { + Socket socket = SocketChannel.open().socket(); + + socket.connect(new InetSocketAddress("localhost", 5673)); + + // This is the new method to pass in an open socket for the connection to use. + ((AMQConnection) _connection).setOpenSocket(socket); + } + catch (IOException e) + { + e.printStackTrace(); + return false; + } + return true; + } + + public boolean preResubscribe() + { + //not used in this example - but must return true to allow the resubscription of existing clients. + return true; + } + + public void failoverComplete() + { + // Now that failover has completed we can send a message that the receiving thread will pick up + try + { + _producer.send(_session.createTextMessage("Simple Failover Test")); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } +} diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java new file mode 100644 index 0000000000..ab3bc28d83 --- /dev/null +++ b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class ExistingSocketConnector extends BaseIoConnector +{ + /** @noinspection StaticNonFinalField */ + private static volatile int nextId = 0; + + private final Object lock = new Object(); + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); + private final Queue connectQueue = new Queue(); + private final SocketIoProcessor[] ioProcessors; + private final int processorCount; + private final Executor executor; + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + private int workerTimeout = 60; // 1 min. + private Socket _openSocket = null; + + /** Create a connector with a single processing thread using a NewThreadExecutor */ + public ExistingSocketConnector() + { + this(1, new NewThreadExecutor()); + } + + /** + * Create a connector with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public ExistingSocketConnector(int processorCount, Executor executor) + { + if (processorCount < 1) + { + throw new IllegalArgumentException("Must have at least one processor"); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new SocketIoProcessor[processorCount]; + + for (int i = 0; i < processorCount; i++) + { + ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); + } + } + + /** + * How many seconds to keep the connection thread alive between connection requests + * + * @return Number of seconds to keep connection thread alive + */ + public int getWorkerTimeout() + { + return workerTimeout; + } + + /** + * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. + * + * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 + */ + public void setWorkerTimeout(int workerTimeout) + { + if (workerTimeout < 0) + { + throw new IllegalArgumentException("Must be >= 0"); + } + this.workerTimeout = workerTimeout; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + /** Changes here from the Mina OpenSocketConnector. + * Ignoreing all address as they are not needed */ + + if (handler == null) + { + throw new NullPointerException("handler"); + } + + + if (config == null) + { + config = getDefaultConfig(); + } + + if (_openSocket == null) + { + throw new IllegalArgumentException("Specifed Socket not active"); + } + + boolean success = false; + + try + { + DefaultConnectFuture future = new DefaultConnectFuture(); + newSession(_openSocket, handler, config, future); + success = true; + return future; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && _openSocket != null) + { + try + { + _openSocket.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + /** + * Sets the config this connector will use by default. + * + * @param defaultConfig the default config. + * + * @throws NullPointerException if the specified value is null. + */ + public void setDefaultConfig(SocketConnectorConfig defaultConfig) + { + if (defaultConfig == null) + { + throw new NullPointerException("defaultConfig"); + } + this.defaultConfig = defaultConfig; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute(new NamePreservingRunnable(worker)); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + newSession(ch, entry.handler, entry.config, entry); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + try + { + key.channel().close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + key.cancel(); + } + } + } + } + + private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + SocketSessionImpl session = new SocketSessionImpl(this, + nextProcessor(), + getListeners(), + config, + socket.getChannel(), + handler, + socket.getRemoteSocketAddress()); + + newSession(session, config, connectFuture); + } + + private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + + { + SocketSessionImpl session = new SocketSessionImpl(this, + nextProcessor(), + getListeners(), + config, + ch, + handler, + ch.socket().getRemoteSocketAddress()); + + newSession(session, config, connectFuture); + } + + private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + session.getIoProcessor().addNew(session); + connectFuture.setSession(session); + } + + private SocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + public void setOpenSocket(Socket openSocket) + { + _openSocket = openSocket; + } + + private class Worker implements Runnable + { + private long lastActive = System.currentTimeMillis(); + + public void run() + { + Thread.currentThread().setName(ExistingSocketConnector.this.threadName); + + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) + { + synchronized (lock) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + else + { + lastActive = System.currentTimeMillis(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index c04380ba8c..89ce4b2c72 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -58,7 +58,8 @@ public class AMQBrokerDetails implements BrokerDetails { //todo this list of valid transports should be enumerated somewhere if ((!(transport.equalsIgnoreCase("vm") || - transport.equalsIgnoreCase("tcp")))) + transport.equalsIgnoreCase("tcp") || + transport.equalsIgnoreCase("socket")))) { if (transport.equalsIgnoreCase("localhost")) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39b3b80e74..acbe495550 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -30,6 +30,8 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.ITransportConnection; +import org.apache.qpid.client.transport.SocketTransportConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -62,6 +64,7 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; +import java.net.Socket; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -157,6 +160,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_TIMEOUT = 1000 * 30; private ProtocolVersion _protocolVersion; + /** The active socket that is to be used as a value for connection */ + private Socket _openSocket; + /** * @param broker brokerdetails * @param username username @@ -173,7 +179,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), null); + + AMQBrokerDetails.checkTransport(broker) + "'"), null, null); } /** @@ -192,7 +198,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) @@ -217,26 +223,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null); + } + + public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection), null, socket); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), null); + this(new AMQConnectionURL(connection), null, null); } public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), sslConfig); + this(new AMQConnectionURL(connection), sslConfig, null); } public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException + { + this(connectionURL, sslConfig, null); + } + + public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } + _openSocket = socket; + _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -395,9 +413,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); try { - TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up + + ITransportConnection connection = TransportConnection.getInstance(brokerDetail); + + if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) + { + if (_openSocket != null) + { + ((SocketTransportConnection) connection).setOpenSocket(_openSocket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket' transport:" + brokerDetail); + } + + } + + connection.connect(_protocolHandler, brokerDetail); + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up //_protocolHandler.attainState(AMQState.CONNECTION_OPEN); AMQState state = _protocolHandler.attainState(openOrClosedStates); @@ -1292,6 +1327,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _sessions.get(channelId); } + public void setOpenSocket(Socket socket) + { + _openSocket = socket; + } + public ProtocolVersion getProtocolVersion() { return _protocolVersion; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 5482e48699..e9d6242e77 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; public class SocketTransportConnection implements ITransportConnection { @@ -44,6 +46,8 @@ public class SocketTransportConnection implements ITransportConnection private SocketConnectorFactory _socketConnectorFactory; + private Socket _openSocket; + static interface SocketConnectorFactory { IoConnector newSocketConnector(); @@ -54,6 +58,11 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } + public void setOpenSocket(Socket openSocket) + { + _openSocket = openSocket; + } + public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); @@ -83,8 +92,31 @@ public class SocketTransportConnection implements ITransportConnection _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); + + final InetSocketAddress address; + + if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) + { + address = null; + + if (_openSocket != null) + { + _logger.info("Using existing Socket:" + _openSocket); + ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket' transport:" + brokerDetail); + } + } + else + { + address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); + _logger.info("Attempting connection to " + address); + } + + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index c1116ca01e..94361fccfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -54,6 +55,7 @@ public class TransportConnection private static final int TCP = 0; private static final int VM = 1; + private static final int SOCKET = 2; private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); @@ -87,7 +89,15 @@ public class TransportConnection switch (transport) { - + case SOCKET: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + return new ExistingSocketConnector(); + } + }); + break; case TCP: _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { @@ -127,6 +137,11 @@ public class TransportConnection private static int getTransport(String transport) { + if (transport.equals(BrokerDetails.SOCKET)) + { + return SOCKET; + } + if (transport.equals(BrokerDetails.TCP)) { return TCP; diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 603b0834a3..8b353a7264 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,6 +34,7 @@ public interface BrokerDetails public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; public static final int DEFAULT_PORT = 5672; + public static final String SOCKET = "socket"; public static final String TCP = "tcp"; public static final String VM = "vm"; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 6c872a0e10..978ce34d59 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -510,6 +510,23 @@ public class ConnectionURLTest extends TestCase } + public void testSocketProtocol() throws URLSyntaxException + { + String url = "amqp://guest:guest@id/test" + "?brokerlist='socket:///'"; + + try + { + AMQConnectionURL curl = new AMQConnectionURL(url); + assertNotNull(curl); + assertEquals(1, curl.getBrokerCount()); + assertNotNull(curl.getBrokerDetails(0)); + assertEquals("socket", curl.getBrokerDetails(0).getTransport()); + } + catch (URLSyntaxException e) + { + fail(e.getMessage()); + } + } public static junit.framework.Test suite() { -- cgit v1.2.1 From 1df04df23ddda53cda350ddaeec692cb2f7cbed8 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 12 Feb 2008 16:44:59 +0000 Subject: QPID-787 : Allow for quoting of identifiers in selectors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@620858 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index 40c712c1c9..e0bfc5d498 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -72,7 +72,7 @@ public class SelectorTest extends TestCase implements MessageListener connection.start(); String selector = null; - // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -87,6 +87,7 @@ public class SelectorTest extends TestCase implements MessageListener Message msg = _session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); + msg.setStringProperty("property-with-hyphen","wibble"); msg.setJMSType("Special"); _logger.info("Sending Message:" + msg); -- cgit v1.2.1 From 031b02ced99d04d103c02575913ea238e48221ac Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 12 Feb 2008 17:36:07 +0000 Subject: QPID-784 : Added ability to provide existing Socket to Qpid Client Libraries to use as for connection. Modified based on review by Robert Godfrey due to Thread safety around SocketTransportConnection.java and TransportConnection.java. Now use a safe Map to store all registered sockets in the TransportConnection.java these are then removed as used or on request. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@620876 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/ExistingSocketConnectorDemo.java | 16 +++++-- .../org/apache/qpid/client/AMQBrokerDetails.java | 14 ++++-- .../java/org/apache/qpid/client/AMQConnection.java | 50 +++------------------- .../transport/SocketTransportConnection.java | 20 ++++----- .../qpid/client/transport/TransportConnection.java | 17 +++++++- 5 files changed, 54 insertions(+), 63 deletions(-) (limited to 'java/client') diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java index 0979c9c6b8..d7eb138523 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -23,6 +23,7 @@ package org.apache.qpid.example.transport; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; @@ -36,6 +37,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.UUID; /** * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as @@ -66,9 +68,14 @@ public class ExistingSocketConnectorDemo implements ConnectionListener MessageProducer _producer; Session _session; + String Socket1_ID = UUID.randomUUID().toString(); + String Socket2_ID = UUID.randomUUID().toString(); + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ - public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'"; + public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException { @@ -76,7 +83,10 @@ public class ExistingSocketConnectorDemo implements ConnectionListener Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5672)); - _connection = new AMQConnection(CONNECTION, socket); + TransportConnection.registerOpenSocket(Socket1_ID, socket); + + + _connection = new AMQConnection(CONNECTION); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -130,7 +140,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener socket.connect(new InetSocketAddress("localhost", 5673)); // This is the new method to pass in an open socket for the connection to use. - ((AMQConnection) _connection).setOpenSocket(socket); + TransportConnection.registerOpenSocket(Socket2_ID, socket); } catch (IOException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 89ce4b2c72..572ea48f85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -157,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails } else { - setPort(port); + if (!_transport.equalsIgnoreCase(SOCKET)) + { + setPort(port); + } } String queryString = connection.getQuery(); @@ -264,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_transport); sb.append("://"); - if (!(_transport.equalsIgnoreCase("vm"))) + if (!(_transport.equalsIgnoreCase(VM))) { sb.append(_host); } - sb.append(':'); - sb.append(_port); + if (!(_transport.equalsIgnoreCase(SOCKET))) + { + sb.append(':'); + sb.append(_port); + } sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index acbe495550..c9928a084e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -30,8 +30,6 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.transport.ITransportConnection; -import org.apache.qpid.client.transport.SocketTransportConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -64,7 +62,6 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; -import java.net.Socket; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -160,8 +157,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_TIMEOUT = 1000 * 30; private ProtocolVersion _protocolVersion; - /** The active socket that is to be used as a value for connection */ - private Socket _openSocket; /** * @param broker brokerdetails @@ -179,7 +174,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), null, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -198,7 +193,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) @@ -223,38 +218,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null); - } - - public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(connection), null, socket); + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), null, null); + this(new AMQConnectionURL(connection), null); } public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), sslConfig, null); + this(new AMQConnectionURL(connection), sslConfig); } public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException - { - this(connectionURL, sslConfig, null); - } - - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } - _openSocket = socket; - _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -414,23 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - ITransportConnection connection = TransportConnection.getInstance(brokerDetail); - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - if (_openSocket != null) - { - ((SocketTransportConnection) connection).setOpenSocket(_openSocket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); - } - - } - - connection.connect(_protocolHandler, brokerDetail); + TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -1327,11 +1294,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _sessions.get(channelId); } - public void setOpenSocket(Socket socket) - { - _openSocket = socket; - } - public ProtocolVersion getProtocolVersion() { return _protocolVersion; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index e9d6242e77..b2f7ae8395 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class SocketTransportConnection implements ITransportConnection { @@ -46,8 +48,6 @@ public class SocketTransportConnection implements ITransportConnection private SocketConnectorFactory _socketConnectorFactory; - private Socket _openSocket; - static interface SocketConnectorFactory { IoConnector newSocketConnector(); @@ -58,11 +58,6 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } - public void setOpenSocket(Socket openSocket) - { - _openSocket = openSocket; - } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); @@ -99,15 +94,18 @@ public class SocketTransportConnection implements ITransportConnection { address = null; - if (_openSocket != null) + Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); + + if (socket != null) { - _logger.info("Using existing Socket:" + _openSocket); - ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket); + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); } else { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); + "with 'socket://' transport:" + brokerDetail); } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 94361fccfc..0ea04e5bc3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.net.Socket; + /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying @@ -61,6 +64,18 @@ public class TransportConnection private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static Map _openSocketRegister = new ConcurrentHashMap(); + + public static void registerOpenSocket(String socketID, Socket openSocket) + { + _openSocketRegister.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return _openSocketRegister.remove(socketID); + } + public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -305,7 +320,7 @@ public class TransportConnection synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); - } + } _acceptor = null; _currentInstance = -1; _currentVMPort = -1; -- cgit v1.2.1 From 855aa3ce41f07c28071cebde64df12349bdcec51 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 13 Feb 2008 11:00:13 +0000 Subject: QPID-788 : Changed MAXIMUM_STATE_WAIT_TIME to pickup value via -Damqj.MaximumStateWait=90000 or default to 30000. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627354 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/state/AMQStateManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index b6baefe1b0..e62fce0f60 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -54,7 +54,7 @@ public class AMQStateManager implements AMQMethodListener private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); private final Object _stateLock = new Object(); - private static final long MAXIMUM_STATE_WAIT_TIME = 30000L; + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); public AMQStateManager() { -- cgit v1.2.1 From 45072deb936db16404f3746eeb9dcb74e4ecd3df Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 19 Feb 2008 16:53:57 +0000 Subject: Qpid-594: make AMQConnection listen for exceptions that are thrown asynchronously in it's constructor and do something appropriate with them git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629158 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 41 ++++++++++++++++++++-- .../qpid/client/protocol/AMQProtocolHandler.java | 1 + .../unit/client/connection/ConnectionTest.java | 16 +++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c9928a084e..79d92f7705 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -39,6 +39,7 @@ import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,6 +234,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { + final ArrayList exceptions = new ArrayList(); + + class Listener implements ExceptionListener + { + public void onException(JMSException e) + { + exceptions.add(e); + } + } + + try + { + setExceptionListener(new Listener()); + } + catch (JMSException e) + { + // Shouldn't happen + throw new AMQException(null, null, e); + } + if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); @@ -289,8 +310,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - lastException = null; - _connected = true; } catch (Exception e) { @@ -318,7 +337,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { String message = null; - if (lastException != null) + if (exceptions.size() > 0) + { + JMSException e = exceptions.get(exceptions.size() - 1); + int code = -1; + try + { + code = new Integer(e.getErrorCode()).intValue(); + } + catch (NumberFormatException nfe) + { + // Ignore this, we have some error codes and messages swapped around + } + + throw new AMQConnectionFailureException(AMQConstant.getConstant(code), + e.getMessage(), e); + } + else if (lastException != null) { if (lastException.getCause() != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8a1e78d2e0..f70c1faa84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -361,6 +361,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // this will attemp failover sessionClosed(session); + _connection.exceptionReceived(cause); } else { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 56394fee27..7103397ad4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -165,6 +165,22 @@ public class ConnectionTest extends TestCase } } + public void testUnresolvedVirtualHostFailure() throws Exception + { + try + { + new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); + } + } + } + public void testClientIdCannotBeChanged() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", -- cgit v1.2.1 From bc9cf0d495598b359fc1a67d03f4636ca610c6a9 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 20 Feb 2008 16:04:25 +0000 Subject: QPID-800 : junit toolkit sources added. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629518 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/pom.xml | 2 +- java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 1 - .../test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) (limited to 'java/client') diff --git a/java/client/pom.xml b/java/client/pom.xml index 5fe9cbf528..82c7f67faf 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -79,7 +79,7 @@ - uk.co.thebadgerset + org.apache.qpid junit-toolkit test diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 64895fe16c..0c94216597 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -43,7 +43,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index 5a61480f6a..ee110e7932 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -29,8 +29,8 @@ import org.apache.qpid.client.transport.TransportConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import uk.co.thebadgerset.junit.concurrency.TestRunnable; -import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator; +import org.apache.qpid.junit.concurrency.TestRunnable; +import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; import javax.jms.Connection; import javax.jms.Message; -- cgit v1.2.1 From 3aed99f65d795c234faa9b584182cf3ea8c67b4a Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 20 Feb 2008 17:12:32 +0000 Subject: QPID-786 Remove bogus Failover timeout, add test. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629540 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/jms/FailoverPolicy.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 6ec883ff0b..8e3ccc3b02 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -34,7 +34,6 @@ public class FailoverPolicy private static final long MINUTE = 60000L; private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE; - private static final long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE; private FailoverMethod[] _methods = new FailoverMethod[1]; @@ -161,16 +160,7 @@ public class FailoverPolicy } else { - if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT) - { - _logger.info("Failover timeout"); - - return false; - } - else - { - _lastMethodTime = now; - } + _lastMethodTime = now; } } else -- cgit v1.2.1 From 3047c0ec2d581f4b51c77fec84fbf0bec8599573 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 21 Feb 2008 10:09:03 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 111 ++++++++++- .../java/org/apache/qpid/client/AMQSession.java | 206 +++++++++++++++++---- .../apache/qpid/client/BasicMessageConsumer.java | 63 ++++--- .../apache/qpid/client/BasicMessageProducer.java | 12 ++ .../client/handler/BasicDeliverMethodHandler.java | 5 +- .../client/handler/BasicReturnMethodHandler.java | 5 +- .../client/handler/ChannelFlowMethodHandler.java | 54 ++++++ .../qpid/client/message/AbstractJMSMessage.java | 53 +++--- .../apache/qpid/client/message/JMSMapMessage.java | 8 +- .../qpid/client/message/UnprocessedMessage.java | 93 ++++++---- .../qpid/client/protocol/AMQProtocolHandler.java | 123 +++++------- .../qpid/client/protocol/AMQProtocolSession.java | 74 ++++++-- .../apache/qpid/client/state/AMQStateManager.java | 17 +- .../qpid/client/transport/TransportConnection.java | 28 +-- 14 files changed, 600 insertions(+), 252 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 79d92f7705..b60a8dfaad 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -73,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { + private static final class ChannelToSessionMap + { + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap _slowAccessSessions = new LinkedHashMap(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + + public AMQSession get(int channelId) + { + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if((oldVal != null) && (session == null)) + { + _size--; + } + else if((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + + public AMQSession remove(int channelId) + { + AMQSession session; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if(session != null) + { + _size--; + } + return session; + + } + + public Collection values() + { + ArrayList values = new ArrayList(size()); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessSessions[i] = null; + } + } + } + + private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private AtomicInteger _idFactory = new AtomicInteger(0); @@ -102,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); + private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); private String _clientName; @@ -757,10 +856,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect checkNotClosed(); if (!_started) { - final Iterator it = _sessions.entrySet().iterator(); + final Iterator it = _sessions.values().iterator(); while (it.hasNext()) { - final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); + final AMQSession s = (AMQSession) (it.next()); try { s.start(); @@ -1014,11 +1113,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _maximumFrameSize; } - public Map getSessions() - { - return _sessions; - } - public String getUsername() { return _username; @@ -1239,6 +1333,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); + s.setFlowControl(true); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0c94216597..87c813982e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; import java.io.Serializable; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong; */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + private static final class IdToConsumerMap + { + private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; + private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap(); + + + public BasicMessageConsumer get(int id) + { + if((id & 0xFFFFFFF0) == 0) + { + return _fastAccessConsumers[id]; + } + else + { + return _slowAccessConsumers.get(id); + } + } + + public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + { + BasicMessageConsumer oldVal; + if((id & 0xFFFFFFF0) == 0) + { + oldVal = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = consumer; + } + else + { + oldVal = _slowAccessConsumers.put(id, consumer); + } + + return consumer; + + } + + + public BasicMessageConsumer remove(int id) + { + BasicMessageConsumer consumer; + if((id & 0xFFFFFFF0) == 0) + { + consumer = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = null; + } + else + { + consumer = _slowAccessConsumers.remove(id); + } + + return consumer; + + } + + public Collection values() + { + ArrayList values = new ArrayList(); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessConsumers[i] != null) + { + values.add(_fastAccessConsumers[i]); + } + } + values.addAll(_slowAccessConsumers.values()); + + return values; + } + + + public void clear() + { + _slowAccessConsumers.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessConsumers[i] = null; + } + } + } + + + + /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _transacted; /** Holds the sessions acknowledgement mode. */ - private int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; @@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - private Map _consumers = - new ConcurrentHashMap(); + private final IdToConsumerMap _consumers = new IdToConsumerMap(); + + //Map _consumers = + //new ConcurrentHashMap(); /** * Contains a list of consumers which have been removed but which might still have @@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Has failover occured on this session */ private boolean _failedOver; + + + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl= flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Creates a new session on a connection. * @@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public void aboveThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(true)).start(); - } + } public void underThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(false)).start(); - } + } }); } @@ -662,7 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); if (consumer != null) { // fixme this isn't right.. needs to check if _queue contains data for this consumer @@ -744,6 +843,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + false, false); + } + + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -761,6 +870,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messageSelector, null, false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + messageSelector, null, false, false); + } + + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -925,7 +1045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1089,9 +1209,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); } + /** * Creates a non-durable subscriber with a message selector * @@ -1109,7 +1230,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1276,15 +1397,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + "] received in session with channel id " + _channelId); } - if (message.getDeliverBody() == null) + if (message.isDeliverMessage()) { - // Return of the bounced message. - returnBouncedMessage(message); + _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); + _queue.add(message); } else { - _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); - _queue.add(message); + // Return of the bounced message. + returnBouncedMessage(message); } } @@ -1666,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { - if (_consumers.remove(consumer.getConsumerTag()) != null) + if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2063,8 +2184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { + int tagId = _nextTag++; // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); + AMQShortString tag = new AMQShortString(Integer.toString(tagId)); FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -2084,7 +2206,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + _consumers.put(tagId, consumer); try { @@ -2112,7 +2234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { // clean-up the map in the event of an error - _consumers.remove(tag); + _consumers.remove(tagId); throw e; } } @@ -2659,6 +2781,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _ticket = ticket; } + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + } + + + public void checkFlowControl() throws InterruptedException + { + synchronized(_flowControl) + { + while(!_flowControl.getFlowControl()) + { + _flowControl.wait(); + } + } + + } + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { @@ -2850,10 +2991,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void dispatchMessage(UnprocessedMessage message) { - if (message.getDeliverBody() != null) + final BasicDeliverBody deliverBody = message.getDeliverBody(); + if (deliverBody != null) { final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag()); + _consumers.get(deliverBody.getConsumerTag().toIntValue()); if ((consumer == null) || consumer.isClosed()) { @@ -2862,13 +3004,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " - + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + deliverBody.getDeliveryTag() + "] from queue " + + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer(" + + deliverBody.getDeliveryTag() + "] from queue " + " consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } @@ -2880,7 +3022,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - consumer.notifyMessage(message, _channelId); + consumer.notifyMessage(message); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 488d22c4bd..bf11572163 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.slf4j.Logger; @@ -53,13 +49,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); /** The connection being used by this consumer */ - private AMQConnection _connection; + private final AMQConnection _connection; - private String _messageSelector; + private final String _messageSelector; - private boolean _noLocal; + private final boolean _noLocal; - private AMQDestination _destination; + private final AMQDestination _destination; /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); @@ -70,7 +66,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQShortString _consumerTag; /** We need to know the channel id when constructing frames */ - private int _channelId; + private final int _channelId; /** * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors @@ -78,36 +74,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private final ArrayBlockingQueue _synchronousQueue; - private MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; private final AMQSession _session; - private AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private FieldTable _rawSelectorFieldTable; + private final FieldTable _rawSelectorFieldTable; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchHigh; + private final int _prefetchHigh; /** * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchLow; + private final int _prefetchLow; /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private boolean _exclusive; + private final boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private int _acknowledgeMode; + private final int _acknowledgeMode; /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; @@ -133,10 +129,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive * on the queue. This is used for queue browsing. */ - private boolean _autoClose; + private final boolean _autoClose; private boolean _closeWhenNoMessages; - private boolean _noConsume; + private final boolean _noConsume; private List _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, @@ -156,7 +152,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - _acknowledgeMode = acknowledgeMode; + _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; @@ -166,6 +162,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } + else + { + _acknowledgeMode = acknowledgeMode; + } } public AMQDestination getDestination() @@ -254,7 +254,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { case Session.DUPS_OK_ACKNOWLEDGE: - _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); break; @@ -269,7 +268,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _logger.info("Recording tag for commit:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); } @@ -645,9 +643,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent */ - void notifyMessage(UnprocessedMessage messageFrame, int channelId) + void notifyMessage(UnprocessedMessage messageFrame) { final boolean debug = _logger.isDebugEnabled(); @@ -658,10 +655,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + final BasicDeliverBody deliverBody = messageFrame.getDeliverBody(); + AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), - messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(), - messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(deliverBody.getDeliveryTag(), + deliverBody.getRedelivered(), + deliverBody.getExchange(), + deliverBody.getRoutingKey(), + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { @@ -673,11 +675,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // if (!_closed.get()) { - jmsMessage.setConsumer(this); - preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage); } // else // { @@ -702,9 +702,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * @param jmsMessage this message has already been processed so can't redo preDeliver - * @param channelId */ - public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + public void notifyMessage(AbstractJMSMessage jmsMessage) { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7e96fb537c..ae71846870 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + try + { + _session.checkFlowControl(); + } + catch (InterruptedException e) + { + JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); + jmsEx.setLinkedException(e); + throw jmsEx; + } + _protocolHandler.writeFrame(compositeFrame, wait); if (message != origMessage) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 49c8a83833..d05e99d210 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class); + private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler(); + + public static ChannelFlowMethodHandler getInstance() + { + return _instance; + } + + private ChannelFlowMethodHandler() + { } + + public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) + throws AMQException + { + + final AMQProtocolSession session = stateManager.getProtocolSession(); + session.setFlowControl(channelId, body.getActive()); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index b029770946..1947a18653 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected boolean _changedData; private Destination _destination; private JMSHeaderAdapter _headerAdapter; - private BasicMessageConsumer _consumer; - private boolean _strictAMQP; + + private static final boolean STRICT_AMQP_COMPLIANCE = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); protected AbstractJMSMessage(ByteBuffer data) { @@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _changedData = (data == null); _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { if (getContentHeaderProperties().getMessageIdAsString() == null) { - getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); + StringBuilder b = new StringBuilder(39); + b.append("ID"); + b.append(UUID.randomUUID()); + getContentHeaderProperties().setMessageId(b.toString()); } return getContentHeaderProperties().getMessageIdAsString(); @@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte getByteProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public short getShortProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public int getIntProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public long getLongProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public float getFloatProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public double getDoubleProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -404,7 +406,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } else { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -425,7 +427,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -436,7 +438,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(String propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -447,7 +449,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setByteProperty(String propertyName, byte b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -458,7 +460,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -469,7 +471,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setShortProperty(String propertyName, short i) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -487,7 +489,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setLongProperty(String propertyName, long l) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -498,7 +500,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setFloatProperty(String propertyName, float f) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -509,7 +511,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setDoubleProperty(String propertyName, double v) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -691,9 +693,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setConsumer(BasicMessageConsumer basicMessageConsumer) - { - _consumer = basicMessageConsumer; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index a70acbabbe..d8fe964b85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(ByteBuffer data) throws JMSException { super(data); // this instantiates a content header - populateMapFromData(); + if(data != null) + { + populateMapFromData(); + } + } JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, @@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public String toBodyString() throws JMSException { - return _map.toString(); + return _map == null ? "" : _map.toString(); } public AMQShortString getMimeTypeAsShortString() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5b199f2478..bc1ba155cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -36,33 +36,15 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage +public abstract class UnprocessedMessage { - private long _bytesReceived = 0; + private long _bytesReceived = 0L; - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) - private final int _channelId; private ContentHeaderBody _contentHeader; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List _bodies; - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) - { - _deliverBody = deliverBody; - _channelId = channelId; - _bounceBody = null; - } - - - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException { @@ -96,22 +78,11 @@ public class UnprocessedMessage return _bytesReceived == getContentHeader().bodySize; } - public BasicDeliverBody getDeliverBody() - { - return _deliverBody; - } - public BasicReturnBody getBounceBody() - { - return _bounceBody; - } + abstract public BasicDeliverBody getDeliverBody(); - public int getChannelId() - { - return _channelId; - } - + abstract public BasicReturnBody getBounceBody(); public ContentHeaderBody getContentHeader() { @@ -128,4 +99,60 @@ public class UnprocessedMessage return _bodies; } + abstract public boolean isDeliverMessage(); + + public static final class UnprocessedDeliverMessage extends UnprocessedMessage + { + private final BasicDeliverBody _body; + + public UnprocessedDeliverMessage(final BasicDeliverBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return _body; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return true; + } + } + + public static final class UnprocessedBouncedMessage extends UnprocessedMessage + { + private final BasicReturnBody _body; + + public UnprocessedBouncedMessage(final BasicReturnBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return null; + } + + public BasicReturnBody getBounceBody() + { + return _body; + } + + public boolean isDeliverMessage() + { + return false; + } + } + + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f70c1faa84..3dee0b0142 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -407,7 +407,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - getStateManager().error(e); + if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -439,78 +439,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: - - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } - - final AMQMethodEvent evt = - new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } - - exceptionCaught(session, e); - } - - break; - - case ContentHeaderBody.TYPE: - - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + bodyFrame.handle(frame.getChannel(),_protocolSession); - case ContentBody.TYPE: - - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; - - case HeartbeatBody.TYPE: - - if (debug) - { - _logger.debug("Received heartbeat"); - } - - break; - - default: - - } _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -528,6 +458,55 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + throws AMQException + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame); + } + + final AMQMethodEvent evt = + new AMQMethodEvent(channelId, (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } + } + + exceptionCaught(session, e); + } + + } + private static int _messagesOut; public void messageSent(IoSession session, Object message) throws Exception diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b48adbdb08..6a5cc62bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ protected final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private final ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ protected int _queueId = 1; @@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + private final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { @@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = message; + } + else + { + _channelId2UnprocessedMsgMap.put(channelId, message); + } } - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId); + + if (msg == null) { throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); @@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException + public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage msg; + final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; + if(fastAccess) + { + msg = _channelId2UnprocessedMsgArray[channelId]; + } + else + { + msg = _channelId2UnprocessedMsgMap.get(channelId); + } + if (msg == null) { throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); @@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - _channelId2UnprocessedMsgMap.remove(channelId); + if(fastAccess) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } throw new AMQException("Error: received content body without having received a ContentHeader frame first"); } @@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + + } + /** * Deliver a message to the appropriate session, removing the unprocessed message from our map * @@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } } protected AMQSession getSession(int channelId) @@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _methodDispatcher = methodDispatcher; } + + public void setFlowControl(final int channelId, final boolean active) + { + final AMQSession session = getSession(channelId); + session.setFlowControl(active); + } + + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index e62fce0f60..2e6a4beb83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * The state manager is responsible for managing the state of the protocol session.

For each AMQProtocolHandler * there is a separate state manager. */ -public class AMQStateManager implements AMQMethodListener +public class AMQStateManager { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -52,7 +52,7 @@ public class AMQStateManager implements AMQMethodListener * AMQFrame. */ - private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private final Object _stateLock = new Object(); private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); @@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(Exception e) - { - _logger.debug("State manager receive error notification: " + e); - synchronized (_stateListeners) - { - final Iterator it = _stateListeners.iterator(); - while (it.hasNext()) - { - final StateListener l = (StateListener) it.next(); - l.error(e); - } - } - } public boolean methodReceived(AMQMethodEvent evt) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 0ea04e5bc3..adbec6e35f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -85,7 +85,7 @@ public class TransportConnection throw new AMQNoTransportForProtocolException(details); } - if (transport == _currentInstance) + /* if (transport == _currentInstance) { if (transport == VM) { @@ -100,21 +100,23 @@ public class TransportConnection } } - _currentInstance = transport; + _currentInstance = transport;*/ + ITransportConnection instance; switch (transport) { case SOCKET: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - return new ExistingSocketConnector(); - } - }); + instance = + new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + return new ExistingSocketConnector(); + } + }); break; case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { public IoConnector newSocketConnector() { @@ -142,12 +144,14 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } + default: + throw new AMQNoTransportForProtocolException(details); } - return _instance; + return instance; } private static int getTransport(String transport) -- cgit v1.2.1 From a7901852d59190651225843843d013ac34e77b52 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 21 Feb 2008 15:28:43 +0000 Subject: QPID-785: Make sure queue browser consumers are auto-close, add test for browsing an empty queue. Refactor QueueBrowserTest a little to split up responsibilities a bit. We should move the sendMessage stuff to a super class, there are at least 4 implementations of that kicking about. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629824 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java | 1 + 1 file changed, 1 insertion(+) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 28e5992b26..4171e9bf9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -88,6 +88,7 @@ public class AMQQueueBrowser implements QueueBrowser checkState(); final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + consumer.closeWhenNoMessages(true); _consumers.add(consumer); return new Enumeration() -- cgit v1.2.1 From 90b7512b4814a0efd6fd5567d6d2a21c5c14ac0b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 22 Feb 2008 16:15:11 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630239 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/fragmentation/TestLargePublisher.java | 2 +- .../src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'java/client') diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java index a246352d8b..2fe01fc126 100644 --- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java @@ -183,7 +183,7 @@ public class TestLargePublisher } catch (UnknownHostException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } catch (AMQException e) { diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java index 33891142b5..37b4ff1498 100644 --- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java @@ -133,7 +133,7 @@ public class TestPublisher } catch (JMSException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } @@ -163,7 +163,7 @@ public class TestPublisher } catch (UnknownHostException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } catch (AMQException e) { -- cgit v1.2.1 From ce3001f7df64a793e231efab13d3163d0df7be50 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 22 Feb 2008 16:50:26 +0000 Subject: QPID-771: fix up exception handling a bit more, this is so lovely. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630247 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/AMQConnection.java | 2 +- .../org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../qpid/test/unit/client/connection/ConnectionTest.java | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b60a8dfaad..85542830bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -438,7 +438,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (exceptions.size() > 0) { - JMSException e = exceptions.get(exceptions.size() - 1); + JMSException e = exceptions.get(0); int code = -1; try { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 3dee0b0142..345a124a12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -373,8 +373,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); propagateExceptionToWaiters(amqe); - _connection.exceptionReceived(cause); } + _connection.exceptionReceived(cause); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 7103397ad4..c97e2c4cb1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.jms.Session; import junit.framework.TestCase; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TopicSession; @@ -115,8 +116,8 @@ public class ConnectionTest extends TestCase } } - //fixme AMQAuthenticationException is not propogaged - public void PasswordFailureConnection() throws Exception + //See QPID-771 + public void testPasswordFailureConnection() throws Exception { try { @@ -125,10 +126,9 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { - if (!(amqe instanceof AMQAuthenticationException)) - { - fail("Correct exception not thrown. Excpected 'AMQAuthenticationException' got: " + amqe); - } + assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass()); + Exception linked = ((JMSException) amqe.getCause()).getLinkedException(); + assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass()); } } -- cgit v1.2.1 From 7b034ba79cd2a5bcff361c2b7d42655c646483cf Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 03:33:58 +0000 Subject: QPID-807 : made methods public. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630724 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 87c813982e..73f3ccbee4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2383,7 +2383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -2451,7 +2451,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return ++_nextProducerId; } - private AMQProtocolHandler getProtocolHandler() + public AMQProtocolHandler getProtocolHandler() { return _connection.getProtocolHandler(); } -- cgit v1.2.1 From eea3a20fa424a7735dfde645eab2bb1c2d0f8951 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 03:36:55 +0000 Subject: QPID-808 : add method to qpid.jms Interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630725 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java index b91fc2d960..b830c377b8 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java @@ -50,4 +50,8 @@ public interface MessageProducer extends javax.jms.MessageProducer void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException; + + void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException; + } -- cgit v1.2.1 From 57f32613463323267ff2e021dde83fc38be67e39 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 03:52:54 +0000 Subject: QPID-809 : Added comments and additional error logging. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630728 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/failover/FailoverHandler.java | 3 +++ .../main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 1 + 2 files changed, 4 insertions(+) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 60f95bfe33..9911b75b80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -138,6 +138,9 @@ public class FailoverHandler implements Runnable _logger.info("Failover process veto-ed by client"); _amqProtocolHandler.setStateManager(existingStateManager); + + //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that + // prompted the failover event. if (_host != null) { _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 345a124a12..5eb08c5e8d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -209,6 +209,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (RuntimeException e) { + _logger.warn(e.getMessage()); e.printStackTrace(); } -- cgit v1.2.1 From 74091e2211758b399aaaccd057b0845d805f9fe6 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 03:58:25 +0000 Subject: QPID-107 added ACCESS_REFUSED as a reason to throw AMQAuthenticationException git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630729 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 4d805cf123..bd16211880 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -73,7 +73,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener Date: Mon, 25 Feb 2008 03:59:58 +0000 Subject: QPID-809 : Added comments and improved logging msgs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630730 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index bd16211880..fdcb493f38 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -75,8 +75,9 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener Date: Mon, 25 Feb 2008 12:59:03 +0000 Subject: QPID-811 : The RejectionEE is occuring because the task pool is shutdown before the close-ok has been received. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630833 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 85542830bf..6adb7f345c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -927,9 +927,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { long startCloseTime = System.currentTimeMillis(); - _taskPool.shutdown(); closeAllSessions(null, timeout, startCloseTime); + //This MUST occur after we have successfully closed all Channels/Sessions + _taskPool.shutdown(); + if (!_taskPool.isTerminated()) { try -- cgit v1.2.1 From 5a7d2f803b1a182559456c53d094ca00d5523503 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 14:50:04 +0000 Subject: QPID-809 Updated logging and removed an unnecessary printStackTrace(). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630865 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 6 ------ 1 file changed, 6 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 73f3ccbee4..2ecc0515a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -615,7 +615,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final AMQFrame frame = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. @@ -1759,11 +1758,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { JMSException ex = new JMSException("Error registering consumer: " + e); - if (_logger.isDebugEnabled()) - { - e.printStackTrace(); - } - ex.setLinkedException(e); throw ex; } -- cgit v1.2.1 From 0d3a8c857888a307ef52958be2da422bfd412a94 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 26 Feb 2008 09:15:31 +0000 Subject: QPID=813 : Synchronized getInstance git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@631141 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/transport/TransportConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index adbec6e35f..18faade677 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -76,7 +76,7 @@ public class TransportConnection return _openSocketRegister.remove(socketID); } - public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); -- cgit v1.2.1 From af4f34d70b44b9fc8ebbb8230706f9e7b2a0ec38 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 28 Feb 2008 16:51:12 +0000 Subject: QPID-821 Set default to false for MultiIO and Buffer Limiting. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632054 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../main/java/org/apache/qpid/client/transport/TransportConnection.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5eb08c5e8d..0faacaabc1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -213,7 +213,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter e.printStackTrace(); } - if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio")) + if (Boolean.getBoolean("protectio")) { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 18faade677..78b54a8de9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -122,7 +122,7 @@ public class TransportConnection { SocketConnector result; // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) + if (Boolean.getBoolean("qpidnio")) { _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") ? "Qpid NIO is new default" -- cgit v1.2.1 From 48248ab5bcab430eddaa887c6468ca7d1939c9d3 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 28 Feb 2008 17:38:20 +0000 Subject: QPID-822 Setting default to false for JMSXUserID cause this test to fail. Now test only validates the JMSXUserID if it exists. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632070 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/test/unit/basic/PropertyValueTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 8cceba6ffd..09886f4736 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -292,8 +292,11 @@ public class PropertyValueTest extends TestCase implements MessageListener ((AMQMessage) m).getPropertyHeaders().containsKey("void")); //JMSXUserID - Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, - m.getStringProperty("JMSXUserID")); + if (m.getStringProperty("JMSXUserID") != null) + { + Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, + m.getStringProperty("JMSXUserID")); + } } received.clear(); -- cgit v1.2.1 From e954a2147afe343f0cbd9c5522232eda88541594 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 3 Mar 2008 14:02:27 +0000 Subject: QPID-594 QPID-107 Updated Connection missing AMQAuthenticationException problems git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@633088 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/test/unit/client/connection/ConnectionTest.java | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index c97e2c4cb1..4d47d080b7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -126,6 +126,12 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { + if (amqe.getCause() instanceof Exception) + { + System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); + return; + } + assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass()); Exception linked = ((JMSException) amqe.getCause()).getLinkedException(); assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass()); -- cgit v1.2.1 From 79af556e23ff02c660abee6ff9f1f0aa278b1d87 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 3 Mar 2008 14:54:59 +0000 Subject: QPID-594 QPID-107 Updated Connection missing AMQAuthenticationException problems git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@633111 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/test/unit/client/connection/ConnectionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 4d47d080b7..53d25c2cad 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -126,7 +126,7 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { - if (amqe.getCause() instanceof Exception) + if (amqe.getCause().getClass() == Exception.class) { System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); return; -- cgit v1.2.1 From 0eadb79ee82aadfc6fa6d2b81aa483cf3f62d7d4 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 3 Mar 2008 16:28:36 +0000 Subject: QPID-784 : Minor changes based on code comments by ASkinner and RGodfrey. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@633160 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/AMQBrokerDetails.java | 6 +++--- .../qpid/test/unit/client/connectionurl/ConnectionURLTest.java | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 572ea48f85..f3e71d2035 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -57,9 +57,9 @@ public class AMQBrokerDetails implements BrokerDetails if (transport != null) { //todo this list of valid transports should be enumerated somewhere - if ((!(transport.equalsIgnoreCase("vm") || - transport.equalsIgnoreCase("tcp") || - transport.equalsIgnoreCase("socket")))) + if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) || + transport.equalsIgnoreCase(BrokerDetails.TCP) || + transport.equalsIgnoreCase(BrokerDetails.SOCKET)))) { if (transport.equalsIgnoreCase("localhost")) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 978ce34d59..d90873a6a7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -512,7 +512,7 @@ public class ConnectionURLTest extends TestCase public void testSocketProtocol() throws URLSyntaxException { - String url = "amqp://guest:guest@id/test" + "?brokerlist='socket:///'"; + String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'"; try { @@ -520,7 +520,9 @@ public class ConnectionURLTest extends TestCase assertNotNull(curl); assertEquals(1, curl.getBrokerCount()); assertNotNull(curl.getBrokerDetails(0)); - assertEquals("socket", curl.getBrokerDetails(0).getTransport()); + assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport()); + assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost()); + assertEquals("URL does not toString as expected", url, curl.toString()); } catch (URLSyntaxException e) { -- cgit v1.2.1 From 48fe77e5ba156f63ca7704757b298563ca77a8c1 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 7 Mar 2008 15:54:41 +0000 Subject: QPID-839 : DUPS_OK should behave exactly as AUTO_ACK ... this would not cause the seen test failure; but is still incorrect git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@634720 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageConsumer.java | 40 ++-------------------- 1 file changed, 3 insertions(+), 37 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index bf11572163..605e2d1e83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -105,15 +105,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private final int _acknowledgeMode; - /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - private ConcurrentLinkedQueue _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue(); /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ @@ -253,9 +244,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.DUPS_OK_ACKNOWLEDGE: - _receivedDeliveryTags.add(msg.getDeliveryTag()); - break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -282,8 +270,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * * @return boolean if the acquisition was successful * - * @throws JMSException - * @throws InterruptedException + * @throws JMSException if a listener has already been set or another thread is receiving + * @throws InterruptedException if interrupted */ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { @@ -505,7 +493,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * We can get back either a Message or an exception from the queue. This method examines the argument and deals with * it by throwing it (if an exception) or returning it (in any other case). * - * @param o + * @param o the object to return or throw * * @return a message only if o is a Message * @@ -772,28 +760,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - /*( if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } - - //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. - if (_outstanding < _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } - - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - _outstanding = 0; - } - } - - break; - */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) -- cgit v1.2.1 From 6aa10cc1aeb0ffbc6b02bf662b93eab879c517d7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 10 Mar 2008 17:16:09 +0000 Subject: QPID-107 : Changes based on code review. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@635602 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 4 ++-- .../main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2ecc0515a5..0a90846b94 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2377,7 +2377,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - public AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -2445,7 +2445,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return ++_nextProducerId; } - public AMQProtocolHandler getProtocolHandler() + private AMQProtocolHandler getProtocolHandler() { return _connection.getProtocolHandler(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 0faacaabc1..89982a1af0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -209,8 +209,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (RuntimeException e) { - _logger.warn(e.getMessage()); - e.printStackTrace(); + _logger.error(e.getMessage(), e); } if (Boolean.getBoolean("protectio")) -- cgit v1.2.1 From cf71efa7c437971cf68e3a3a7e4fcdacbaf6a93e Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 12 Mar 2008 10:40:01 +0000 Subject: QPID-845 : Reinserted ':' to JMSMessageID. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@636273 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/message/AbstractJMSMessage.java | 2 +- .../test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 1947a18653..83ca82df5c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -121,7 +121,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (getContentHeaderProperties().getMessageIdAsString() == null) { StringBuilder b = new StringBuilder(39); - b.append("ID"); + b.append("ID:"); b.append(UUID.randomUUID()); getContentHeaderProperties().setMessageId(b.toString()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 3012909daa..86f6f13156 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -101,6 +101,7 @@ public class JMSPropertiesTest extends TestCase // assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); + assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); con.close(); } -- cgit v1.2.1 From 90ddc20dc80b95f3d5dacb20418b08138cff88a1 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 12 Mar 2008 14:44:40 +0000 Subject: QPID-846 : Update to prevent JMSX* values being masked, updated JMSPropertiesTest to prevent regression. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@636347 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/message/AbstractJMSMessage.java | 11 +++----- .../qpid/test/unit/message/JMSPropertiesTest.java | 30 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 83ca82df5c..6d4c61fb29 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -394,15 +394,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getStringProperty(String propertyName) throws JMSException { - if (propertyName.startsWith("JMSX")) + //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. + if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) { - //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. - if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) - { - return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); - } - - return null; + return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); } else { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 86f6f13156..f2655adc98 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -38,6 +38,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; +import java.util.Enumeration; /** * @author Apache Software Foundation @@ -85,6 +86,12 @@ public class JMSPropertiesTest extends TestCase sentMsg.setJMSType(JMS_TYPE); sentMsg.setJMSReplyTo(JMS_REPLY_TO); + String JMSXGroupID_VALUE = "group"; + sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE); + + int JMSXGroupSeq_VALUE = 1; + sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE); + // send it producer.send(sentMsg); @@ -103,6 +110,29 @@ public class JMSPropertiesTest extends TestCase assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); + //Validate that the JMSX values are correct + assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); + assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); + + boolean JMSXGroupID_Available = false; + boolean JMSXGroupSeq_Available = false; + Enumeration props = con.getMetaData().getJMSXPropertyNames(); + while (props.hasMoreElements()) + { + String name = (String) props.nextElement(); + if (name.equals("JMSXGroupID")) + { + JMSXGroupID_Available = true; + } + if (name.equals("JMSXGroupSeq")) + { + JMSXGroupSeq_Available = true; + } + } + + assertTrue("JMSXGroupID not available.",JMSXGroupID_Available); + assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available); + con.close(); } -- cgit v1.2.1 From 20cf766ec6465c52c56984780256791d97f481ac Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 10:46:40 +0000 Subject: QPID-592 : Parameterised the Read/Write buffer limits. On the broker extra config [read|write]BufferLimitSize on the client System properties qpid.[read|write].buffer.limit. All the defaults are 256k(262144). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637047 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/protocol/AMQProtocolHandler.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 89982a1af0..7909663f24 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -30,7 +30,6 @@ import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -156,6 +155,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -219,19 +224,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter //Add IO Protection Filters IoFilterChain chain = session.getFilterChain(); - int buf_size = 32768; - if (session.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); - } session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); writefilter.attach(chain); session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); -- cgit v1.2.1 From 92a03fa08a503afb6af1988862393a86813d8482 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 12:45:59 +0000 Subject: QPID-853 : Use the result of processMethod (match) to decide if we have the right frame. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637084 13f79535-47bb-0310-9956-ffa450edef68 --- java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0a90846b94..9f7f53a011 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2294,9 +2294,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException { boolean matches = super.processMethod(channelId, frame); - QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; - _messageCount = declareOk.getMessageCount(); - _consumerCount = declareOk.getConsumerCount(); + if (matches) + { + QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; + _messageCount = declareOk.getMessageCount(); + _consumerCount = declareOk.getConsumerCount(); + } return matches; } -- cgit v1.2.1 From 21d5aa6534515bc7c1f343b5a4b579fe9513b0ce Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 12:57:42 +0000 Subject: QPID-854 : Changes to the client to make the dispatcher responsible for closing the queue browser when all the messages have been processed. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637086 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQQueueBrowser.java | 51 +++--- .../java/org/apache/qpid/client/AMQSession.java | 45 ++++-- .../apache/qpid/client/BasicMessageConsumer.java | 157 +++++++++---------- .../client/handler/ChannelCloseMethodHandler.java | 5 + .../qpid/client/message/UnprocessedMessage.java | 174 ++++++++++++++++++--- 5 files changed, 297 insertions(+), 135 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 4171e9bf9b..a3cf39003d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQException; import javax.jms.IllegalStateException; import javax.jms.JMSException; @@ -50,7 +51,9 @@ public class AMQQueueBrowser implements QueueBrowser _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; // Create Consumer to verify message selector. BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + // Close this consumer as we are not looking to consume only to establish that, at least for now, + // the QB can be created consumer.close(); } @@ -88,40 +91,40 @@ public class AMQQueueBrowser implements QueueBrowser checkState(); final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); - consumer.closeWhenNoMessages(true); + _consumers.add(consumer); return new Enumeration() + { + + Message _nextMessage = consumer == null ? null : consumer.receive(); + + public boolean hasMoreElements() { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); - Message _nextMessage = consumer.receive(); + return (_nextMessage != null); + } - public boolean hasMoreElements() + public Object nextElement() + { + Message msg = _nextMessage; + try { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.info("QB:nextElement about to receive"); - return (_nextMessage != null); + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); } - - public Object nextElement() + catch (JMSException e) { - Message msg = _nextMessage; - try - { - _logger.info("QB:nextElement about to receive"); - - _nextMessage = consumer.receive(); - _logger.info("QB:nextElement received:" + _nextMessage); - } - catch (JMSException e) - { - _logger.warn("Exception caught while queue browsing", e); - _nextMessage = null; - } - - return msg; + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; } - }; + + return msg; + } + }; } public void close() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9f7f53a011..0a51ec7c47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -648,6 +648,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { + // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived + // calls through connection.closeAllSessions which is also called by the public connection.close() + // with a null cause + // When we are closing the Session due to a protocol session error we simply create a new AMQException + // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. + // We need to determin here if the connection should be + synchronized (_connection.getFailoverMutex()) { if (e instanceof AMQDisconnectedException) @@ -763,13 +770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); if (consumer != null) { - // fixme this isn't right.. needs to check if _queue contains data for this consumer - if (consumer.isAutoClose()) // && _queue.isEmpty()) - { - consumer.closeWhenNoMessages(true); - } - - if (!consumer.isNoConsume()) + if (!consumer.isNoConsume()) // Normal Consumer { // Clean the Maps up first // Flush any pending messages for this consumerTag @@ -785,7 +786,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _dispatcher.rejectPending(consumer); } - else + else // Queue Browser { // Just close the consumer // fixme the CancelOK is being processed before the arriving messages.. @@ -793,13 +794,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // has yet to receive before the close comes in. // consumer.markClosed(); + + + + if (consumer.isAutoClose()) + { // There is a small window where the message is between the two queues in the dispatcher. + if (consumer.isClosed()) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + consumer.debugIdentity()); + } + + deregisterConsumer(consumer); + + } + else + { + _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + } + } } } - else - { - _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); - } - } public QueueBrowser createBrowser(Queue queue) throws JMSException @@ -2934,7 +2950,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _lock.wait(2000); } - if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()) + if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())) { rejectMessage(message, true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 605e2d1e83..efbce6033b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -38,6 +38,7 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -121,7 +122,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * on the queue. This is used for queue browsing. */ private final boolean _autoClose; - private boolean _closeWhenNoMessages; private final boolean _noConsume; private List _closedStack = null; @@ -358,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted acquire: " + e); if (isClosed()) { return null; @@ -369,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = null; if (l > 0) { @@ -386,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted poll: " + e); if (isClosed()) { return null; @@ -404,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted take: " + e); if (isClosed()) { return null; @@ -426,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private boolean closeOnAutoClose() throws JMSException - { - if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) - { - close(false); - - return true; - } - else - { - return false; - } - } - public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -468,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -513,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw e; } + else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + { + _closed.set(true); + deregisterConsumer(); + return null; + } else { return (AbstractJMSMessage) o; @@ -526,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - // synchronized (_closed) - if (_logger.isInfoEnabled()) { _logger.info("Closing consumer:" + debugIdentity()); } - synchronized (_connection.getFailoverMutex()) + if (!_closed.getAndSet(true)) { - if (!_closed.getAndSet(true)) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + if (_closedStack != null) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - if (_closedStack != null) - { - _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); - } - else - { - _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); - } + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } + else + { + _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); + } + } - if (sendClose) + if (sendClose) + { + // The Synchronized block only needs to protect network traffic. + synchronized (_connection.getFailoverMutex()) { BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false); @@ -564,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } - } catch (AMQException e) { @@ -575,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } - else - { - // //fixme this probably is not right - // if (!isNoConsume()) - { // done in BasicCancelOK Handler but not sending one so just deregister. - deregisterConsumer(); - } + } + else + { + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. + deregisterConsumer(); } + } - if ((_messageListener != null) && _receiving.get()) + // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive + // so we need to let it know it is time to close. + if ((_messageListener != null) && _receiving.get()) + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Interrupting thread: " + _receivingThread); - } - - _receivingThread.interrupt(); + _logger.info("Interrupting thread: " + _receivingThread); } + + _receivingThread.interrupt(); } } } @@ -634,6 +616,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void notifyMessage(UnprocessedMessage messageFrame) { + if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + { + notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + return; + } + final boolean debug = _logger.isDebugEnabled(); if (debug) @@ -646,12 +634,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final BasicDeliverBody deliverBody = messageFrame.getDeliverBody(); AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(deliverBody.getDeliveryTag(), - deliverBody.getRedelivered(), - deliverBody.getExchange(), - deliverBody.getRoutingKey(), - messageFrame.getContentHeader(), - messageFrame.getBodies()); + _messageFactory.createMessage(deliverBody.getDeliveryTag(), + deliverBody.getRedelivered(), + deliverBody.getExchange(), + deliverBody.getRoutingKey(), + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { @@ -688,9 +676,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * @param jmsMessage this message has already been processed so can't redo preDeliver - */ + /** @param closeMessage this message signals that we should close the browser */ + public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + { + if (isMessageListenerSet()) + { + // Currently only possible to get this msg type with a browser. + // If we get the message here then we should probably just close this consumer. + // Though an AutoClose consumer with message listener is quite odd... + // Just log out the fact so we know where we are + _logger.warn("Using an AutoCloseconsumer with message listener is not supported."); + } + else + { + try + { + _synchronousQueue.put(closeMessage); + } + catch (InterruptedException e) + { + _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," + + "but we shouldn't have close yet"); + } + } + } + + /** @param jmsMessage this message has already been processed so can't redo preDeliver */ public void notifyMessage(AbstractJMSMessage jmsMessage) { try @@ -913,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _noConsume; } - public void closeWhenNoMessages(boolean b) - { - _closeWhenNoMessages = b; - - if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) - { - _closed.set(true); - _receivingThread.interrupt(); - } - - } - public void rollback() { clearUnackedMessages(); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 8c8814e9b7..a580a6466d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -104,6 +104,11 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener Date: Fri, 14 Mar 2008 16:29:08 +0000 Subject: QPID-848: Add short sleep when a connection fails to complete so that we wait for all exceptions to be thrown git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637146 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/AMQConnection.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6adb7f345c..2d4d2be8fb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -435,7 +435,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { String message = null; - + try + { + Thread.sleep(150); + } + catch (InterruptedException e) + { + // Eat it, we've hopefully got all the exceptions if this happened + } if (exceptions.size() > 0) { JMSException e = exceptions.get(0); -- cgit v1.2.1 From 8d4c9d65ff592a7605dc22d4c696f745417d562e Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 17:31:44 +0000 Subject: QPID-855 : Added check for the IllegalStateException and ignore ones from the Noop Operation. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637170 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/failover/FailoverRetrySupport.java | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index 120a07f0fc..e756d7baf9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -122,6 +122,13 @@ public class FailoverRetrySupport implements FailoverSup { _log.debug("Failover exception caught during operation: " + e, e); } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } } } } -- cgit v1.2.1 From f2235c6a4674c2be1698374ea13086adaaf0fbf1 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 10:36:13 +0000 Subject: QPID-856 : Moved unbind out of the synchronized block as it doesn't need to be done there. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637828 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/transport/TransportConnection.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 78b54a8de9..2a5365c322 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -332,16 +332,21 @@ public class TransportConnection public static void killVMBroker(int port) { + VmPipeAddress pipe; synchronized (_inVmPipeAddress) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + pipe = (VmPipeAddress) _inVmPipeAddress.get(port); if (pipe != null) { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); - _acceptor.unbind(pipe); } } + //This doesn't need to be sychronized + if (pipe != null) + { + _acceptor.unbind(pipe); + } } } -- cgit v1.2.1 From 9be19ae547c592691ac4f3f085d8ffcdc23c4390 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:22:23 +0000 Subject: QPID-847 : InvalidSelectorException. This was caused by the Broker now being more AMQP spec compliant than previously. Reverted the change in AMQMinaProtocolSession.java that is causing the issue but we need to correctly fix this issue in the client as the client is not AMQP spec compliant, even with the STRICT_AMQP flag. Updated SelectorTest.java with an additional test so we don't have the functionality reversion later. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637977 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/test/unit/basic/SelectorTest.java | 188 +++++++++++++++++++-- 1 file changed, 175 insertions(+), 13 deletions(-) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index e0bfc5d498..d05ed7ca73 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.basic; import junit.framework.TestCase; - +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; - +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -46,12 +48,12 @@ public class SelectorTest extends TestCase implements MessageListener private AMQSession _session; private int count; public String _connectionString = "vm://:1"; + private static final String INVALID_SELECTOR = "Cost LIKE 5"; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception @@ -60,19 +62,19 @@ public class SelectorTest extends TestCase implements MessageListener TransportConnection.killAllVMBrokers(); } - private void init(AMQConnection connection) throws Exception + private void init(AMQConnection connection) throws JMSException { init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); } - private void init(AMQConnection connection, AMQDestination destination) throws Exception + private void init(AMQConnection connection, AMQDestination destination) throws JMSException { _connection = connection; _destination = destination; connection.start(); String selector = null; - selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; + selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -80,14 +82,17 @@ public class SelectorTest extends TestCase implements MessageListener _session.createConsumer(destination, selector).setMessageListener(this); } - public synchronized void test() throws JMSException, InterruptedException + public synchronized void test() throws JMSException, InterruptedException, URLSyntaxException, AMQException { try { + + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + Message msg = _session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); - msg.setStringProperty("property-with-hyphen","wibble"); + msg.setStringProperty("property-with-hyphen", "wibble"); msg.setJMSType("Special"); _logger.info("Sending Message:" + msg); @@ -107,10 +112,147 @@ public class SelectorTest extends TestCase implements MessageListener // throw new RuntimeException("Did not get message!"); } } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + System.out.println("SUCCESS!!"); + } + } + catch (InterruptedException e) + { + _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); + } + catch (URLSyntaxException e) + { + _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + catch (AMQException e) + { + _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + + finally + { + if (_session != null) + { + _session.close(); + } + if (_connection != null) + { + _connection.close(); + } + } + } + + + public void testInvalidSelectors() + { + Connection connection = null; + + try + { + connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (URLSyntaxException e) + { + fail("Error:" + e.getMessage()); + } + + //Try Creating a Browser + try + { + _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Consumer + try + { + _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Receiever + try + { + _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + finally { - _session.close(); - _connection.close(); + if (_session != null) + { + try + { + _session.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } + if (_connection != null) + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } } } @@ -129,9 +271,29 @@ public class SelectorTest extends TestCase implements MessageListener public static void main(String[] argv) throws Exception { SelectorTest test = new SelectorTest(); - test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0]; - test.setUp(); - test.test(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; + + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.test(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } } public static junit.framework.Test suite() -- cgit v1.2.1 From c10241126321f173df7a3c77fb5c049c298029f8 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:40:14 +0000 Subject: QPID-849 : Client Deadlock, there are various points where we take the failover mutex to check the _closed values which if it is false then that is all we do. As the _closed check doesn't require the mutex then move all the checks out side of the mutex lock. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637986 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 113 +++++++++++---------- .../java/org/apache/qpid/client/AMQSession.java | 58 ++++++----- 2 files changed, 88 insertions(+), 83 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2d4d2be8fb..7a54617bf1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -656,71 +656,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - checkNotClosed(); + checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } + if (channelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } - return new FailoverRetrySupport( - new FailoverProtectedOperation() - { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + return new FailoverRetrySupport( + new FailoverProtectedOperation() { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); + int channelId = _idFactory.incrementAndGet(); - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) + if (_logger.isDebugEnabled()) { - deregisterSession(channelId); + _logger.debug("Write channel open frame for channel id " + channelId); } - } - if (_started) - { + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; try { - session.start(); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; } catch (AMQException e) { - throw new JMSAMQException(e); + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + deregisterSession(channelId); + } } - } - return session; - } - }, this).execute(); + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; + } + }, this).execute(); } } @@ -732,13 +732,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); if (transacted) { @@ -925,14 +925,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } else - { - synchronized (getFailoverMutex()) { if (!_closed.getAndSet(true)) { - try + + synchronized (getFailoverMutex()) { - long startCloseTime = System.currentTimeMillis(); + try + { + long startCloseTime = System.currentTimeMillis(); closeAllSessions(null, timeout, startCloseTime); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0a51ec7c47..c3219e6564 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -594,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } - synchronized (_connection.getFailoverMutex()) + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_messageDeliveryLock) { // we pass null since this is not an error case closeProducersAndConsumers(null); @@ -655,33 +655,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. // We need to determin here if the connection should be - synchronized (_connection.getFailoverMutex()) + if (e instanceof AMQDisconnectedException) { - if (e instanceof AMQDisconnectedException) + if (_dispatcher != null) { - if (_dispatcher != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); - } + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); } - synchronized (_messageDeliveryLock) + } + + if (!_closed.getAndSet(true)) + { + synchronized (_connection.getFailoverMutex()) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_messageDeliveryLock) { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } } -- cgit v1.2.1 From ddd769ed901ea44163dd154ea978957d5dddeed7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:44:56 +0000 Subject: QPID-857 : Reset the State Manager when receiveing a CLOSED state. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637989 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/AMQConnection.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 7a54617bf1..8cbcda053b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -539,19 +539,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up //_protocolHandler.attainState(AMQState.CONNECTION_OPEN); AMQState state = _protocolHandler.attainState(openOrClosedStates); - if(state == AMQState.CONNECTION_OPEN) + if (state == AMQState.CONNECTION_OPEN) { - _failoverPolicy.attainedConnection(); // Again this should be changed to a suitable notify _connected = true; } + else if (state == AMQState.CONNECTION_CLOSED) + { + //We need to change protocol handler here as an error during the connect will not + // cause the StateManager to be replaced. So the state is out of sync on reconnect + // This occurs here when we need to re-negotiate protocol versions + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED); + } } catch (AMQException e) { -- cgit v1.2.1 From f79decd5edc925aefebac3a5b93c04192a84f9c7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 18 Mar 2008 12:33:34 +0000 Subject: QPID-847 : Prevented the InvalidArgumentException from closing the connection. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@638346 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQAuthenticationException.java | 5 +++++ .../src/main/java/org/apache/qpid/client/AMQConnection.java | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index b6fbb6c6bf..8f90a0bdfa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,4 +39,9 @@ public class AMQAuthenticationException extends AMQException { super(error, msg); } + public boolean isHardError() + { + return false; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 8cbcda053b..4b8143cfb5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1301,7 +1301,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.error("Throwable Received but no listener set: " + cause.getMessage()); } - if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException)) + if (hardError(cause)) { try { @@ -1325,6 +1325,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + private boolean hardError(Throwable cause) + { + if (cause instanceof AMQException) + { + return ((AMQException)cause).isHardError(); + } + + return true; + } + void registerSession(int channelId, AMQSession session) { _sessions.put(channelId, session); -- cgit v1.2.1 From 515327fab94bbbf5f2be1eac097c90754979131f Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 19 Mar 2008 17:42:42 +0000 Subject: QPID-862 : Add IOException to valid reason to start failover and removed the propogation of the error to client, as they don't need to know of the error unless failover fails. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@638950 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 7909663f24..9f2ba1d84d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -54,6 +54,7 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -355,13 +356,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if (cause instanceof AMQConnectionClosedException) + if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attemp failover sessionClosed(session); - _connection.exceptionReceived(cause); } else { -- cgit v1.2.1 From d9b578a4ec0a73e3c738610a6a6d79955a07a5bd Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 20 Mar 2008 11:29:56 +0000 Subject: QPID-854 Resynchronise the _acceptor so that we don't hang git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@639251 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/transport/TransportConnection.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 2a5365c322..7ae2ddf66c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -332,21 +332,18 @@ public class TransportConnection public static void killVMBroker(int port) { - VmPipeAddress pipe; synchronized (_inVmPipeAddress) { - pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); if (pipe != null) { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); + // This does need to be sychronized as otherwise mina can hang + // if a new connection is made + _acceptor.unbind(pipe); } } - //This doesn't need to be sychronized - if (pipe != null) - { - _acceptor.unbind(pipe); - } } } -- cgit v1.2.1 From 1077fc8f79b764b553dab6a4db01337721695f31 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 21 Mar 2008 11:12:29 +0000 Subject: QPID-866 : Based on Patch from ASkinner. Only the FailoverException makes sence to process this way so remove list and synchronized so we either do an add or throw the set FailoverException. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@639598 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/failover/FailoverHandler.java | 6 +++- .../qpid/client/protocol/AMQProtocolHandler.java | 42 ++++++++++++++++++---- 2 files changed, 40 insertions(+), 8 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 9911b75b80..a944ff6bec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable // We wake up listeners. If they can handle failover, they will extend the // FailoverRetrySupport class and will in turn block on the latch until failover // has completed before retrying the operation. - _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); + _amqProtocolHandler.notifyFailoverStarting(); // Since failover impacts several structures we protect them all with a single mutex. These structures // are also in child objects of the connection. This allows us to manipulate them without affecting // client code which runs in a separate thread. synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) { + //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so + // we can clear the exception. + _amqProtocolHandler.failoverInProgress(); + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" // state works, without us having to terminate any existing "state waiters". We could theoretically // have a state waiter waiting until the connection is closed for some reason. Or in future we may have diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 9f2ba1d84d..3932b098cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -153,6 +153,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + + /** The last failover exception that occured */ + private FailoverException _lastFailoverException; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; @@ -419,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void notifyFailoverStarting() + { + // Set the last exception in the sync block to ensure the ordering with add. + // either this gets done and the add does the ml.error + // or the add completes first and the iterator below will do ml.error + synchronized (_frameListeners) + { + _lastFailoverException = new FailoverException("Failing over about to start"); + } + + propagateExceptionToWaiters(_lastFailoverException); + } + + public void failoverInProgress() + { + _lastFailoverException = null; + } + private static int _messageReceivedCount; public void messageReceived(IoSession session, Object message) throws Exception @@ -471,11 +493,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter new AMQMethodEvent(channelId, (AMQMethodBody) bodyFrame); try - { + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); if (!_frameListeners.isEmpty()) { + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. Iterator it = _frameListeners.iterator(); while (it.hasNext()) { @@ -592,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { try { - _frameListeners.add(listener); + synchronized (_frameListeners) + { + if (_lastFailoverException != null) + { + throw _lastFailoverException; + } + + _frameListeners.add(listener); + } _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); @@ -601,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } - catch (AMQException e) - { - throw e; - } finally { // If we don't removeKey the listener then no-one will -- cgit v1.2.1 From c5443b4489acd4782ed8049c0f2c687c9fbf9c81 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 21 Mar 2008 13:04:46 +0000 Subject: QPID-867 Always close connections git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@639619 13f79535-47bb-0310-9956-ffa450edef68 --- .../unit/client/connection/ConnectionTest.java | 78 ++++++++++++++++++---- 1 file changed, 66 insertions(+), 12 deletions(-) (limited to 'java/client') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 53d25c2cad..4b4df7e5c8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -56,25 +56,30 @@ public class ConnectionTest extends TestCase TransportConnection.killVMBroker(1); } - public void testSimpleConnection() + public void testSimpleConnection() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); - conn.close(); + conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } - public void testDefaultExchanges() + public void testDefaultExchanges() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + _broker + "?retries='1''&defaultQueueExchange='test.direct'" + "&defaultTopicExchange='test.topic'" @@ -107,21 +112,24 @@ public class ConnectionTest extends TestCase topicSession.close(); - - conn.close(); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } //See QPID-771 public void testPasswordFailureConnection() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); + conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); fail("Connection should not be established password is wrong."); } catch (AMQException amqe) @@ -136,13 +144,21 @@ public class ConnectionTest extends TestCase Exception linked = ((JMSException) amqe.getCause()).getLinkedException(); assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass()); } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testConnectionFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -152,14 +168,22 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testUnresolvedHostFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -169,13 +193,22 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } + } public void testUnresolvedVirtualHostFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -185,6 +218,13 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testClientIdCannotBeChanged() throws Exception @@ -200,13 +240,27 @@ public class ConnectionTest extends TestCase { // PASS } + finally + { + if (connection != null) + { + connection.close(); + } + } } public void testClientIdIsPopulatedAutomatically() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", null, "test"); - assertNotNull(connection.getClientID()); + try + { + assertNotNull(connection.getClientID()); + } + finally + { + connection.close(); + } } public static junit.framework.Test suite() -- cgit v1.2.1 From 30c764cf4a22961f169bd5f6716acee85c49c431 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 24 Mar 2008 13:49:06 +0000 Subject: QPID-873 : Authentication Exception should be hard error; also NPE in PropertiesPrincipalDatabase when user not known git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@640417 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/AMQAuthenticationException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 8f90a0bdfa..69ff7a2c19 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -41,7 +41,7 @@ public class AMQAuthenticationException extends AMQException } public boolean isHardError() { - return false; + return true; } } -- cgit v1.2.1