summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java59
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java53
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java72
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java123
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java81
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java98
-rw-r--r--qpid/java/client/pom.xml44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java25
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java25
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java17
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java118
15 files changed, 658 insertions, 127 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index 8784d340da..b6544db995 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -18,18 +18,18 @@
*/
package org.apache.qpid.example.publisher;
-import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
-import javax.jms.*;
-
-import java.util.Properties;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
/**
- * Class that sends heartbeat messages to allow monitoring of message consumption
- * Sends regular (currently 20 seconds apart) heartbeat message
+ * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds
+ * apart) heartbeat message
*/
-public class MonitorMessageDispatcher {
+public class MonitorMessageDispatcher
+{
private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class);
@@ -39,17 +39,18 @@ public class MonitorMessageDispatcher {
/**
* Easy entry point for running a message dispatcher for monitoring consumption
+ *
* @param args
*/
public static void main(String[] args)
{
-
//Switch on logging appropriately for your app
BasicConfigurator.configure();
try
{
- while(true)
+ int i =0;
+ while (i < 1000)
{
try
{
@@ -62,9 +63,10 @@ public class MonitorMessageDispatcher {
}
//sleep for twenty seconds and then publish again - change if appropriate
- Thread.sleep(20000);
+ //Thread.sleep(1000);
+ i++ ;
}
- catch(UndeliveredMessageException a)
+ catch (UndeliveredMessageException a)
{
//trigger application specific failure handling here
_logger.error("Problem delivering monitor message");
@@ -72,7 +74,7 @@ public class MonitorMessageDispatcher {
}
}
}
- catch(Exception e)
+ catch (Exception e)
{
_logger.error("Error trying to dispatch AMS monitor message: " + e);
System.exit(1);
@@ -81,7 +83,7 @@ public class MonitorMessageDispatcher {
{
if (getMonitorPublisher() != null)
{
- getMonitorPublisher().cleanup();
+ getMonitorPublisher().cleanup();
}
}
@@ -90,19 +92,24 @@ public class MonitorMessageDispatcher {
/**
* Publish heartbeat message
+ *
* @throws JMSException
* @throws UndeliveredMessageException
*/
public static void publish() throws JMSException, UndeliveredMessageException
{
//Send the message generated from the payload using the _publisher
- getMonitorPublisher().sendImmediateMessage
- (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+// getMonitorPublisher().sendImmediateMessage
+// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+
+ getMonitorPublisher().sendMessage
+ (getMonitorPublisher()._session,
+ FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()),
+ DeliveryMode.PERSISTENT, false, true);
+
}
- /**
- * Cleanup publishers
- */
+ /** Cleanup publishers */
public static void cleanup()
{
if (getMonitorPublisher() != null)
@@ -119,16 +126,16 @@ public class MonitorMessageDispatcher {
//Returns a _publisher for the monitor queue
private static MonitorPublisher getMonitorPublisher()
{
- if (_monitorPublisher != null)
- {
- return _monitorPublisher;
- }
+ if (_monitorPublisher != null)
+ {
+ return _monitorPublisher;
+ }
- //Create a _publisher using failover details and constant for monitor queue
- _monitorPublisher = new MonitorPublisher();
+ //Create a _publisher using failover details and constant for monitor queue
+ _monitorPublisher = new MonitorPublisher();
- _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
- return _monitorPublisher;
+ _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
+ return _monitorPublisher;
}
}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
index 233c3fea0a..a67b602e58 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -18,15 +18,17 @@
*/
package org.apache.qpid.example.publisher;
-import javax.jms.Message;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.BasicMessageProducer;
+
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.apache.log4j.Logger;
+import javax.jms.Message;
+import javax.jms.Session;
/**
- * Subclass of Publisher which uses QPID functionality to send a heartbeat message
- * Note immediate flag not available via JMS MessageProducer
+ * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via
+ * JMS MessageProducer
*/
public class MonitorPublisher extends Publisher
{
@@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher
super();
}
- /*
- * Publishes a non-persistent message using transacted session
- */
+ /*
+ * Publishes a message using given details
+ */
+ public boolean sendMessage(Session session, Message message, int deliveryMode,
+ boolean immediate, boolean commit) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) session.createProducer(_destination);
+
+ _producer.send(message, deliveryMode, immediate);
+
+ if (commit)
+ {
+ //commit the message send and close the transaction
+ _session.commit();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error(e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /*
+ * Publishes a non-persistent message using transacted session
+ */
public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException
{
try
{
- _producer = (BasicMessageProducer)_session.createProducer(_destination);
+ _producer = (BasicMessageProducer) _session.createProducer(_destination);
//Send message via our producer which is not persistent and is immediate
//NB: not available via jms interface MessageProducer
@@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher
//Have to assume our commit failed but do not rollback here as channel closed
_log.error(e);
e.printStackTrace();
- throw new UndeliveredMessageException("Cannot deliver immediate message",e);
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
}
_log.info(_name + " finished sending message: " + message);
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
new file mode 100644
index 0000000000..e32ee0ba73
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+/**
+ * An abstract base class that wraps up the creation of a JMS client utilising JNDI
+ */
+public abstract class Client
+{
+ protected ConnectionSetup _setup;
+
+ protected Connection _connection;
+ protected Destination _destination;
+ protected Session _session;
+
+ public Client(String destination)
+ {
+ if (destination == null)
+ {
+ destination = ConnectionSetup.TOPIC_JNDI_NAME;
+ }
+
+ try
+ {
+ _setup = new ConnectionSetup();
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+
+ if (_setup != null)
+ {
+ try
+ {
+ _connection = _setup.getConnectionFactory().createConnection();
+ _destination = _setup.getDestination(destination);
+ }
+ catch (JMSException e)
+ {
+ System.err.println(e.getMessage());
+ }
+ }
+ }
+
+ public abstract void start();
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
new file mode 100644
index 0000000000..c4edd9034f
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+/**
+ * This ConnectionSetup is a wrapper around JNDI it creates a number of entries.
+ *
+ * It is equivalent to a PropertyFile of value:
+ *
+ * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost'
+ * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1'
+ *
+ * queue.queue=example.MyQueue
+ * topic.topic=example.hierarical.topic
+ *
+ */
+public class ConnectionSetup
+{
+ final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ final static String CONNECTION_JNDI_NAME = "local";
+ final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'";
+
+ public static final String QUEUE_JNDI_NAME = "queue";
+ final static String QUEUE_NAME = "example.MyQueue";
+
+ public static final String TOPIC_JNDI_NAME = "topic";
+ final static String TOPIC_NAME = "example.hierarical.topic";
+
+ private Context _ctx;
+
+ public ConnectionSetup() throws NamingException
+ {
+
+ // Set the properties ...
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+ properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
+ properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'");
+
+ properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
+ properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME);
+ // Create the initial context
+ _ctx = new InitialContext(properties);
+
+ }
+
+ public ConnectionSetup(Properties properties) throws NamingException
+ {
+ _ctx = new InitialContext(properties);
+ }
+
+ public ConnectionFactory getConnectionFactory()
+ {
+
+ // Perform the lookups
+ try
+ {
+ return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME);
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+ return null;
+ }
+
+ public Destination getDestination(String jndiName)
+ {
+ // Perform the lookups
+ try
+ {
+ return (Destination) _ctx.lookup(jndiName);
+ }
+ catch (ClassCastException cce)
+ {
+ //ignore
+ }
+ catch (NamingException ne)
+ {
+ //ignore
+ }
+ return null;
+ }
+
+
+ public void close()
+ {
+ try
+ {
+ _ctx.close();
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
new file mode 100644
index 0000000000..dd936e429f
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * A simple Publisher example.
+ *
+ * The class can take two arguments.
+ * java Publisher <destination> <msgCount>
+ * Where:
+ * destination is either 'topic' or 'queue' (Default: topic)
+ * msgCount is the number of messages to send (Default : 100)
+ *
+ */
+public class Publisher extends Client
+{
+ int _msgCount;
+
+ public Publisher(String destination, int msgCount)
+ {
+ super(destination);
+ _msgCount = msgCount;
+ }
+
+ public void start()
+ {
+ try
+ {
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer _producer = _session.createProducer(_destination);
+
+ for (int msgCount = 0; msgCount < _msgCount; msgCount++)
+ {
+ _producer.send(_session.createTextMessage("msg:" + msgCount));
+ System.out.println("Sent:" + msgCount);
+ }
+
+ System.out.println("Done.");
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+
+ String destination = args.length > 2 ? args[1] : null;
+
+ int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
+
+ new Publisher(destination, msgCount).start();
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
new file mode 100644
index 0000000000..f2d736701f
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.util.concurrent.CountDownLatch;
+
+
+/**
+ * Simple client that listens for the specified number of msgs on the given Destinaton
+ *
+ * The class can take two arguments.
+ * java Subscriber <destination> <msgCount>
+ * Where:
+ * destination is either 'topic' or 'queue' (Default: topic)
+ * msgCount is the number of messages to send (Default : 100)
+ */
+public class Subscriber extends Client implements MessageListener
+{
+
+ CountDownLatch _count;
+
+ public Subscriber(String destination, int msgCount)
+ {
+ super(destination);
+ _count = new CountDownLatch(msgCount);
+ }
+
+
+ public void start()
+ {
+ try
+ {
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME),
+ "exampleClient").setMessageListener(this);
+ _connection.start();
+ _count.await();
+
+ System.out.println("Done");
+
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ String destination = args.length > 2 ? args[1] : null;
+ int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
+
+ new Subscriber(destination, msgCount).start();
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _count.countDown();
+ System.out.println("Received msg:" + ((TextMessage) message).getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
diff --git a/qpid/java/client/pom.xml b/qpid/java/client/pom.xml
index c36c54a10f..b51e540c2d 100644
--- a/qpid/java/client/pom.xml
+++ b/qpid/java/client/pom.xml
@@ -123,6 +123,50 @@
<build>
<plugins>
+
+ <plugin>
+ <artifactId>minijar-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>ueberjar</goal>
+ </goals>
+ <configuration>
+ <stripUnusedClasses>false</stripUnusedClasses>
+ <name>[artifactId]-[version]-single.jar</name>
+ <classifier>single</classifier>
+ <attach>true</attach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${artifactId}-${version}-single.jar</file>
+ <type>jar</type>
+ <classifier>single</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d59412fdba..ddce0db7ff 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -46,7 +46,6 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +67,6 @@ import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
@@ -1148,7 +1146,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- _logger.info("Not a hard-error connection not closing.");
+ _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index b3fbd1f510..24f5ead2d0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -27,14 +35,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
public class AMQConnectionURL implements ConnectionURL
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class);
+
private String _url;
private String _failoverMethod;
private HashMap<String, String> _failoverOptions;
@@ -162,7 +166,7 @@ public class AMQConnectionURL implements ConnectionURL
if ((slash != 0) && (fullURL.charAt(slash - 1) == ':'))
{
throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2,
- "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
}
else
{
@@ -182,7 +186,7 @@ public class AMQConnectionURL implements ConnectionURL
if (colonIndex == -1)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
+ "Null password in user information not allowed.", _url);
}
else
{
@@ -387,7 +391,14 @@ public class AMQConnectionURL implements ConnectionURL
if (_password != null)
{
sb.append(':');
- sb.append(_password);
+ if (_logger.isDebugEnabled())
+ {
+ sb.append(_password);
+ }
+ else
+ {
+ sb.append("********");
+ }
}
sb.append('@');
@@ -432,7 +443,7 @@ public class AMQConnectionURL implements ConnectionURL
public static void main(String[] args) throws URLSyntaxException
{
String url2 =
- "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
// "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 879578bd6c..8f0ad3947a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2319,6 +2319,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public void declareAndBind(AMQDestination amqd)
+ throws
+ AMQException
+ {
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
+ declareExchange(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
+ }
+
/**
* Callers must hold the failover mutex before calling this method.
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 8f9a84a3a6..862a9be8d4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.handler;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 81228b4cdc..65060d44d2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.handler;
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
index d19a6095d5..9600d1e9d3 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.test.unit.client.channelclose;
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
index 9c354ee260..9cde24dd92 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
@@ -21,9 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import junit.framework.TestCase;
-
import org.apache.qpid.testutil.VMBrokerSetup;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory;
public class CombinedTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class);
+ private int run = 0;
protected void setUp() throws Exception
{
@@ -48,14 +47,18 @@ public class CombinedTest extends TestCase
public void testForwardAll() throws Exception
{
- int services = 2;
- ServiceCreator.start("vm://:1", services);
+ while (run < 10)
+ {
+ int services = 2;
+ ServiceCreator.start("vm://:1", services);
+
+ _logger.info("Starting " + ++run + " client...");
- _logger.info("Starting client...");
+ new Client("vm://:1", services).shutdownWhenComplete();
- new Client("vm://:1", services).shutdownWhenComplete();
- _logger.info("Completed successfully!");
+ _logger.info("Completed " + run + " successfully!");
+ }
}
public static junit.framework.Test suite()
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index df2a38d0fc..1a45773907 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -21,12 +21,10 @@
package org.apache.qpid.test.unit.transacted;
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +55,9 @@ public class CommitRollbackTest extends TestCase
private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
private static final String BROKER = "vm://:1";
+ private boolean _gotone = false;
+ private boolean _gottwo = false;
+ private boolean _gottwoRedelivered = false;
protected void setUp() throws Exception
{
@@ -340,57 +341,98 @@ public class CommitRollbackTest extends TestCase
*
* @throws Exception On error
*/
- /*public void testSend2ThenRollback() throws Exception
+ public void testSend2ThenRollback() throws Exception
{
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
+ int run = 0;
+ while (run < 10)
+ {
+ run++;
+ _logger.info("Run:" + run);
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
- _logger.info("sending two test messages");
- _publisher.send(_pubSession.createTextMessage("1"));
- _publisher.send(_pubSession.createTextMessage("2"));
- _pubSession.commit();
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
- _logger.info("getting test message");
- assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+ _logger.info("getting test message");
+ assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
- _logger.info("rolling back");
- _session.rollback();
+ _logger.info("rolling back");
+ _session.rollback();
- _logger.info("receiving result");
- Message result = _consumer.receive(1000);
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ // Message Order is:
- if (((TextMessage) result).getText().equals("2"))
- {
- assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered());
+ // Send 1 , 2
+ // Retrieve 1 and then rollback
+ // Receieve 1 (redelivered) , 2 (may or may not be redelivered??)
- result = _consumer.receive(1000);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ verifyMessages(result);
+
+ // Occassionally get message 2 first!
+// assertEquals("Should get message one first", "1", ((TextMessage) result).getText());
+// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// assertEquals("Second message should be message 2", "2", ((TextMessage) result).getText());
+// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// assertNull("There should be no more messages", result);
+
+ _session.commit();
}
- else
+ }
+
+ private void verifyMessages(Message result) throws JMSException
+ {
+
+ if (result == null)
{
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ assertTrue("Didn't receive redelivered message one", _gotone);
+ assertTrue("Didn't receive message two at all", _gottwo | _gottwoRedelivered);
+ _gotone = false;
+ _gottwo = false;
+ _gottwoRedelivered = false;
+ return;
}
- result = _consumer.receive(1000);
+ if (((TextMessage) result).getText().equals("1"))
+ {
+ _logger.info("Got 1 redelivered");
+ assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+ assertFalse("Already received message one", _gotone);
+ _gotone = true;
- if (result != null)
+ }
+ else
{
assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
- result = _consumer.receive(1000);
+
+ if (result.getJMSRedelivered())
+ {
+ _logger.info("Got 2 redelivered, message was prefetched");
+ assertFalse("Already received message redelivered two", _gottwoRedelivered);
+
+ _gottwoRedelivered = true;
+ }
+ else
+ {
+ _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
+ assertFalse("Already received message two", _gottwo);
+
+ _gottwo = true;
+ }
}
- assertNull("test message should be null", result);
- }*/
+ verifyMessages(_consumer.receive(1000));
+ }
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
@@ -417,12 +459,12 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
- // NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
- // Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
+// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
+// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
result = _consumer.receive(1000);
assertNotNull("test message was consumed and rolled back, but is gone", result);
- // The first message back will be either 1 or 2 being redelivered
+// The first message back will be either 1 or 2 being redelivered
if (result.getJMSRedelivered())
{
assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());