diff options
Diffstat (limited to 'qpid/java/client')
15 files changed, 658 insertions, 127 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 8784d340da..b6544db995 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -18,18 +18,18 @@ */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; -import javax.jms.*; - -import java.util.Properties; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; /** - * Class that sends heartbeat messages to allow monitoring of message consumption - * Sends regular (currently 20 seconds apart) heartbeat message + * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds + * apart) heartbeat message */ -public class MonitorMessageDispatcher { +public class MonitorMessageDispatcher +{ private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); @@ -39,17 +39,18 @@ public class MonitorMessageDispatcher { /** * Easy entry point for running a message dispatcher for monitoring consumption + * * @param args */ public static void main(String[] args) { - //Switch on logging appropriately for your app BasicConfigurator.configure(); try { - while(true) + int i =0; + while (i < 1000) { try { @@ -62,9 +63,10 @@ public class MonitorMessageDispatcher { } //sleep for twenty seconds and then publish again - change if appropriate - Thread.sleep(20000); + //Thread.sleep(1000); + i++ ; } - catch(UndeliveredMessageException a) + catch (UndeliveredMessageException a) { //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); @@ -72,7 +74,7 @@ public class MonitorMessageDispatcher { } } } - catch(Exception e) + catch (Exception e) { _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); @@ -81,7 +83,7 @@ public class MonitorMessageDispatcher { { if (getMonitorPublisher() != null) { - getMonitorPublisher().cleanup(); + getMonitorPublisher().cleanup(); } } @@ -90,19 +92,24 @@ public class MonitorMessageDispatcher { /** * Publish heartbeat message + * * @throws JMSException * @throws UndeliveredMessageException */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher - getMonitorPublisher().sendImmediateMessage - (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); +// getMonitorPublisher().sendImmediateMessage +// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + + getMonitorPublisher().sendMessage + (getMonitorPublisher()._session, + FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), + DeliveryMode.PERSISTENT, false, true); + } - /** - * Cleanup publishers - */ + /** Cleanup publishers */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -119,16 +126,16 @@ public class MonitorMessageDispatcher { //Returns a _publisher for the monitor queue private static MonitorPublisher getMonitorPublisher() { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } + if (_monitorPublisher != null) + { + return _monitorPublisher; + } - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; } } diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 233c3fea0a..a67b602e58 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -18,15 +18,17 @@ */ package org.apache.qpid.example.publisher; -import javax.jms.Message; +import org.apache.log4j.Logger; +import org.apache.qpid.client.BasicMessageProducer; + import javax.jms.DeliveryMode; import javax.jms.JMSException; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.log4j.Logger; +import javax.jms.Message; +import javax.jms.Session; /** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message - * Note immediate flag not available via JMS MessageProducer + * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via + * JMS MessageProducer */ public class MonitorPublisher extends Publisher { @@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher super(); } - /* - * Publishes a non-persistent message using transacted session - */ + /* + * Publishes a message using given details + */ + public boolean sendMessage(Session session, Message message, int deliveryMode, + boolean immediate, boolean commit) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) session.createProducer(_destination); + + _producer.send(message, deliveryMode, immediate); + + if (commit) + { + //commit the message send and close the transaction + _session.commit(); + } + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /* + * Publishes a non-persistent message using transacted session + */ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException { try { - _producer = (BasicMessageProducer)_session.createProducer(_destination); + _producer = (BasicMessageProducer) _session.createProducer(_destination); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer @@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher //Have to assume our commit failed but do not rollback here as channel closed _log.error(e); e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message",e); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); } _log.info(_name + " finished sending message: " + message); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java new file mode 100644 index 0000000000..e32ee0ba73 --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +/** + * An abstract base class that wraps up the creation of a JMS client utilising JNDI + */ +public abstract class Client +{ + protected ConnectionSetup _setup; + + protected Connection _connection; + protected Destination _destination; + protected Session _session; + + public Client(String destination) + { + if (destination == null) + { + destination = ConnectionSetup.TOPIC_JNDI_NAME; + } + + try + { + _setup = new ConnectionSetup(); + } + catch (NamingException e) + { + //ignore + } + + if (_setup != null) + { + try + { + _connection = _setup.getConnectionFactory().createConnection(); + _destination = _setup.getDestination(destination); + } + catch (JMSException e) + { + System.err.println(e.getMessage()); + } + } + } + + public abstract void start(); + +}
\ No newline at end of file diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java new file mode 100644 index 0000000000..c4edd9034f --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; + +/** + * This ConnectionSetup is a wrapper around JNDI it creates a number of entries. + * + * It is equivalent to a PropertyFile of value: + * + * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost' + * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1' + * + * queue.queue=example.MyQueue + * topic.topic=example.hierarical.topic + * + */ +public class ConnectionSetup +{ + final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final static String CONNECTION_JNDI_NAME = "local"; + final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'"; + + public static final String QUEUE_JNDI_NAME = "queue"; + final static String QUEUE_NAME = "example.MyQueue"; + + public static final String TOPIC_JNDI_NAME = "topic"; + final static String TOPIC_NAME = "example.hierarical.topic"; + + private Context _ctx; + + public ConnectionSetup() throws NamingException + { + + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'"); + + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME); + // Create the initial context + _ctx = new InitialContext(properties); + + } + + public ConnectionSetup(Properties properties) throws NamingException + { + _ctx = new InitialContext(properties); + } + + public ConnectionFactory getConnectionFactory() + { + + // Perform the lookups + try + { + return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME); + } + catch (NamingException e) + { + //ignore + } + return null; + } + + public Destination getDestination(String jndiName) + { + // Perform the lookups + try + { + return (Destination) _ctx.lookup(jndiName); + } + catch (ClassCastException cce) + { + //ignore + } + catch (NamingException ne) + { + //ignore + } + return null; + } + + + public void close() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + //ignore + } + } +} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java new file mode 100644 index 0000000000..dd936e429f --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * A simple Publisher example. + * + * The class can take two arguments. + * java Publisher <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + * + */ +public class Publisher extends Client +{ + int _msgCount; + + public Publisher(String destination, int msgCount) + { + super(destination); + _msgCount = msgCount; + } + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer _producer = _session.createProducer(_destination); + + for (int msgCount = 0; msgCount < _msgCount; msgCount++) + { + _producer.send(_session.createTextMessage("msg:" + msgCount)); + System.out.println("Sent:" + msgCount); + } + + System.out.println("Done."); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + + public static void main(String[] args) + { + + String destination = args.length > 2 ? args[1] : null; + + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Publisher(destination, msgCount).start(); + } + +} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java new file mode 100644 index 0000000000..f2d736701f --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; + + +/** + * Simple client that listens for the specified number of msgs on the given Destinaton + * + * The class can take two arguments. + * java Subscriber <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + */ +public class Subscriber extends Client implements MessageListener +{ + + CountDownLatch _count; + + public Subscriber(String destination, int msgCount) + { + super(destination); + _count = new CountDownLatch(msgCount); + } + + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), + "exampleClient").setMessageListener(this); + _connection.start(); + _count.await(); + + System.out.println("Done"); + + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + public static void main(String[] args) + { + String destination = args.length > 2 ? args[1] : null; + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Subscriber(destination, msgCount).start(); + } + + public void onMessage(Message message) + { + try + { + _count.countDown(); + System.out.println("Received msg:" + ((TextMessage) message).getText()); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/qpid/java/client/pom.xml b/qpid/java/client/pom.xml index c36c54a10f..b51e540c2d 100644 --- a/qpid/java/client/pom.xml +++ b/qpid/java/client/pom.xml @@ -123,6 +123,50 @@ <build> <plugins> + + <plugin> + <artifactId>minijar-maven-plugin</artifactId> + <groupId>org.codehaus.mojo</groupId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>ueberjar</goal> + </goals> + <configuration> + <stripUnusedClasses>false</stripUnusedClasses> + <name>[artifactId]-[version]-single.jar</name> + <classifier>single</classifier> + <attach>true</attach> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${artifactId}-${version}-single.jar</file> + <type>jar</type> + <classifier>single</classifier> + </artifact> + </artifacts> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d59412fdba..ddce0db7ff 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -46,7 +46,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +67,6 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; - import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; @@ -1148,7 +1146,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - _logger.info("Not a hard-error connection not closing."); + _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index b3fbd1f510..24f5ead2d0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -27,14 +35,10 @@ import java.util.LinkedList; import java.util.List; import java.util.StringTokenizer; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - public class AMQConnectionURL implements ConnectionURL { + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class); + private String _url; private String _failoverMethod; private HashMap<String, String> _failoverOptions; @@ -162,7 +166,7 @@ public class AMQConnectionURL implements ConnectionURL if ((slash != 0) && (fullURL.charAt(slash - 1) == ':')) { throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, - "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); } else { @@ -182,7 +186,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -387,7 +391,14 @@ public class AMQConnectionURL implements ConnectionURL if (_password != null) { sb.append(':'); - sb.append(_password); + if (_logger.isDebugEnabled()) + { + sb.append(_password); + } + else + { + sb.append("********"); + } } sb.append('@'); @@ -432,7 +443,7 @@ public class AMQConnectionURL implements ConnectionURL public static void main(String[] args) throws URLSyntaxException { String url2 = - "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; // "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; ConnectionURL connectionurl2 = new AMQConnectionURL(url2); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 879578bd6c..8f0ad3947a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2319,6 +2319,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public void declareAndBind(AMQDestination amqd) + throws + AMQException + { + AMQProtocolHandler protocolHandler = getProtocolHandler(); + declareExchange(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, protocolHandler); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName()); + } + /** * Callers must hold the failover mutex before calling this method. * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 8f9a84a3a6..862a9be8d4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 81228b4cdc..65060d44d2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index d19a6095d5..9600d1e9d3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.test.unit.client.channelclose; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java index 9c354ee260..9cde24dd92 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java @@ -21,9 +21,7 @@ package org.apache.qpid.test.unit.client.forwardall; import junit.framework.TestCase; - import org.apache.qpid.testutil.VMBrokerSetup; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory; public class CombinedTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); + private int run = 0; protected void setUp() throws Exception { @@ -48,14 +47,18 @@ public class CombinedTest extends TestCase public void testForwardAll() throws Exception { - int services = 2; - ServiceCreator.start("vm://:1", services); + while (run < 10) + { + int services = 2; + ServiceCreator.start("vm://:1", services); + + _logger.info("Starting " + ++run + " client..."); - _logger.info("Starting client..."); + new Client("vm://:1", services).shutdownWhenComplete(); - new Client("vm://:1", services).shutdownWhenComplete(); - _logger.info("Completed successfully!"); + _logger.info("Completed " + run + " successfully!"); + } } public static junit.framework.Test suite() diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index df2a38d0fc..1a45773907 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,12 +21,10 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +55,9 @@ public class CommitRollbackTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); private static final String BROKER = "vm://:1"; + private boolean _gotone = false; + private boolean _gottwo = false; + private boolean _gottwoRedelivered = false; protected void setUp() throws Exception { @@ -340,57 +341,98 @@ public class CommitRollbackTest extends TestCase * * @throws Exception On error */ - /*public void testSend2ThenRollback() throws Exception + public void testSend2ThenRollback() throws Exception { - assertTrue("session is not transacted", _session.getTransacted()); - assertTrue("session is not transacted", _pubSession.getTransacted()); + int run = 0; + while (run < 10) + { + run++; + _logger.info("Run:" + run); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending two test messages"); - _publisher.send(_pubSession.createTextMessage("1")); - _publisher.send(_pubSession.createTextMessage("2")); - _pubSession.commit(); + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); - _logger.info("getting test message"); - assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); + _logger.info("getting test message"); + assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); - _logger.info("rolling back"); - _session.rollback(); + _logger.info("rolling back"); + _session.rollback(); - _logger.info("receiving result"); - Message result = _consumer.receive(1000); + _logger.info("receiving result"); + Message result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); + assertNotNull("test message was consumed and rolled back, but is gone", result); + // Message Order is: - if (((TextMessage) result).getText().equals("2")) - { - assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered()); + // Send 1 , 2 + // Retrieve 1 and then rollback + // Receieve 1 (redelivered) , 2 (may or may not be redelivered??) - result = _consumer.receive(1000); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + verifyMessages(result); + + // Occassionally get message 2 first! +// assertEquals("Should get message one first", "1", ((TextMessage) result).getText()); +// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// assertEquals("Second message should be message 2", "2", ((TextMessage) result).getText()); +// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// assertNull("There should be no more messages", result); + + _session.commit(); } - else + } + + private void verifyMessages(Message result) throws JMSException + { + + if (result == null) { - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + assertTrue("Didn't receive redelivered message one", _gotone); + assertTrue("Didn't receive message two at all", _gottwo | _gottwoRedelivered); + _gotone = false; + _gottwo = false; + _gottwoRedelivered = false; + return; } - result = _consumer.receive(1000); + if (((TextMessage) result).getText().equals("1")) + { + _logger.info("Got 1 redelivered"); + assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); + assertFalse("Already received message one", _gotone); + _gotone = true; - if (result != null) + } + else { assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); + + if (result.getJMSRedelivered()) + { + _logger.info("Got 2 redelivered, message was prefetched"); + assertFalse("Already received message redelivered two", _gottwoRedelivered); + + _gottwoRedelivered = true; + } + else + { + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + assertFalse("Already received message two", _gottwo); + + _gottwo = true; + } } - assertNull("test message should be null", result); - }*/ + verifyMessages(_consumer.receive(1000)); + } public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -417,12 +459,12 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); - // NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. - // Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. +// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. +// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - // The first message back will be either 1 or 2 being redelivered +// The first message back will be either 1 or 2 being redelivered if (result.getJMSRedelivered()) { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); |
