summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-04-04 21:10:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-04-04 21:10:07 +0000
commitf93a96fc0cccd29885383f2781f01484f7833eeb (patch)
tree2f4a81835f29bc9a61f9d4947eed8fc0654f3fe7 /java/amqp-1-0-client-jms
parent6d83a5faddfeff7c3a788907d886dae7459dbaa1 (diff)
downloadqpid-python-f93a96fc0cccd29885383f2781f01484f7833eeb.tar.gz
QPID-3933 : [Java] Add interim AMQP 1-0 implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1309594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/amqp-1-0-client-jms')
-rw-r--r--java/amqp-1-0-client-jms/build.xml29
-rw-r--r--java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java178
-rw-r--r--java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties28
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/AmqpMessage.java32
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/BytesMessage.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Connection.java37
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionFactory.java31
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionMetaData.java28
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Destination.java28
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/JavaSerializable.java24
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MapMessage.java37
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java178
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageConsumer.java36
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageProducer.java27
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ObjectMessage.java27
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Queue.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueBrowser.java30
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueConnection.java30
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueReceiver.java29
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSender.java29
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSession.java42
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Session.java75
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/StreamMessage.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryDestination.java33
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryQueue.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryTopic.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TextMessage.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Topic.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicConnection.java30
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicPublisher.java26
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSession.java43
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSubscriber.java29
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java78
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java538
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java167
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java329
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java105
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java85
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java444
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java447
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java191
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java1209
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java362
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java143
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java144
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java48
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java56
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java57
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java36
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java56
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java885
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java466
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java105
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java110
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java93
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicConnectionImpl.java48
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java56
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicPublisherImpl.java36
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java55
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java127
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/NameParserImpl.java37
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java230
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/ReadOnlyContext.java527
63 files changed, 8568 insertions, 0 deletions
diff --git a/java/amqp-1-0-client-jms/build.xml b/java/amqp-1-0-client-jms/build.xml
new file mode 100644
index 0000000000..4a685d7106
--- /dev/null
+++ b/java/amqp-1-0-client-jms/build.xml
@@ -0,0 +1,29 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQP 1.0 JMS Client" default="build">
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.depends" value="amqp-1-0-common amqp-1-0-client"/>
+
+
+ <import file="../module.xml"/>
+
+</project>
diff --git a/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java b/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
new file mode 100644
index 0000000000..61317e7f37
--- /dev/null
+++ b/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms.example;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class Hello
+{
+
+ public Hello()
+ {
+ }
+
+ public static void main(String[] args)
+ {
+ try
+ {
+ Class.forName("org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
+
+ Hashtable env = new Hashtable();
+ env.put("java.naming.provider.url", "hello.properties");
+ env.put("java.naming.factory.initial", "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
+
+ Context context = new InitialContext(env);
+
+ ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("localhost");
+ Connection connection = connectionFactory.createConnection();
+
+ Session producersession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue) context.lookup("queue");
+
+
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = consumerSession.createConsumer(queue, "hello='true' and 7");
+
+ messageConsumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(final Message message)
+ {
+ try
+ {
+
+ if(message instanceof TextMessage)
+ {
+ System.out.println("Received text Message:");
+ System.out.println("======================");
+ System.out.println(((TextMessage) message).getText());
+ }
+ else if(message instanceof MapMessage)
+ {
+ System.out.println("Received Map Message:");
+ System.out.println("=====================");
+
+
+ MapMessage mapmessage = (MapMessage) message;
+
+ Enumeration names = mapmessage.getMapNames();
+
+ while(names.hasMoreElements())
+ {
+ String name = (String) names.nextElement();
+ System.out.println(name + " -> " + mapmessage.getObject(name));
+ }
+
+ }
+ else if(message instanceof BytesMessage)
+ {
+ System.out.println("Received Bytes Message:");
+ System.out.println("=======================");
+ System.out.println(((BytesMessage) message).readUTF());
+ }
+ else if(message instanceof StreamMessage)
+ {
+ System.out.println("Received Stream Message:");
+ System.out.println("========================");
+ StreamMessage streamMessage = (StreamMessage)message;
+ Object o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ o = streamMessage.readObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+
+ }
+ else if(message instanceof ObjectMessage)
+ {
+ System.out.println("Received Object Message:");
+ System.out.println("========================");
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ Object o = objectMessage.getObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ }
+ else
+ {
+ System.out.println("Received Message " + message.getClass().getName());
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ }
+ });
+
+ connection.start();
+
+
+ MessageProducer messageProducer = producersession.createProducer(queue);
+ TextMessage message = producersession.createTextMessage("Hello world!");
+ message.setJMSType("Hello");
+ message.setStringProperty("hello","true");
+ messageProducer.send(message);
+ /*
+ MapMessage mapmessage = producersession.createMapMessage();
+ mapmessage.setBoolean("mybool", true);
+ mapmessage.setString("mystring", "hello");
+ mapmessage.setLong("mylong", -25L);
+
+
+ messageProducer.send(mapmessage);
+
+ BytesMessage bytesMessage = producersession.createBytesMessage();
+ bytesMessage.writeUTF("This is a bytes message");
+
+ messageProducer.send(bytesMessage);
+
+ ObjectMessage objectMessage = producersession.createObjectMessage();
+ objectMessage.setObject(new Double("3.14159265358979323846264338327950288"));
+
+ messageProducer.send(objectMessage);
+
+/* StreamMessage streamMessage = producersession.createStreamMessage();
+ streamMessage.writeBoolean(true);
+ streamMessage.writeLong(18031974L);
+ streamMessage.writeString("this is a stream Message");
+ streamMessage.writeChar('£');
+ messageProducer.send(streamMessage);
+*/
+ Thread.sleep(50000L);
+
+ connection.close();
+ context.close();
+ }
+ catch (Exception exp)
+ {
+ exp.printStackTrace();
+ }
+ }
+}
diff --git a/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties b/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties
new file mode 100644
index 0000000000..930c2c15cc
--- /dev/null
+++ b/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/hello.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.localhost = http://guest:guest@localhost/test?cliendId='test-client'
+
+
+# Register an AMQP destination in JNDI
+# destination.[jniName] = [Address Format]
+queue.queue = queue
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/AmqpMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/AmqpMessage.java
new file mode 100644
index 0000000000..de42b36ef2
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/AmqpMessage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import org.apache.qpid.amqp_1_0.type.Section;
+
+import java.util.ListIterator;
+
+public interface AmqpMessage extends Message
+{
+ int getSectionCount();
+
+ Section getSection(int position);
+
+ ListIterator<Section> sectionIterator();
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/BytesMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/BytesMessage.java
new file mode 100644
index 0000000000..3475319fe4
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/BytesMessage.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface BytesMessage extends Message, javax.jms.BytesMessage
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Connection.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Connection.java
new file mode 100644
index 0000000000..97447917f6
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Connection.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+
+public interface Connection extends javax.jms.Connection
+{
+
+ ConnectionMetaData getMetaData() throws JMSException;
+
+ Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
+
+ Session createSession(Session.AcknowledgeMode acknowledgeMode) throws JMSException;
+
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionFactory.java
new file mode 100644
index 0000000000..752993b229
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+
+import javax.jms.JMSException;
+
+public interface ConnectionFactory extends javax.jms.ConnectionFactory
+{
+ Connection createConnection() throws JMSException;
+
+ Connection createConnection(String username, String password) throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionMetaData.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionMetaData.java
new file mode 100644
index 0000000000..3802d6e416
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ConnectionMetaData.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface ConnectionMetaData extends javax.jms.ConnectionMetaData
+{
+ int getAMQPMajorVersion();
+
+ int getAMQPMinorVersion();
+
+ int getAMQPRevisionVersion();
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Destination.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Destination.java
new file mode 100644
index 0000000000..0b467b3c99
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Destination.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface Destination extends javax.jms.Destination
+{
+ public String getAddress();
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/JavaSerializable.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/JavaSerializable.java
new file mode 100644
index 0000000000..0c9179bc2b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/JavaSerializable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface JavaSerializable
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MapMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MapMessage.java
new file mode 100644
index 0000000000..81754ecc93
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MapMessage.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+import java.util.Set;
+
+public interface MapMessage extends Message, javax.jms.MapMessage
+{
+ public Object get(Object key) throws JMSException;
+
+ public Object put(Object key, Object val) throws JMSException;
+
+ public boolean itemExists(Object key) throws JMSException;
+
+ Set<Object> keySet() throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java
new file mode 100644
index 0000000000..1481c54ee1
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import org.apache.qpid.amqp_1_0.messaging.MessageAttributes;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.UnsignedShort;
+
+import javax.jms.JMSException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+
+public interface Message extends javax.jms.Message
+{
+
+ Destination getJMSReplyTo() throws JMSException;
+
+ Destination getJMSDestination() throws JMSException;
+
+ // properties can be keyed by any valid apache.qpid.amqp_1_0 datatype, not just strings
+
+ boolean propertyExists(Object name) throws JMSException;
+
+ boolean getBooleanProperty(Object name) throws JMSException;
+
+ byte getByteProperty(Object name) throws JMSException;
+
+ short getShortProperty(Object name) throws JMSException;
+
+ int getIntProperty(Object name) throws JMSException;
+
+ long getLongProperty(Object name) throws JMSException;
+
+ float getFloatProperty(Object name) throws JMSException;
+
+ double getDoubleProperty(Object name) throws JMSException;
+
+ String getStringProperty(Object name) throws JMSException;
+
+ Object getObjectProperty(Object name) throws JMSException;
+
+ // apache.qpid.amqp_1_0 allows for lists, maps, and unsigned integral data types
+
+ List<Object> getListProperty(Object name) throws JMSException;
+
+ Map<Object,Object> getMapProperty(Object name) throws JMSException;
+
+ UnsignedByte getUnsignedByteProperty(Object name) throws JMSException;
+
+ UnsignedShort getUnsignedShortProperty(Object name) throws JMSException;
+
+ UnsignedInteger getUnsignedIntProperty(Object name) throws JMSException;
+
+ UnsignedLong getUnsignedLongProperty(Object name) throws JMSException;
+
+ // properties can be keyed by any valid apache.qpid.amqp_1_0 datatype, not just strings
+
+ void setBooleanProperty(Object name, boolean b) throws JMSException;
+
+ void setByteProperty(Object name, byte b) throws JMSException;
+
+ void setShortProperty(Object name, short i) throws JMSException;
+
+ void setIntProperty(Object name, int i) throws JMSException;
+
+ void setLongProperty(Object name, long l) throws JMSException;
+
+ void setFloatProperty(Object name, float v) throws JMSException;
+
+ void setDoubleProperty(Object name, double v) throws JMSException;
+
+ void setStringProperty(Object name, String s1) throws JMSException;
+
+ void setObjectProperty(Object name, Object o) throws JMSException;
+
+ // apache.qpid.amqp_1_0 allows for lists, maps, and unsigned integral data types
+
+ void setListProperty(Object name, List<Object> list) throws JMSException;
+
+ void setMapProperty(Object name, Map<Object,Object> map) throws JMSException;
+
+ void setUnsignedByteProperty(Object name, UnsignedByte b) throws JMSException;
+
+ void setUnsignedShortProperty(Object name, UnsignedShort s) throws JMSException;
+
+ void setUnsignedIntProperty(Object name, UnsignedInteger i) throws JMSException;
+
+ void setUnsignedLongProperty(Object name, UnsignedLong l) throws JMSException;
+
+ // delegation accessors for Header section
+
+ UnsignedInteger getDeliveryFailures();
+
+ void setDeliveryFailures(UnsignedInteger failures);
+
+ MessageAttributes getHeaderMessageAttrs();
+
+ void setHeaderMessageAttrs(MessageAttributes messageAttrs);
+
+ MessageAttributes getHeaderDeliveryAttrs();
+
+ void setHeaderDeliveryAttrs(MessageAttributes deliveryAttrs);
+
+ Boolean getDurable();
+
+ void setDurable(Boolean durable);
+
+ UnsignedByte getPriority();
+
+ void setPriority(UnsignedByte priority);
+
+ Date getTransmitTime();
+
+ void setTransmitTime(Date transmitTime);
+
+ UnsignedInteger getTtl();
+
+ void setTtl(UnsignedInteger ttl);
+
+ UnsignedInteger getFormerAcquirers();
+
+ void setFormerAcquirers(UnsignedInteger formerAcquirers);
+
+ // delegation accessors for Properties section
+
+ Object getMessageId();
+
+ void setMessageId(Object messageId);
+
+ Binary getUserId();
+
+ void setUserId(Binary userId);
+
+ String getTo();
+
+ void setTo(String to);
+
+ String getSubject();
+
+ void setSubject(String subject);
+
+ String getReplyTo();
+
+ void setReplyTo(String replyTo);
+
+ Object getCorrelationId();
+
+ void setCorrelationId(Binary correlationId);
+
+ Symbol getContentType();
+
+ void setContentType(Symbol contentType);
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageConsumer.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageConsumer.java
new file mode 100644
index 0000000000..fe235f098f
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+
+public interface MessageConsumer extends javax.jms.MessageConsumer
+{
+
+ Message receive() throws JMSException;
+
+ Message receive(long l) throws JMSException;
+
+ Message receiveNoWait() throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageProducer.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageProducer.java
new file mode 100644
index 0000000000..98987c2409
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageProducer.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+
+public interface MessageProducer extends javax.jms.MessageProducer
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ObjectMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ObjectMessage.java
new file mode 100644
index 0000000000..8b59aa284a
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/ObjectMessage.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface ObjectMessage extends Message, javax.jms.ObjectMessage
+{
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Queue.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Queue.java
new file mode 100644
index 0000000000..40beac08d1
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Queue.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface Queue extends Destination, javax.jms.Queue
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueBrowser.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueBrowser.java
new file mode 100644
index 0000000000..5bb88569a1
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueBrowser.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface QueueBrowser extends javax.jms.QueueBrowser
+{
+ Queue getQueue() throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueConnection.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueConnection.java
new file mode 100644
index 0000000000..1375f1c5f2
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueConnection.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface QueueConnection extends javax.jms.QueueConnection, Connection
+{
+ QueueSession createQueueSession(boolean b, int i) throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueReceiver.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueReceiver.java
new file mode 100644
index 0000000000..9cc1a36ca7
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueReceiver.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface QueueReceiver extends MessageConsumer, javax.jms.QueueReceiver
+{
+ Queue getQueue() throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSender.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSender.java
new file mode 100644
index 0000000000..824e48364c
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSender.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface QueueSender extends MessageProducer, javax.jms.QueueSender
+{
+ Queue getQueue() throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSession.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSession.java
new file mode 100644
index 0000000000..28d30f60c5
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/QueueSession.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+
+public interface QueueSession extends Session, javax.jms.QueueSession
+{
+ Queue createQueue(String s) throws JMSException;
+
+ QueueReceiver createReceiver(javax.jms.Queue queue) throws JMSException;
+
+ QueueReceiver createReceiver(javax.jms.Queue queue, String s) throws JMSException;
+
+ QueueSender createSender(javax.jms.Queue queue) throws JMSException;
+
+ QueueBrowser createBrowser(javax.jms.Queue queue) throws JMSException;
+
+ QueueBrowser createBrowser(javax.jms.Queue queue, String s) throws JMSException;
+
+ TemporaryQueue createTemporaryQueue() throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Session.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Session.java
new file mode 100644
index 0000000000..0ce9baecea
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Session.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+
+import java.io.Serializable;
+
+public interface Session extends javax.jms.Session
+{
+ static enum AcknowledgeMode { SESSION_TRANSACTED, AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE };
+
+ BytesMessage createBytesMessage() throws JMSException;
+
+ MapMessage createMapMessage() throws JMSException;
+
+ Message createMessage() throws JMSException;
+
+ ObjectMessage createObjectMessage() throws JMSException;
+
+ ObjectMessage createObjectMessage(Serializable serializable) throws JMSException;
+
+ StreamMessage createStreamMessage() throws JMSException;
+
+ TextMessage createTextMessage() throws JMSException;
+
+ TextMessage createTextMessage(String s) throws JMSException;
+
+ AmqpMessage createAmqpMessage() throws JMSException;
+
+ MessageProducer createProducer(Destination destination) throws JMSException;
+
+ MessageConsumer createConsumer(Destination destination) throws JMSException;
+
+ MessageConsumer createConsumer(Destination destination, String s) throws JMSException;
+
+ MessageConsumer createConsumer(Destination destination, String s, boolean b) throws JMSException;
+
+ TopicSubscriber createDurableSubscriber(Topic topic, String s) throws JMSException;
+
+ TopicSubscriber createDurableSubscriber(Topic topic, String s, String s1, boolean b)
+ throws JMSException;
+
+ QueueBrowser createBrowser(Queue queue) throws JMSException;
+
+ QueueBrowser createBrowser(Queue queue, String s) throws JMSException;
+
+ TemporaryQueue createTemporaryQueue() throws JMSException;
+
+ TemporaryTopic createTemporaryTopic() throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/StreamMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/StreamMessage.java
new file mode 100644
index 0000000000..d207a708c9
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/StreamMessage.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface StreamMessage extends Message, javax.jms.StreamMessage
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryDestination.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryDestination.java
new file mode 100644
index 0000000000..025a1af806
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryDestination.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface TemporaryDestination
+{
+ Session getSession();
+
+ boolean isDeleted();
+
+ void addConsumer(MessageConsumer consumer);
+
+ void removeConsumer(MessageConsumer consumer);
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryQueue.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryQueue.java
new file mode 100644
index 0000000000..182f9a4c85
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryQueue.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface TemporaryQueue extends Queue, TemporaryDestination, javax.jms.TemporaryQueue
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryTopic.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryTopic.java
new file mode 100644
index 0000000000..277681079b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TemporaryTopic.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface TemporaryTopic extends Topic, TemporaryDestination, javax.jms.TemporaryTopic
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TextMessage.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TextMessage.java
new file mode 100644
index 0000000000..1dbcd5f84f
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TextMessage.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface TextMessage extends Message, javax.jms.TextMessage
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Topic.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Topic.java
new file mode 100644
index 0000000000..755d698a04
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Topic.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface Topic extends Destination, javax.jms.Topic
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicConnection.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicConnection.java
new file mode 100644
index 0000000000..1431c7d2cb
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicConnection.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface TopicConnection extends Connection, javax.jms.TopicConnection
+{
+ TopicSession createTopicSession(boolean b, int i) throws JMSException;
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicPublisher.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicPublisher.java
new file mode 100644
index 0000000000..dde6c8b606
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicPublisher.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+public interface TopicPublisher extends MessageProducer, javax.jms.TopicPublisher
+{
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSession.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSession.java
new file mode 100644
index 0000000000..2c02ff199e
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSession.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+
+public interface TopicSession extends Session,javax.jms.TopicSession
+{
+ Topic createTopic(String s) throws JMSException;
+
+ TopicSubscriber createSubscriber(javax.jms.Topic topic) throws JMSException;
+
+ TopicSubscriber createSubscriber(javax.jms.Topic topic, String s, boolean b) throws JMSException;
+
+ TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, String s) throws JMSException;
+
+ TopicSubscriber createDurableSubscriber(javax.jms.Topic topic, String s, String s1, boolean b)
+ throws JMSException;
+
+ TopicPublisher createPublisher(javax.jms.Topic topic) throws JMSException;
+
+ TemporaryTopic createTemporaryTopic() throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSubscriber.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSubscriber.java
new file mode 100644
index 0000000000..00e5e9aca9
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/TopicSubscriber.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public interface TopicSubscriber extends MessageConsumer, javax.jms.TopicSubscriber
+{
+ Topic getTopic() throws JMSException;
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
new file mode 100644
index 0000000000..0ca629db7e
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.AmqpMessage;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Footer;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.*;
+
+public class AmqpMessageImpl extends MessageImpl implements AmqpMessage
+{
+ private List<Section> _sections;
+
+ protected AmqpMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List<Section> sections,
+ Footer footer, SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _sections = sections;
+ }
+
+ protected AmqpMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ _sections = new ArrayList<Section>();
+ }
+
+ public int getSectionCount()
+ {
+ return _sections.size();
+ }
+
+ public Section getSection(final int position)
+ {
+ return _sections.get(position);
+ }
+
+ public ListIterator<Section> sectionIterator()
+ {
+ return _sections.listIterator();
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.addAll(_sections);
+ sections.add(getFooter());
+ return sections;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
new file mode 100644
index 0000000000..83cc8eafb5
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.BytesMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import java.io.*;
+import java.util.*;
+
+public class BytesMessageImpl extends MessageImpl implements BytesMessage
+{
+ private DataInputStream _dataAsInput;
+ private DataOutputStream _dataAsOutput;
+ private ByteArrayOutputStream _bytesOut;
+ private Data _dataIn;
+
+ // message created for reading
+ protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
+ Footer footer, SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _dataIn = data;
+ final Binary dataBuffer = data.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+ }
+
+ // message created to be sent
+ protected BytesMessageImpl(final SessionImpl session)
+ {
+ super(new Header(),
+ new MessageAnnotations(new HashMap()),
+ new Properties(),
+ new ApplicationProperties(new HashMap()),
+ new Footer(Collections.EMPTY_MAP),
+ session);
+
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ }
+
+
+ private Data getDataSection()
+ {
+ if(_bytesOut != null)
+ {
+ return new Data(new Binary(_bytesOut.toByteArray()));
+ }
+ else
+ {
+ return _dataIn;
+ }
+ }
+
+ @Override
+ protected boolean isReadOnly()
+ {
+ return _dataIn != null;
+ }
+
+ public long getBodyLength() throws JMSException
+ {
+ checkReadable();
+ return getDataSection().getValue().getLength();
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readBoolean();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+
+ public byte readByte() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readByte();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readUnsignedByte() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUnsignedByte();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public short readShort() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readShort();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readUnsignedShort() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUnsignedShort();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public char readChar() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readChar();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readInt() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readInt();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public long readLong() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readLong();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public float readFloat() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readFloat();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public double readDouble() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readDouble();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public String readUTF() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUTF();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+
+ return readBytes(bytes, bytes.length);
+ }
+
+ public int readBytes(byte[] bytes, int length) throws JMSException
+ {
+ checkReadable();
+
+ try
+ {
+ int offset = 0;
+ while(offset < length)
+ {
+ int read = _dataAsInput.read(bytes, offset, length - offset);
+ if(read < 0)
+ {
+ break;
+ }
+ offset += read;
+ }
+
+ if(offset == 0 && length != 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return offset;
+ }
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeBoolean(b);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeByte(b);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeShort(i);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeChar(c);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeInt(i);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeLong(l);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeFloat(v);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeDouble(v);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeUTF(String s) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeUTF(s);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeBytes(byte[] bytes, int off, int len) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.write(bytes, off, len);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeObject(Object o) throws JMSException
+ {
+ checkWritable();
+ if(o == null)
+ {
+ throw new NullPointerException("Value passed to BytesMessage.writeObject() must be non null");
+ }
+ else if (o instanceof Boolean)
+ {
+ writeBoolean((Boolean)o);
+ }
+ else if (o instanceof Byte)
+ {
+ writeByte((Byte)o);
+ }
+ else if (o instanceof Short)
+ {
+ writeShort((Short)o);
+ }
+ else if (o instanceof Character)
+ {
+ writeChar((Character)o);
+ }
+ else if (o instanceof Integer)
+ {
+ writeInt((Integer)o);
+ }
+ else if(o instanceof Long)
+ {
+ writeLong((Long)o);
+ }
+ else if(o instanceof Float)
+ {
+ writeFloat((Float) o);
+ }
+ else if(o instanceof Double)
+ {
+ writeDouble((Double) o);
+ }
+ else if(o instanceof String)
+ {
+ writeUTF((String) o);
+ }
+ else if(o instanceof byte[])
+ {
+ writeBytes((byte[])o);
+ }
+ else
+ {
+ throw new MessageFormatException("Value passed to BytesMessage.writeObject() must be of primitive type. Type passed was " + o.getClass().getName());
+ }
+ }
+
+ public void reset() throws JMSException
+ {
+ if(_bytesOut != null)
+ {
+ byte[] data = _bytesOut.toByteArray();
+ _dataIn = new Data(new Binary(data));
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(data));
+ _dataAsOutput = null;
+ _bytesOut = null;
+ }
+ else
+ {
+
+ final Binary dataBuffer = _dataIn.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+ }
+ }
+
+ private JMSException handleInputException(final IOException e)
+ {
+ JMSException ex;
+ if(e instanceof EOFException)
+ {
+ ex = new MessageEOFException(e.getMessage());
+ }
+ else
+ {
+ ex = new MessageFormatException(e.getMessage());
+ }
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ private JMSException handleOutputException(final IOException e)
+ {
+ JMSException ex = new JMSException(e.getMessage());
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ _dataAsInput = null;
+ _dataIn = null;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.add(getDataSection());
+ sections.add(getFooter());
+ return sections;
+ }
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
new file mode 100644
index 0000000000..66bf91a53f
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Connection;
+import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
+
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
+{
+ private String _host;
+ private int _port;
+ private String _username;
+ private String _password;
+ private String _clientId;
+ private String _remoteHost;
+ private boolean _ssl;
+
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId)
+ {
+ this(host,port,username,password,clientId,false);
+ }
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final boolean ssl)
+ {
+ this(host,port,username,password,clientId,null,ssl);
+ }
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final String remoteHost,
+ final boolean ssl)
+ {
+ _host = host;
+ _port = port;
+ _username = username;
+ _password = password;
+ _clientId = clientId;
+ _remoteHost = remoteHost;
+ _ssl = ssl;
+ }
+
+ public ConnectionImpl createConnection() throws JMSException
+ {
+ return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl);
+ }
+
+ public ConnectionImpl createConnection(final String username, final String password) throws JMSException
+ {
+ return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ }
+
+ public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
+ {
+ URL url = new URL(urlString);
+ String host = url.getHost();
+ int port = url.getPort();
+ if(port == -1)
+ {
+ port = 5672;
+ }
+ String userInfo = url.getUserInfo();
+ String username = null;
+ String password = null;
+ String clientId = null;
+ String remoteHost = null;
+ boolean ssl = false;
+ if(userInfo != null)
+ {
+ String[] components = userInfo.split(":",2);
+ username = components[0];
+ if(components.length == 2)
+ {
+ password = components[1];
+ }
+ }
+ String query = url.getQuery();
+ if(query != null)
+ {
+ for(String param : query.split("&"))
+ {
+ String[] keyValuePair = param.split("=",2);
+ if(keyValuePair[0].equalsIgnoreCase("clientid"))
+ {
+ clientId = keyValuePair[1];
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("ssl"))
+ {
+ ssl = Boolean.valueOf(keyValuePair[1]);
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
+ {
+ remoteHost = keyValuePair[1];
+ }
+ }
+ }
+
+ return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+
+ }
+
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ final ConnectionImpl connection = createConnection();
+ connection.setQueueConnection(true);
+ return connection;
+ }
+
+ public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
+ {
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setQueueConnection(true);
+ return connection;
+ }
+
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ final ConnectionImpl connection = createConnection();
+ connection.setTopicConnection(true);
+ return connection;
+ }
+
+ public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
+ {
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setTopicConnection(true);
+ return connection;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
new file mode 100644
index 0000000000..587b12b51a
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Connection;
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.transport.Container;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
+{
+
+ private ConnectionMetaData _connectionMetaData;
+ private volatile ExceptionListener _exceptionListener;
+
+ private final List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
+
+ private final Object _lock = new Object();
+
+ private org.apache.qpid.amqp_1_0.client.Connection _conn;
+ private boolean _isQueueConnection;
+ private boolean _isTopicConnection;
+ private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
+
+
+ private static enum State
+ {
+ STOPPED,
+ STARTED,
+ CLOSED
+ }
+
+ private volatile State _state = State.STOPPED;
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
+ {
+ this(host,port,username,password,clientId,false);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, boolean ssl) throws JMSException
+ {
+ this(host,port,username,password,clientId,null,ssl);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
+ {
+ Container container = clientId == null ? new Container() : new Container(clientId);
+ // TODO - authentication, containerId, clientId, ssl?, etc
+ try
+ {
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container, remoteHost, ssl);
+ // TODO - retrieve negotiated AMQP version
+ _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ }
+ catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+ }
+
+ public SessionImpl createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ Session.AcknowledgeMode ackMode;
+
+ try
+ {
+ ackMode = transacted ? Session.AcknowledgeMode.SESSION_TRANSACTED
+ : Session.AcknowledgeMode.values()[acknowledgeMode];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ JMSException jmsEx = new JMSException("Unknown acknowledgement mode " + acknowledgeMode);
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+
+ return createSession(ackMode);
+ }
+
+ public SessionImpl createSession(final Session.AcknowledgeMode acknowledgeMode) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ if(_state == State.CLOSED)
+ {
+ throw new IllegalStateException("Cannot create a session on a closed connection");
+ }
+
+ SessionImpl session = new SessionImpl(this, acknowledgeMode);
+ session.setQueueSession(_isQueueConnection);
+ session.setTopicSession(_isTopicConnection);
+ _sessions.add(session);
+
+ return session;
+ }
+
+ }
+
+ public String getClientID() throws JMSException
+ {
+ checkClosed();
+ return _conn.getEndpoint().getContainer().getId();
+ }
+
+ public void setClientID(final String s) throws JMSException
+ {
+ throw new IllegalStateException("Cannot set client-id to \""
+ + s
+ + "\"; client-id must be set on connection creation");
+ }
+
+ public ConnectionMetaData getMetaData() throws JMSException
+ {
+ checkClosed();
+ return _connectionMetaData;
+ }
+
+ public ExceptionListener getExceptionListener() throws JMSException
+ {
+ checkClosed();
+ return _exceptionListener;
+ }
+
+ public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
+ {
+ checkClosed();
+ _exceptionListener = exceptionListener;
+ }
+
+ public void start() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ checkClosed();
+ if(_state == State.STOPPED)
+ {
+ // TODO
+
+ _state = State.STARTED;
+
+ for(SessionImpl session : _sessions)
+ {
+ session.start();
+ }
+
+ }
+
+ _lock.notifyAll();
+ }
+
+ }
+
+ public void stop() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ switch(_state)
+ {
+ case STARTED:
+ for(SessionImpl session : _sessions)
+ {
+ session.stop();
+ }
+ _state = State.STOPPED;
+ break;
+ case CLOSED:
+ throw new javax.jms.IllegalStateException("Closed");
+ }
+
+ _lock.notifyAll();
+ }
+ }
+
+
+ static interface CloseTask
+ {
+ public void onClose() throws JMSException;
+ }
+
+ void addOnCloseTask(CloseTask task)
+ {
+ synchronized (_lock)
+ {
+ _closeTasks.add(task);
+ }
+ }
+
+
+ void removeOnCloseTask(CloseTask task)
+ {
+ synchronized (_lock)
+ {
+ _closeTasks.remove(task);
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ if(_state != State.CLOSED)
+ {
+ stop();
+ for(SessionImpl session : _sessions)
+ {
+ session.close();
+ }
+ for(CloseTask task : _closeTasks)
+ {
+ task.onClose();
+ }
+ _conn.close();
+ _state = State.CLOSED;
+ }
+
+ _lock.notifyAll();
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_state == State.CLOSED)
+ throw new IllegalStateException("Closed");
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Destination destination,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+ public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ SessionImpl session = createSession(transacted, acknowledgeMode);
+ session.setTopicSession(true);
+ return session;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Topic topic,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+ public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
+ final String s,
+ final String s1,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+ public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ SessionImpl session = createSession(transacted, acknowledgeMode);
+ session.setQueueSession(true);
+ return session;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Queue queue,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+
+
+ protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
+ {
+ return _conn;
+ }
+
+ public boolean isStarted()
+ {
+ synchronized (_lock)
+ {
+ return _state == State.STARTED;
+ }
+ }
+
+ void setQueueConnection(final boolean queueConnection)
+ {
+ _isQueueConnection = queueConnection;
+ }
+
+ void setTopicConnection(final boolean topicConnection)
+ {
+ _isTopicConnection = topicConnection;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
new file mode 100644
index 0000000000..8159c7116b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+
+import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+ private static final int JMS_MAJOR_VERSION = 1;
+ private static final int JMS_MINOR_VERSION = 1;
+
+ private static final int PROVIDER_MAJOR_VERSION = 1;
+ private static final int PROVIDER_MINOR_VERSION = 0;
+
+
+ private final int _amqpMajorVersion;
+ private final int _amqpMinorVersion;
+ private final int _amqpRevisionVersion;
+ private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
+
+ public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
+ {
+ _amqpMajorVersion = amqpMajorVersion;
+ _amqpMinorVersion = amqpMinorVersion;
+ _amqpRevisionVersion = amqpRevisionVersion;
+ }
+
+ public String getJMSVersion() throws JMSException
+ {
+ return getJMSMajorVersion() + "." + getJMSMinorVersion();
+ }
+
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return JMS_MAJOR_VERSION;
+ }
+
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return JMS_MINOR_VERSION;
+ }
+
+ public String getJMSProviderName() throws JMSException
+ {
+ return "AMQP.ORG";
+ }
+
+ public String getProviderVersion() throws JMSException
+ {
+ return getProviderMajorVersion() + "." + getProviderMinorVersion();
+ }
+
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+
+ return Collections.enumeration(_jmsxProperties);
+ }
+
+ public int getAMQPMajorVersion()
+ {
+ return _amqpMajorVersion;
+ }
+
+ public int getAMQPMinorVersion()
+ {
+ return _amqpMinorVersion;
+ }
+
+ public int getAMQPRevisionVersion()
+ {
+ return _amqpRevisionVersion;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
new file mode 100644
index 0000000000..b4ca2c6302
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+
+import javax.jms.JMSException;
+import java.util.WeakHashMap;
+
+public class DestinationImpl implements Destination, Queue, Topic
+{
+ private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
+ new WeakHashMap<String, DestinationImpl>();
+
+ private final String _address;
+
+ protected DestinationImpl(String address)
+ {
+ _address = address;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public static DestinationImpl valueOf(String address)
+ {
+ return address == null ? null : createDestination(address);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _address.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj)
+ {
+ return obj != null
+ && obj.getClass() == getClass()
+ && _address.equals(((DestinationImpl)obj)._address);
+ }
+
+ public static synchronized DestinationImpl createDestination(final String address)
+ {
+ DestinationImpl destination = DESTINATION_CACHE.get(address);
+ if(destination == null)
+ {
+ destination = new DestinationImpl(address);
+ DESTINATION_CACHE.put(address, destination);
+ }
+ return destination;
+ }
+
+ public String getQueueName() throws JMSException
+ {
+ return getAddress();
+ }
+
+ public String getTopicName() throws JMSException
+ {
+ return getAddress();
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
new file mode 100644
index 0000000000..47811a0f5a
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.MapMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.*;
+
+public class MapMessageImpl extends MessageImpl implements MapMessage
+{
+ private Map _map;
+
+ public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
+ Footer footer,
+ SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _map = map;
+ }
+
+ MapMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ _map = new HashMap();
+ }
+
+ public boolean getBoolean(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to boolean.");
+ }
+ }
+
+ public byte getByte(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).byteValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Byte.valueOf((String) value).byteValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to byte.");
+ } }
+
+ public short getShort(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).shortValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).shortValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Short.valueOf((String) value).shortValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to short.");
+ } }
+
+ public char getChar(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (!itemExists(name))
+ {
+ throw new MessageFormatException("Property " + name + " not present");
+ }
+ else if (value instanceof Character)
+ {
+ return ((Character) value).charValue();
+ }
+ else if (value == null)
+ {
+ throw new NullPointerException("Property " + name + " has null value and therefore cannot "
+ + "be converted to char.");
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to boolan.");
+ } }
+
+ public int getInt(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Integer)
+ {
+ return ((Integer) value).intValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).intValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).intValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Integer.valueOf((String) value).intValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to int.");
+ }
+ }
+
+ public long getLong(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Long)
+ {
+ return ((Long) value).longValue();
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value).longValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to long.");
+ }
+ }
+
+ public float getFloat(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Float)
+ {
+ return ((Float) value).floatValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Float.valueOf((String) value).floatValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to float.");
+ }
+ }
+
+ public double getDouble(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Double)
+ {
+ return ((Double) value).doubleValue();
+ }
+ else if (value instanceof Float)
+ {
+ return ((Float) value).doubleValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Double.valueOf((String) value).doubleValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to double.");
+ }
+ }
+
+ public String getString(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if ((value instanceof String) || (value == null))
+ {
+ return (String) value;
+ }
+ else if (value instanceof byte[] || value instanceof Binary)
+ {
+ throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
+ }
+ else
+ {
+ return value.toString();
+ }
+ }
+
+ public byte[] getBytes(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (!itemExists(name))
+ {
+ throw new MessageFormatException("Property " + name + " not present");
+ }
+ else if ((value instanceof byte[]) || (value == null))
+ {
+ return (byte[]) value;
+ }
+ else if(value instanceof Binary)
+ {
+ return ((Binary)value).getArray();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to byte[].");
+ } }
+
+ public Object getObject(String s) throws JMSException
+ {
+ Object val = get(s);
+ return val instanceof Binary ? ((Binary)val).getArray() : val;
+ }
+
+ public Enumeration getMapNames() throws JMSException
+ {
+ return Collections.enumeration(keySet());
+ }
+
+ public void setBoolean(String name, boolean val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setByte(String name, byte val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setShort(String name, short val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setChar(String name, char val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setInt(String name, int val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setLong(String name, long val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setFloat(String name, float val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setDouble(String name, double val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setString(String name, String val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setBytes(String name, byte[] val) throws JMSException
+ {
+ setBytes(name, val, 0, val == null ? 0 : val.length);
+ }
+
+ public void setBytes(String name, byte[] bytes, int offset, int length) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ byte[] val;
+
+ if(bytes == null)
+ {
+ val = null;
+ }
+ else
+ {
+ val = new byte[length];
+ System.arraycopy(bytes,offset,val,0,length);
+ }
+
+ put(name, new Binary(val));
+ }
+
+ public void setObject(String name, Object value) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+ || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+ || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+ {
+ put(name, value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot set property " + name + " to value " + value + "of type "
+ + value.getClass().getName() + ".");
+ } }
+
+ public boolean itemExists(String s)
+ {
+ return _map.containsKey(s);
+ }
+
+ public Object get(final Object key)
+ {
+ return _map.get(key);
+ }
+
+ public Object put(final Object key, final Object val)
+ {
+ return _map.put(key, val);
+ }
+
+ public boolean itemExists(final Object key)
+ {
+ return _map.containsKey(key);
+ }
+
+ public Set<Object> keySet()
+ {
+ return _map.keySet();
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _map.clear();
+ }
+
+ private void checkPropertyName(String propName)
+ {
+ if ((propName == null) || propName.equals(""))
+ {
+ throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+ }
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.add(new AmqpValue(_map));
+ sections.add(getFooter());
+ return sections;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
new file mode 100644
index 0000000000..dcea8f6771
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Transaction;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
+{
+ private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local");
+ private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector");
+ private String _selector;
+ private boolean _noLocal;
+ private DestinationImpl _destination;
+ private SessionImpl _session;
+ private Receiver _receiver;
+ private Binary _lastUnackedMessage;
+ MessageListener _messageListener;
+
+ private boolean _isQueueConsumer;
+ private boolean _isTopicSubscriber;
+
+ private boolean _closed = false;
+ private String _linkName;
+ private boolean _durable;
+ private Collection<Binary> _txnMsgs = Collections.synchronizedCollection(new ArrayList<Binary>());
+ private Binary _lastTxnUpdate;
+ private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
+ private final List<Message> _replaymessages = new ArrayList<Message>();
+
+ MessageConsumerImpl(final Destination destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal) throws JMSException
+ {
+ this(destination,session,selector,noLocal,null,false);
+ }
+
+ MessageConsumerImpl(final Destination destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal,
+ final String linkName,
+ final boolean durable) throws JMSException
+ {
+ _selector = selector;
+ _noLocal = noLocal;
+ _linkName = linkName;
+ _durable = durable;
+ if(destination instanceof DestinationImpl)
+ {
+ _destination = (DestinationImpl) destination;
+ if(destination instanceof javax.jms.Queue)
+ {
+ _isQueueConsumer = true;
+ }
+ else if(destination instanceof javax.jms.Topic)
+ {
+ _isTopicSubscriber = true;
+ }
+ if(destination instanceof TemporaryDestination)
+ {
+ ((TemporaryDestination)destination).addConsumer(this);
+ }
+ }
+ else
+ {
+ throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
+ }
+ _session = session;
+
+ _receiver = createClientReceiver();
+
+
+ }
+
+ protected Receiver createClientReceiver() throws JMSException
+ {
+ try
+ {
+ return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+ _linkName, _durable, getFilters(), null);
+ }
+ catch (AmqpErrorException e)
+ {
+ Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition())
+ && error.getInfo() != null && Symbol.valueOf("filter").equals(error.getInfo().get(Symbol.valueOf
+ ("field"))))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+ }
+ }
+ }
+
+ Map<Symbol, Filter> getFilters()
+ {
+ if(_selector == null || _selector.trim().equals(""))
+ {
+ if(_noLocal)
+ {
+ return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE);
+ }
+ else
+ {
+ return null;
+
+ }
+ }
+ else if(_noLocal)
+ {
+ Map<Symbol, Filter> filters = new HashMap<Symbol, Filter>();
+ filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE);
+ filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector));
+ return filters;
+ }
+ else
+ {
+ return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector));
+ }
+
+
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ checkClosed();
+ return _selector;
+ }
+
+ public MessageListener getMessageListener() throws IllegalStateException
+ {
+ checkClosed();
+ return _messageListener;
+ }
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ checkClosed();
+ _messageListener = messageListener;
+ _session.messageListenerSet( this );
+ _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ }
+
+ public MessageImpl receive() throws JMSException
+ {
+ checkClosed();
+ return receiveImpl(-1L);
+ }
+
+ public MessageImpl receive(final long timeout) throws JMSException
+ {
+ checkClosed();
+ // TODO - validate timeout > 0
+
+ return receiveImpl(timeout);
+ }
+
+ public MessageImpl receiveNoWait() throws JMSException
+ {
+ checkClosed();
+ return receiveImpl(0L);
+ }
+
+ private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ {
+ org.apache.qpid.amqp_1_0.client.Message msg;
+ boolean redelivery;
+ if(_replaymessages.isEmpty())
+ {
+ msg = receive0(timeout);
+ redelivery = false;
+ }
+ else
+ {
+ msg = _replaymessages.remove(0);
+ redelivery = true;
+ }
+
+ if(msg != null)
+ {
+ preReceiveAction(msg);
+ }
+ return createJMSMessage(msg, redelivery);
+ }
+
+ Message receive0(final long timeout)
+ {
+ Message message = _receiver.receive(timeout);
+ if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ _recoverReplayMessages.add(message);
+ }
+ return message;
+ }
+
+
+ void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
+ {
+ _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn());
+ }
+
+ MessageImpl createJMSMessage(final Message msg, boolean redelivery)
+ {
+ if(msg != null)
+ {
+ MessageFactory factory = _session.getMessageFactory();
+ final MessageImpl message = factory.createMessage(_destination, msg);
+ message.setFromQueue(_isQueueConsumer);
+ message.setFromTopic(_isTopicSubscriber);
+ if(redelivery)
+ {
+ if(!message.getJMSRedelivered())
+ {
+ message.setJMSRedelivered(true);
+ }
+ }
+
+ return message;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ if(!_closed)
+ {
+ _closed = true;
+
+ closeUnderlyingReceiver(_receiver);
+
+ if(_destination instanceof TemporaryDestination)
+ {
+ ((TemporaryDestination)_destination).removeConsumer(this);
+ }
+ }
+ }
+
+ protected void closeUnderlyingReceiver(Receiver receiver)
+ {
+ receiver.close();
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Closed");
+ }
+ }
+
+ void setLastUnackedMessage(final Binary deliveryTag)
+ {
+ _lastUnackedMessage = deliveryTag;
+ }
+
+ void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException
+ {
+ final int acknowledgeMode = _session.getAcknowledgeMode();
+
+ if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
+ || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
+ || acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ acknowledge(msg);
+ if(acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _txnMsgs.add(msg.getDeliveryTag());
+ }
+ }
+ else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ setLastUnackedMessage(msg.getDeliveryTag());
+ }
+ }
+
+ void acknowledgeAll()
+ {
+ if(_lastUnackedMessage != null)
+ {
+ Transaction txn = _session.getTxn();
+ _receiver.acknowledgeAll(_lastUnackedMessage, txn, null);
+ if(txn != null)
+ {
+ _lastTxnUpdate = _lastUnackedMessage;
+ }
+ _lastUnackedMessage = null;
+
+ }
+ _recoverReplayMessages.clear();
+ if(!_replaymessages.isEmpty())
+ {
+ _recoverReplayMessages.addAll(_replaymessages);
+ }
+ }
+
+ void postRollback()
+ {
+ if(_lastTxnUpdate != null)
+ {
+ _receiver.updateAll(new Modified(), _lastTxnUpdate);
+ _lastTxnUpdate = null;
+ }
+ for(Binary tag : _txnMsgs)
+ {
+ _receiver.modified(tag);
+ }
+ _txnMsgs.clear();
+ }
+
+ void postCommit()
+ {
+ _lastTxnUpdate = null;
+ _txnMsgs.clear();
+ }
+
+ public DestinationImpl getDestination() throws IllegalStateException
+ {
+ checkClosed();
+ return _destination;
+ }
+
+
+ public SessionImpl getSession() throws IllegalStateException
+ {
+ checkClosed();
+ return _session;
+ }
+
+ public boolean getNoLocal() throws IllegalStateException
+ {
+ checkClosed();
+ return _noLocal;
+ }
+
+ public void start()
+ {
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ return (Queue) getDestination();
+ }
+
+ public Topic getTopic() throws JMSException
+ {
+ return (Topic) getDestination();
+ }
+
+ void setQueueConsumer(final boolean queueConsumer)
+ {
+ _isQueueConsumer = queueConsumer;
+ }
+
+ void setTopicSubscriber(final boolean topicSubscriber)
+ {
+ _isTopicSubscriber = topicSubscriber;
+ }
+
+ String getLinkName()
+ {
+ return _linkName;
+ }
+
+ boolean isDurable()
+ {
+ return _durable;
+ }
+
+ void doRecover()
+ {
+ _replaymessages.clear();
+ if(!_recoverReplayMessages.isEmpty())
+ {
+ _replaymessages.addAll(_recoverReplayMessages);
+ for(Message msg : _replaymessages)
+ {
+ _session.messageArrived(this);
+ }
+ }
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
new file mode 100644
index 0000000000..216107e53e
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.*;
+
+class MessageFactory
+{
+ private final SessionImpl _session;
+
+
+ MessageFactory(final SessionImpl session)
+ {
+ _session = session;
+ }
+
+ public MessageImpl createMessage(final DestinationImpl destination, final Message msg)
+ {
+ MessageImpl message;
+ List<Section> payload = msg.getPayload();
+ Header header = null;
+ MessageAnnotations messageAnnotations = null;
+
+ Properties properties = null;
+ ApplicationProperties appProperties = null;
+ Footer footer;
+
+ Iterator<Section> iter = payload.iterator();
+ List<Section> body = new ArrayList<Section>();
+
+ Section section = iter.hasNext() ? iter.next() : null;
+
+ if(section instanceof Header)
+ {
+ header = (Header) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof MessageAnnotations)
+ {
+ messageAnnotations = (MessageAnnotations) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof Properties)
+ {
+ properties = (Properties) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof ApplicationProperties)
+ {
+ appProperties = (ApplicationProperties) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ while(section != null && !(section instanceof Footer))
+ {
+ body.add(section);
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ footer = (Footer) section;
+
+ if(body.size() == 1)
+ {
+ Section bodySection = body.get(0);
+ if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
+ {
+ message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
+ {
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
+ (List) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
+ {
+ message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
+ (String) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary)
+ {
+
+ Binary value = (Binary) ((AmqpValue) bodySection).getValue();
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties,
+ new Data(value), footer, _session);
+ }
+ else if(bodySection instanceof Data)
+ {
+ if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+ {
+
+
+ message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties,
+ (Data) bodySection,
+ footer,
+ _session);
+ }
+ else
+ {
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
+ }
+ }
+ else if(bodySection instanceof AmqpSequence)
+ {
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ }
+
+ /*else if(bodySection instanceof AmqpDataSection)
+ {
+ AmqpDataSection dataSection = (AmqpDataSection) bodySection;
+
+ List<Object> data = new ArrayList<Object>();
+
+ ListIterator<Object> dataIter = dataSection.iterator();
+
+ while(dataIter.hasNext())
+ {
+ data.add(dataIter.next());
+ }
+
+ if(data.size() == 1)
+ {
+ final Object obj = data.get(0);
+ if( obj instanceof String)
+ {
+ message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
+ }
+ else if(obj instanceof JavaSerializable)
+ {
+ // TODO - ObjectMessage
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ else if(obj instanceof Serializable)
+ {
+ message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
+ }
+ else
+ {
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ }
+ else
+ {
+ // not a text message
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ }*/
+ else
+ {
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+ }
+ }
+ else
+ {
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+ }
+
+ message.setReadOnly();
+
+ return message;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
new file mode 100644
index 0000000000..96798b69a1
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -0,0 +1,1209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Message;
+import org.apache.qpid.amqp_1_0.messaging.MessageAttributes;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.UnsignedShort;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Footer;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import java.nio.charset.Charset;
+import java.util.*;
+
+public abstract class MessageImpl implements Message
+{
+ static final Set<Class> _supportedClasses =
+ new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
+ Float.class, Double.class, Character.class, String.class, byte[].class));
+ private static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
+
+ private Header _header;
+ private Properties _properties;
+ private ApplicationProperties _applicationProperties;
+ private Footer _footer;
+ public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+ private SessionImpl _sessionImpl;
+ private boolean _readOnly;
+ private MessageAnnotations _messageAnnotations;
+
+ private boolean _isFromQueue;
+ private boolean _isFromTopic;
+ private long _expiration;
+
+ protected MessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
+ Properties properties,
+ ApplicationProperties appProperties,
+ Footer footer,
+ SessionImpl session)
+ {
+ _header = header == null ? new Header() : header;
+ _properties = properties == null ? new Properties() : properties;
+ _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations;
+ _footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer;
+ _applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties;
+ _sessionImpl = session;
+ }
+
+ public String getJMSMessageID() throws JMSException
+ {
+ Object messageId = getMessageId();
+
+ return messageId == null ? null : "ID:"+messageId.toString();
+ }
+
+ public void setJMSMessageID(String messageId) throws InvalidJMSMEssageIdException
+ {
+ if(messageId == null)
+ {
+ setMessageId(null);
+ }
+ else if(messageId.startsWith("ID:"))
+ {
+ setMessageId(messageId.substring(3));
+ }
+ else
+ {
+ throw new InvalidJMSMEssageIdException(messageId);
+ }
+ }
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ Date transmitTime = getTransmitTime();
+ return transmitTime == null ? 0 : transmitTime.getTime();
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException
+ {
+ setTransmitTime(new Date(l));
+ if(_expiration != 0l)
+ {
+ setTtl(UnsignedInteger.valueOf(_expiration-getTransmitTime().getTime()));
+ }
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+
+ Object o = getCorrelationId();
+ if(o instanceof Binary)
+ {
+ Binary correlationIdBinary = (Binary) o;
+ byte[] correlationId = new byte[correlationIdBinary.getLength()];
+ correlationIdBinary.asByteBuffer().get(correlationId);
+ return correlationId;
+ }
+ else
+ {
+ return o == null ? null : o.toString().getBytes();
+ }
+
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException
+ {
+ if(correlationId == null)
+ {
+ setCorrelationId(null);
+ }
+ else
+ {
+ byte[] dup = new byte[correlationId.length];
+ System.arraycopy(correlationId,0,dup,0,correlationId.length);
+ setCorrelationId(new Binary(dup));
+ }
+ }
+
+ public void setJMSCorrelationID(String s) throws JMSException
+ {
+ getProperties().setCorrelationId(s);
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ Object o = getProperties().getCorrelationId();
+ if(o instanceof Binary)
+ {
+ Binary id = (Binary) o;
+ return new String(id.getArray(), id.getArrayOffset(), id.getLength());
+ }
+ else
+ {
+ return o == null ? null : o.toString();
+ }
+ }
+
+ public DestinationImpl getJMSReplyTo() throws JMSException
+ {
+ return DestinationImpl.valueOf(getReplyTo());
+ }
+
+ public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
+ {
+ if(destination == null)
+ {
+ setReplyTo(null);
+ }
+ else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
+ {
+ setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ }
+ else
+ {
+ throw new NonAMQPDestinationException(destination);
+ }
+ }
+
+ public DestinationImpl getJMSDestination() throws JMSException
+ {
+ return _isFromQueue ? QueueImpl.valueOf(getTo())
+ : _isFromTopic ? TopicImpl.valueOf(getTo())
+ : DestinationImpl.valueOf(getTo());
+ }
+
+ public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
+ {
+ if(destination == null)
+ {
+ setTo(null);
+ }
+ else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
+ {
+ setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ }
+ else
+ {
+ throw new NonAMQPDestinationException(destination);
+ }
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ if(Boolean.FALSE.equals(getDurable()))
+ {
+ return DeliveryMode.NON_PERSISTENT;
+ }
+ else
+ {
+ return DeliveryMode.PERSISTENT;
+ }
+ }
+
+ public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+ {
+ switch(deliveryMode)
+ {
+ case DeliveryMode.NON_PERSISTENT:
+ setDurable(false);
+ break;
+ case DeliveryMode.PERSISTENT:
+ setDurable(true);
+ break;
+ default:
+ //TODO
+ }
+ }
+
+ public boolean getJMSRedelivered()
+ {
+ UnsignedInteger failures = getDeliveryFailures();
+ return failures != null && (failures.intValue() != 0);
+ }
+
+ public void setJMSRedelivered(boolean redelivered)
+ {
+ UnsignedInteger failures = getDeliveryFailures();
+ if(redelivered)
+ {
+ if(failures == null)
+ {
+ setDeliveryFailures(UnsignedInteger.valueOf(1));
+ }
+ }
+ else
+ {
+ setDeliveryFailures(null);
+ }
+ }
+
+ public String getJMSType() throws JMSException
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
+
+ return attrValue instanceof String ? attrValue.toString() : null;
+ }
+
+ public void setJMSType(String s) throws JMSException
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ if(messageAttrs == null)
+ {
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
+ }
+
+ messageAttrs.put(JMS_TYPE, s);
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ final UnsignedInteger ttl = getTtl();
+ return ttl == null || ttl.longValue() == 0 ? 0 : getJMSTimestamp() + ttl.longValue();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ _expiration = l;
+ if(l == 0)
+ {
+ setTtl(UnsignedInteger.ZERO);
+ }
+ else
+ {
+ if(getTransmitTime() == null)
+ {
+ setTransmitTime(new Date());
+ }
+ setTtl(UnsignedInteger.valueOf(l - getTransmitTime().getTime()));
+ }
+ }
+
+ public int getJMSPriority() throws JMSException
+ {
+ UnsignedByte priority = getPriority();
+ return priority == null ? DEFAULT_PRIORITY : priority.intValue();
+ }
+
+ public void setJMSPriority(int i) throws InvalidJMSPriorityException
+ {
+ if(i >= 0 && i <= 255)
+ {
+ setPriority(UnsignedByte.valueOf((byte)i));
+ }
+ else
+ {
+ throw new InvalidJMSPriorityException(i);
+ }
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ _applicationProperties.getValue().clear();
+ }
+
+ public boolean propertyExists(final String s) throws JMSException
+ {
+ return propertyExists((Object) s);
+ }
+
+ public boolean getBooleanProperty(final String s) throws JMSException
+ {
+ return getBooleanProperty((Object) s);
+ }
+
+ public byte getByteProperty(final String s) throws JMSException
+ {
+ return getByteProperty((Object)s);
+ }
+
+ public short getShortProperty(final String s) throws JMSException
+ {
+ return getShortProperty((Object)s);
+ }
+
+ public int getIntProperty(final String s) throws JMSException
+ {
+ return getIntProperty((Object)s);
+ }
+
+ public long getLongProperty(final String s) throws JMSException
+ {
+ return getLongProperty((Object)s);
+ }
+
+ public float getFloatProperty(final String s) throws JMSException
+ {
+ return getFloatProperty((Object)s);
+ }
+
+ public double getDoubleProperty(final String s) throws JMSException
+ {
+ return getDoubleProperty((Object)s);
+ }
+
+ public String getStringProperty(final String s) throws JMSException
+ {
+ return getStringProperty((Object)s);
+ }
+
+ public Object getObjectProperty(final String s) throws JMSException
+ {
+ return getObjectProperty((Object)s);
+ }
+
+ public boolean propertyExists(Object name) throws JMSException
+ {
+ return _applicationProperties.getValue().containsKey(name);
+ }
+
+ public boolean getBooleanProperty(Object name) throws JMSException
+ {
+
+ Object value = getProperty(name);
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to boolean.");
+ }
+ }
+
+ public byte getByteProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).byteValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Byte.valueOf((String) value).byteValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to byte.");
+ }
+ }
+
+ public short getShortProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).shortValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).shortValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Short.valueOf((String) value).shortValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to short.");
+ }
+ }
+
+ private Object getProperty(final Object name)
+ {
+ return _applicationProperties.getValue().get(name);
+ }
+
+ public int getIntProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Integer)
+ {
+ return ((Integer) value).intValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).intValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).intValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Integer.valueOf((String) value).intValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to int.");
+ }
+ }
+
+ public long getLongProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Long)
+ {
+ return ((Long) value).longValue();
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value).longValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to long.");
+ }
+ }
+
+ public float getFloatProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Float)
+ {
+ return ((Float) value).floatValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Float.valueOf((String) value).floatValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to float.");
+ }
+ }
+
+ public double getDoubleProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof Double)
+ {
+ return ((Double) value).doubleValue();
+ }
+ else if (value instanceof Float)
+ {
+ return ((Float) value).doubleValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Double.valueOf((String) value).doubleValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to double.");
+ }
+ }
+
+ public String getStringProperty(Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if ((value instanceof String) || (value == null))
+ {
+ return (String) value;
+ }
+ else if (value instanceof byte[])
+ {
+ throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
+ }
+ else
+ {
+ return value.toString();
+ }
+ }
+
+ public Object getObjectProperty(Object name) throws JMSException
+ {
+ return getProperty(name);
+ }
+
+ public List<Object> getListProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+ if(value instanceof List || value == null)
+ {
+ return (List<Object>)value;
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to List.");
+ }
+ }
+
+ public Map<Object, Object> getMapProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+ if(value instanceof Map || value == null)
+ {
+ return (Map<Object,Object>)value;
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to Map.");
+ }
+ }
+
+ public UnsignedByte getUnsignedByteProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof UnsignedByte)
+ {
+ return (UnsignedByte) value;
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return UnsignedByte.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to UnsignedByte.");
+ }
+ }
+
+ public UnsignedShort getUnsignedShortProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof UnsignedShort)
+ {
+ return (UnsignedShort) value;
+ }
+ else if (value instanceof UnsignedByte)
+ {
+ return UnsignedShort.valueOf(((UnsignedByte)value).shortValue());
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return UnsignedShort.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to UnsignedShort.");
+ }
+ }
+
+ public UnsignedInteger getUnsignedIntProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof UnsignedInteger)
+ {
+ return (UnsignedInteger) value;
+ }
+ else if (value instanceof UnsignedByte)
+ {
+ return UnsignedInteger.valueOf(((UnsignedByte)value).intValue());
+ }
+ else if (value instanceof UnsignedShort)
+ {
+ return UnsignedInteger.valueOf(((UnsignedShort)value).intValue());
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return UnsignedInteger.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to UnsignedShort.");
+ }
+ }
+
+ public UnsignedLong getUnsignedLongProperty(final Object name) throws JMSException
+ {
+ Object value = getProperty(name);
+
+ if (value instanceof UnsignedLong)
+ {
+ return (UnsignedLong) value;
+ }
+ else if (value instanceof UnsignedByte)
+ {
+ return UnsignedLong.valueOf(((UnsignedByte)value).longValue());
+ }
+ else if (value instanceof UnsignedShort)
+ {
+ return UnsignedLong.valueOf(((UnsignedShort)value).longValue());
+ }
+ else if (value instanceof UnsignedInteger)
+ {
+ return UnsignedLong.valueOf(((UnsignedInteger)value).longValue());
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return UnsignedLong.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to UnsignedShort.");
+ }
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ final Collection<String> names = new ArrayList<String>();
+ for(Object key : _applicationProperties.getValue().keySet())
+ {
+ if(key instanceof String)
+ {
+ names.add((String)key);
+ }
+ }
+ return Collections.enumeration(names);
+ }
+
+ public void setBooleanProperty(final String s, final boolean b) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+ setBooleanProperty((Object)s, b);
+ }
+
+ protected void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, Null Values.
+// The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2")
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// Identifiers are case sensitive.
+// Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+
+ }
+
+ public void setByteProperty(final String s, final byte b) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setByteProperty((Object)s, b);
+ }
+
+ public void setShortProperty(final String s, final short i) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setShortProperty((Object)s, i);
+ }
+
+ public void setIntProperty(final String s, final int i) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setIntProperty((Object)s, i);
+ }
+
+ public void setLongProperty(final String s, final long l) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setLongProperty((Object)s, l);
+ }
+
+ public void setFloatProperty(final String s, final float v) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setFloatProperty((Object) s, v);
+ }
+
+ public void setDoubleProperty(final String s, final double v) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setDoubleProperty((Object)s, v);
+ }
+
+ public void setStringProperty(final String s, final String s1) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ setStringProperty((Object)s, s1);
+ }
+
+ public void setObjectProperty(final String s, final Object o) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(s);
+
+ if(o != null && (_supportedClasses.contains(o.getClass())))
+ {
+ setObjectProperty((Object)s, o);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot call setObjectProperty with a value of " + ((o == null) ? "null" : " class "+o.getClass().getName()) + ".");
+ }
+ }
+
+ public void setBooleanProperty(Object name, boolean b) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, b);
+ }
+
+ public void setByteProperty(Object name, byte b) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, b);
+ }
+
+ public void setShortProperty(Object name, short i) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, i);
+ }
+
+ public void setIntProperty(Object name, int i) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, i);
+ }
+
+ public void setLongProperty(Object name, long l) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, l);
+ }
+
+ public void setFloatProperty(Object name, float v) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, v);
+ }
+
+ public void setDoubleProperty(Object name, double v) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, v);
+ }
+
+ public void setStringProperty(Object name, String value) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, value);
+ }
+
+ public void setObjectProperty(Object name, Object value) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, value);
+ }
+
+ public void setListProperty(final Object name, final List<Object> list) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, list);
+ }
+
+ public void setMapProperty(final Object name, final Map<Object, Object> map) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, map);
+ }
+
+ public void setUnsignedByteProperty(final Object name, final UnsignedByte b) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, b);
+ }
+
+ public void setUnsignedShortProperty(final Object name, final UnsignedShort s) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, s);
+ }
+
+ public void setUnsignedIntProperty(final Object name, final UnsignedInteger i) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, i);
+ }
+
+ public void setUnsignedLongProperty(final Object name, final UnsignedLong l) throws JMSException
+ {
+ _applicationProperties.getValue().put(name, l);
+ }
+
+ public UnsignedInteger getDeliveryFailures()
+ {
+ return _header.getDeliveryCount();
+ }
+
+ public void setDeliveryFailures(UnsignedInteger failures)
+ {
+ _header.setDeliveryCount(failures);
+ }
+
+ public MessageAttributes getHeaderMessageAttrs()
+ {
+ // TODO
+ return null ; // _header.getMessageAttrs();
+ }
+
+ public void setHeaderMessageAttrs(final MessageAttributes messageAttrs)
+ {
+ // TODO
+ }
+
+ public MessageAttributes getHeaderDeliveryAttrs()
+ {
+ // TODO
+ return null ; //_header.getDeliveryAttrs();
+ }
+
+ public void setHeaderDeliveryAttrs(final MessageAttributes deliveryAttrs)
+ {
+ //TODO
+ }
+
+ public Boolean getDurable()
+ {
+ return _header.getDurable();
+ }
+
+ public void setDurable(final Boolean durable)
+ {
+ _header.setDurable(durable);
+ }
+
+ public UnsignedByte getPriority()
+ {
+ return _header.getPriority();
+ }
+
+ public void setPriority(final UnsignedByte priority)
+ {
+ _header.setPriority(priority);
+ }
+
+ public Date getTransmitTime()
+ {
+ return _properties.getCreationTime();
+ }
+
+ public void setTransmitTime(final Date transmitTime)
+ {
+ _properties.setCreationTime(transmitTime);
+ }
+
+ public UnsignedInteger getTtl()
+ {
+ return _header.getTtl();
+ }
+
+ public void setTtl(final UnsignedInteger ttl)
+ {
+ _header.setTtl(ttl);
+ }
+
+ public UnsignedInteger getFormerAcquirers()
+ {
+ return _header.getDeliveryCount();
+ }
+
+ public void setFormerAcquirers(final UnsignedInteger formerAcquirers)
+ {
+ _header.setDeliveryCount(formerAcquirers);
+ }
+
+ public Object getMessageId()
+ {
+ return _properties.getMessageId();
+ }
+
+ public void setMessageId(final Object messageId)
+ {
+ _properties.setMessageId(messageId);
+ }
+
+ public Binary getUserId()
+ {
+ return _properties.getUserId();
+ }
+
+ public void setUserId(final Binary userId)
+ {
+ _properties.setUserId(userId);
+ }
+
+ public String getTo()
+ {
+ return _properties.getTo();
+ }
+
+ public void setTo(final String to)
+ {
+ _properties.setTo(to);
+ }
+
+ public String getSubject()
+ {
+ return _properties.getSubject();
+ }
+
+ public void setSubject(final String subject)
+ {
+ _properties.setSubject(subject);
+ }
+
+ public String getReplyTo()
+ {
+ return _properties.getReplyTo();
+ }
+
+ public void setReplyTo(final String replyTo)
+ {
+ _properties.setReplyTo(replyTo);
+ }
+
+ public Object getCorrelationId()
+ {
+ return _properties.getCorrelationId();
+ }
+
+ public void setCorrelationId(final Binary correlationId)
+ {
+ _properties.setCorrelationId(correlationId);
+ }
+
+ public Symbol getContentType()
+ {
+ return _properties.getContentType();
+ }
+
+ public void setContentType(final Symbol contentType)
+ {
+ _properties.setContentType(contentType);
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ _sessionImpl.acknowledgeAll();
+ }
+
+ public void clearBody() throws JMSException
+ {
+ _readOnly = false;
+ }
+
+ protected boolean isReadOnly()
+ {
+ return _readOnly;
+ }
+
+ protected void checkReadable() throws MessageNotReadableException
+ {
+ if (!isReadOnly())
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ protected void checkWritable() throws MessageNotWriteableException
+ {
+ if (isReadOnly())
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ public void setReadOnly()
+ {
+ _readOnly = true;
+ }
+
+ private static class InvalidJMSMEssageIdException extends JMSException
+ {
+ public InvalidJMSMEssageIdException(String messageId)
+ {
+ super("Invalid JMSMessageID: '" + messageId + "', JMSMessageID MUST start with 'ID:'");
+ }
+ }
+
+ private class NonAMQPDestinationException extends JMSException
+ {
+ public NonAMQPDestinationException(Destination destination)
+ {
+ super("Destinations not a valid AMQP Destination, class of type: '"
+ + destination.getClass().getName()
+ + "', require '"
+ + org.apache.qpid.amqp_1_0.jms.Destination.class.getName() + "'.");
+ }
+ }
+
+ private class InvalidJMSPriorityException extends JMSException
+ {
+ public InvalidJMSPriorityException(int priority)
+ {
+ super("The provided priority: " + priority + " is not valid in AMQP, valid values are from 0 to 255");
+ }
+ }
+
+ Header getHeader()
+ {
+ return _header;
+ }
+
+ Properties getProperties()
+ {
+ return _properties;
+ }
+
+
+ Footer getFooter()
+ {
+ return _footer;
+ }
+
+ MessageAnnotations getMessageAnnotations()
+ {
+ return _messageAnnotations;
+ }
+
+ public ApplicationProperties getApplicationProperties()
+ {
+ return _applicationProperties;
+ }
+
+ public void reset() throws JMSException
+ {
+ _readOnly = true;
+ }
+
+ void setFromQueue(final boolean fromQueue)
+ {
+ _isFromQueue = fromQueue;
+ }
+
+ void setFromTopic(final boolean fromTopic)
+ {
+ _isFromTopic = fromTopic;
+ }
+
+ abstract Collection<Section> getSections();
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
new file mode 100644
index 0000000000..5bb8845eb7
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.util.UUID;
+
+public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
+{
+ private boolean _disableMessageID;
+ private boolean _disableMessageTimestamp;
+ private int _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private int _priority = Message.DEFAULT_PRIORITY;
+ private long _timeToLive;
+
+ private DestinationImpl _destination;
+ private SessionImpl _session;
+ private Sender _sender;
+ private boolean _closed;
+
+ protected MessageProducerImpl(final Destination destination,
+ final SessionImpl session) throws JMSException
+ {
+ if(destination instanceof DestinationImpl)
+ {
+ _destination = (DestinationImpl) destination;
+ }
+ else if(destination != null)
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+ _session = session;
+
+ if(_destination != null)
+ {
+ try
+ {
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Producer closed");
+ }
+ }
+
+ public boolean getDisableMessageID() throws IllegalStateException
+ {
+ checkClosed();
+ return _disableMessageID;
+ }
+
+ public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
+ {
+ checkClosed();
+ _disableMessageID = disableMessageID;
+ }
+
+ public boolean getDisableMessageTimestamp() throws IllegalStateException
+ {
+ checkClosed();
+ return _disableMessageTimestamp;
+ }
+
+ public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
+ {
+ checkClosed();
+ _disableMessageTimestamp = disableMessageTimestamp;
+ }
+
+ public int getDeliveryMode() throws IllegalStateException
+ {
+ checkClosed();
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
+ {
+ checkClosed();
+ _deliveryMode = deliveryMode;
+ }
+
+ public int getPriority() throws IllegalStateException
+ {
+ checkClosed();
+ return _priority;
+ }
+
+ public void setPriority(final int priority) throws IllegalStateException
+ {
+ checkClosed();
+ _priority = priority;
+ }
+
+ public long getTimeToLive() throws IllegalStateException
+ {
+ checkClosed();
+ return _timeToLive;
+ }
+
+ public void setTimeToLive(final long timeToLive) throws IllegalStateException
+ {
+ checkClosed();
+ _timeToLive = timeToLive;
+ }
+
+ public DestinationImpl getDestination() throws JMSException
+ {
+ checkClosed();
+ return _destination;
+ }
+
+ public void close() throws JMSException
+ {
+ try
+ {
+ if(!_closed)
+ {
+ _closed = true;
+ if(_sender != null)
+ {
+ _sender.close();
+ }
+ }
+
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException("error closing");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ public void send(final Message message) throws JMSException
+ {
+ send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ }
+
+ public void send(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+ {
+ if(_sender == null)
+ {
+ throw new UnsupportedOperationException("No Destination provided");
+ }
+ if(_destination instanceof TemporaryDestination && ((TemporaryDestination)_destination).isDeleted())
+ {
+ throw new IllegalStateException("Destination is deleted");
+ }
+
+
+ //TODO
+ MessageImpl msg;
+ if(message instanceof org.apache.qpid.amqp_1_0.jms.Message)
+ {
+ msg = (MessageImpl) message;
+ }
+ else
+ {
+ msg = _session.convertMessage(message);
+ }
+
+
+
+ msg.setJMSDeliveryMode(deliveryMode);
+ msg.setJMSPriority(priority);
+
+ msg.setJMSDestination(_destination);
+
+ long timestamp = 0l;
+
+ if(!getDisableMessageTimestamp() || ttl != 0)
+ {
+ timestamp = System.currentTimeMillis();
+ msg.setJMSTimestamp(timestamp);
+
+ }
+ if(ttl != 0)
+ {
+ msg.setTtl(UnsignedInteger.valueOf(ttl));
+ }
+ else
+ {
+ msg.setTtl(null);
+ }
+
+ if(!getDisableMessageID() && msg.getMessageId() == null)
+ {
+ final Binary messageId = generateMessageId();
+ msg.setMessageId(messageId);
+
+ }
+
+ if(message != msg)
+ {
+ message.setJMSTimestamp(msg.getJMSTimestamp());
+ message.setJMSMessageID(msg.getJMSMessageID());
+ message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
+ message.setJMSPriority(msg.getJMSPriority());
+ message.setJMSExpiration(msg.getJMSExpiration());
+ }
+
+
+ final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
+
+ _sender.send(clientMessage, _session.getTxn());
+
+ if(getDestination() != null)
+ {
+ message.setJMSDestination(getDestination());
+ }
+ }
+
+ public void send(final javax.jms.Queue queue, final Message message) throws JMSException
+ {
+ send((Destination)queue, message);
+ }
+
+ public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send((Destination)queue, message, deliveryMode, priority, ttl);
+ }
+
+ private Binary generateMessageId()
+ {
+ UUID uuid = UUID.randomUUID();
+ return new Binary(uuid.toString().getBytes());
+ }
+
+ public void send(final Destination destination, final Message message) throws JMSException
+ {
+ send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+ }
+
+ public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+
+ checkClosed();
+ if(destination == null)
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+ else
+ {
+ if(_destination != null)
+ {
+ throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+ }
+ else if(!(destination instanceof DestinationImpl))
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+ else if(destination instanceof TemporaryDestination && ((TemporaryDestination)destination).isDeleted())
+ {
+ throw new IllegalStateException("Destination has been deleted");
+ }
+ try
+ {
+ _destination = (DestinationImpl) destination;
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+
+ send(message, deliveryMode, priority, ttl);
+
+ _sender.close();
+
+
+
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ finally
+ {
+ _sender = null;
+ _destination = null;
+ }
+ }
+ }
+
+ public QueueImpl getQueue() throws JMSException
+ {
+ return (QueueImpl) getDestination();
+ }
+
+ public TopicImpl getTopic() throws JMSException
+ {
+ return (TopicImpl) getDestination();
+ }
+
+ public void publish(final Message message) throws JMSException
+ {
+ send(message);
+ }
+
+ public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+
+ public void publish(final Topic topic, final Message message) throws JMSException
+ {
+ send(topic, message);
+ }
+
+ public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send(topic, message, deliveryMode, priority, ttl);
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
new file mode 100644
index 0000000000..2a52b0557a
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.*;
+
+public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
+{
+ static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
+
+ private Data _objectData;
+
+ protected ObjectMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
+ Properties properties,
+ ApplicationProperties appProperties,
+ Data dataSection,
+ Footer footer,
+ SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ getProperties().setContentType(CONTENT_TYPE);
+ Serializable serializable = null;
+ _objectData = dataSection;
+
+ }
+
+ protected ObjectMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ getProperties().setContentType(CONTENT_TYPE);
+ }
+
+ public void setObject(final Serializable serializable) throws MessageNotWriteableException
+ {
+ checkWritable();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(serializable);
+ oos.flush();
+ oos.close();
+
+ _objectData = new Data(new Binary(baos.toByteArray()));
+
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+ }
+
+ public Serializable getObject() throws JMSException
+ {
+
+ if(_objectData == null)
+ {
+ return null;
+ }
+
+ Binary data = _objectData.getValue();
+
+ try
+ {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
+ return (Serializable) ois.readObject();
+ }
+ catch (IOException e)
+ {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ catch (ClassNotFoundException e)
+ {
+
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _objectData = null;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+
+ sections.add(_objectData);
+
+ sections.add(getFooter());
+ return sections;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
new file mode 100644
index 0000000000..a068ca7a08
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DistributionMode;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
+public class QueueBrowserImpl implements QueueBrowser
+{
+ private static final String JMS_SELECTOR = "jms-selector";
+ private QueueImpl _queue;
+ private String _selector;
+ private Receiver _receiver;
+ private Message _nextElement;
+ private MessageEnumeration _enumeration;
+
+ QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
+ {
+ _queue = queue;
+ _selector = selector;
+
+
+ Map<Symbol, Filter> filters;
+ if(selector == null || selector.trim().equals(""))
+ {
+ filters = null;
+ }
+ else
+ {
+ filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+ }
+
+
+ try
+ {
+ _receiver = session.getClientSession().createReceiver(queue.getAddress(),
+ StdDistMode.COPY,
+ AcknowledgeMode.AMO,null,
+ false,
+ filters, null);
+ _nextElement = _receiver.receive(0L);
+ _enumeration = new MessageEnumeration();
+ }
+ catch(AmqpErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition())
+ && error.getInfo() != null && Symbol.valueOf("filter").equals(error.getInfo().get(Symbol.valueOf
+ ("field"))))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ }
+
+ }
+ }
+
+ public QueueImpl getQueue()
+ {
+ return _queue;
+ }
+
+ public String getMessageSelector()
+ {
+ return _selector;
+ }
+
+ public Enumeration getEnumeration() throws JMSException
+ {
+ if(_enumeration == null)
+ {
+ throw new IllegalStateException("Browser has been closed");
+ }
+ return _enumeration;
+ }
+
+ public void close() throws JMSException
+ {
+ _receiver.close();
+ _enumeration = null;
+ }
+
+ private final class MessageEnumeration implements Enumeration<Message>
+ {
+
+ @Override
+ public boolean hasMoreElements()
+ {
+ return _nextElement != null;
+ }
+
+ @Override
+ public Message nextElement()
+ {
+
+ Message message = _nextElement;
+ if(message == null)
+ {
+ message = _receiver.receive(0l);
+ }
+ if(message != null)
+ {
+ _nextElement = _receiver.receive(0l);
+ }
+ return message;
+ }
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
new file mode 100644
index 0000000000..657efd80a3
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueConnection;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.ServerSessionPool;
+
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+ QueueConnectionImpl(String host, int port, String username, String password, String clientId)
+ throws JMSException
+ {
+ super(host, port, username, password, clientId);
+ }
+
+ public QueueSessionImpl createQueueSession(final boolean b, final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Queue queue,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
new file mode 100644
index 0000000000..c88bd8268c
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Queue;
+
+import java.util.WeakHashMap;
+
+public class QueueImpl extends DestinationImpl implements Queue
+{
+ private static final WeakHashMap<String, QueueImpl> QUEUE_CACHE =
+ new WeakHashMap<String, QueueImpl>();
+
+ public QueueImpl(String address)
+ {
+ super(address);
+ }
+
+ public String getQueueName()
+ {
+ return getAddress();
+ }
+
+ public static synchronized QueueImpl createQueue(final String address)
+ {
+ QueueImpl queue = QUEUE_CACHE.get(address);
+ if(queue == null)
+ {
+ queue = new QueueImpl(address);
+ QUEUE_CACHE.put(address, queue);
+ }
+ return queue;
+ }
+
+ public static QueueImpl valueOf(String address)
+ {
+ return address == null ? null : createQueue(address);
+ }
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
new file mode 100644
index 0000000000..d46ed7183f
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+
+import javax.jms.*;
+
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+ QueueReceiverImpl(final QueueImpl destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal)
+ throws JMSException
+ {
+ super(destination, session, selector, noLocal);
+ setQueueConsumer(true);
+ }
+
+ protected Receiver createClientReceiver() throws JMSException
+ {
+ try
+ {
+ return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new JMSException(e.getMessage(), e.getError().getCondition().toString());
+ }
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ return (QueueImpl) getDestination();
+ }
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
new file mode 100644
index 0000000000..b3db43801a
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+ protected QueueSenderImpl(final Destination destination, final SessionImpl session)
+ throws JMSException
+ {
+ super(destination, session);
+ }
+
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
new file mode 100644
index 0000000000..e5ed8b3b3d
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+ protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
+ {
+ super(connection, acknowledgeMode);
+ setQueueSession(true);
+ }
+
+ public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
+ {
+ return createReceiver(queue, null);
+ }
+
+ public QueueReceiverImpl createReceiver(final Queue queue, final String selector) throws JMSException
+ {
+ // TODO - assert queue is a queueimpl and throw relevant JMS Exception
+ final QueueReceiverImpl messageConsumer;
+ synchronized(getClientSession().getEndpoint().getLock())
+ {
+ messageConsumer = new QueueReceiverImpl((QueueImpl)queue, this, selector, false);
+ addConsumer(messageConsumer);
+ }
+ return messageConsumer;
+
+ }
+
+ public QueueSenderImpl createSender(final Queue queue) throws JMSException
+ {
+ return null; //TODO
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
new file mode 100644
index 0000000000..08a6ac6f20
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -0,0 +1,885 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Connection;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.Transaction;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.jms.TopicSession;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+
+import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.MapMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.UUID;
+
+public class SessionImpl implements Session, QueueSession, TopicSession
+{
+ private ConnectionImpl _connection;
+ private AcknowledgeMode _acknowledgeMode;
+ private org.apache.qpid.amqp_1_0.client.Session _session;
+ private MessageFactory _messageFactory;
+ private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+ private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
+
+ private MessageListener _messageListener;
+ private Dispatcher _dispatcher = new Dispatcher();
+ private Thread _dispatcherThread;
+
+ private boolean _closed;
+
+ private boolean _isQueueSession;
+ private boolean _isTopicSession;
+ private Transaction _txn;
+
+ protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
+ {
+ _connection = connection;
+ _acknowledgeMode = acknowledgeMode;
+ Connection clientConn = _connection.getClientConnection();
+ _session = clientConn.createSession();
+ if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
+ {
+ _txn = _session.createSessionLocalTransaction();
+ }
+
+ _messageFactory = new MessageFactory(this);
+
+ _dispatcherThread = new Thread(_dispatcher);
+ _dispatcherThread.start();
+ }
+
+ public BytesMessageImpl createBytesMessage() throws IllegalStateException
+ {
+ checkClosed();
+ return new BytesMessageImpl(this);
+
+ }
+
+ public MapMessageImpl createMapMessage() throws JMSException
+ {
+ checkClosed();
+ return new MapMessageImpl(this);
+ }
+
+ public MessageImpl createMessage() throws IllegalStateException
+ {
+ return createAmqpMessage();
+ }
+
+ public ObjectMessageImpl createObjectMessage() throws JMSException
+ {
+ checkClosed();
+ return new ObjectMessageImpl(this);
+ }
+
+ public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
+ {
+ checkClosed();
+ ObjectMessageImpl msg = new ObjectMessageImpl(this);
+ msg.setObject(serializable);
+ return msg;
+ }
+
+ public StreamMessageImpl createStreamMessage() throws JMSException
+ {
+ checkClosed();
+ return new StreamMessageImpl(this);
+ }
+
+ public TextMessageImpl createTextMessage() throws JMSException
+ {
+ return createTextMessage("");
+ }
+
+ public TextMessageImpl createTextMessage(final String s) throws JMSException
+ {
+ checkClosed();
+ TextMessageImpl msg = new TextMessageImpl(this);
+ msg.setText(s);
+ return msg;
+ }
+
+ public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
+ {
+ checkClosed();
+ return new AmqpMessageImpl(this);
+ }
+
+ public boolean getTransacted() throws JMSException
+ {
+ checkClosed();
+ return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
+ }
+
+ public int getAcknowledgeMode() throws IllegalStateException
+ {
+ checkClosed();
+ return _acknowledgeMode.ordinal();
+ }
+
+ AcknowledgeMode getAckModeEnum()
+ {
+ return _acknowledgeMode;
+ }
+
+ public void commit() throws JMSException
+ {
+ checkClosed();
+ checkTransactional();
+
+ _txn.commit();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.postCommit();
+ }
+
+ _txn = _session.createSessionLocalTransaction();
+ //TODO
+ }
+
+ public void rollback() throws JMSException
+ {
+ checkClosed();
+ checkTransactional();
+
+ _txn.rollback();
+
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.postRollback();
+ }
+
+ _txn = _session.createSessionLocalTransaction();
+
+ //TODO
+ }
+
+ private void checkTransactional() throws JMSException
+ {
+ if(!getTransacted())
+ {
+ throw new IllegalStateException("Session must be transacted in order to perform this operation");
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ if(!_closed)
+ {
+ _closed = true;
+ _dispatcher.close();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.close();
+ }
+ for(MessageProducerImpl producer : _producers)
+ {
+ producer.close();
+ }
+ _session.close();
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new IllegalStateException("Closed");
+ }
+ }
+
+ public void recover() throws JMSException
+ {
+ checkClosed();
+ checkNotTransactional();
+
+ if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ synchronized(_session.getEndpoint().getLock())
+ {
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.doRecover();
+ }
+ }
+ }
+ else
+ {
+ if(Thread.currentThread() == _dispatcherThread)
+ {
+ _dispatcher.doRecover();
+ }
+ }
+
+ }
+
+ private void checkNotTransactional() throws JMSException
+ {
+
+ if(getTransacted())
+ {
+ throw new IllegalStateException("This operation cannot be carried out on a transacted session");
+ }
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ return _messageListener;
+ }
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ if(_messageListener != null)
+ {
+ // TODO
+ }
+ else
+ {
+ _messageListener = messageListener;
+ }
+ }
+
+ public void run()
+ {
+ //TODO
+ }
+
+ public MessageProducerImpl createProducer(final Destination destination) throws JMSException
+ {
+ checkClosed();
+
+ final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
+
+ _producers.add(messageProducer);
+
+ return messageProducer;
+ }
+
+ public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(destination, null, false);
+ }
+
+ public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(destination, selector, false);
+ }
+
+ public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
+ throws JMSException
+ {
+ checkClosed();
+ checkValidDestination(destination);
+ if(destination instanceof TemporaryDestination)
+ {
+ TemporaryDestination temporaryDestination = (TemporaryDestination) destination;
+ if(temporaryDestination.getSession() != this)
+ {
+ throw new JMSException("Cannot consume from a temporary destination created on another session");
+ }
+ if(temporaryDestination.isDeleted())
+ {
+ throw new IllegalStateException("Destination is deleted");
+ }
+ }
+ final MessageConsumerImpl messageConsumer;
+ synchronized(_session.getEndpoint().getLock())
+ {
+ messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal);
+ addConsumer(messageConsumer);
+ if(_connection.isStarted())
+ {
+ messageConsumer.start();
+ }
+ }
+ return messageConsumer;
+ }
+
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException
+ {
+ if (destination == null || !(destination instanceof DestinationImpl))
+ {
+ throw new InvalidDestinationException("Invalid Destination");
+ }
+ }
+
+
+ protected void addConsumer(final MessageConsumerImpl messageConsumer)
+ {
+ _consumers.add(messageConsumer);
+ }
+
+ public QueueImpl createQueue(final String s) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ return new QueueImpl(s);
+ }
+
+ public QueueReceiver createReceiver(final Queue queue) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ return createConsumer(queue);
+ }
+
+ public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ return createConsumer(queue, selector);
+ }
+
+ public QueueSender createSender(final Queue queue) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ return createProducer(queue);
+ }
+
+ public TopicImpl createTopic(final String s) throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ return new TopicImpl(s);
+ }
+
+ public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ return createConsumer(topic);
+ }
+
+ public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ return createConsumer(topic, selector, noLocal);
+ }
+
+ public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ return createDurableSubscriber(topic, name, null, false);
+ }
+
+ private void checkNotQueueSession() throws IllegalStateException
+ {
+ if(_isQueueSession)
+ {
+ throw new IllegalStateException("Cannot perform this operation on a QueueSession");
+ }
+ }
+
+
+ private void checkNotTopicSession() throws IllegalStateException
+ {
+ if(_isTopicSession)
+ {
+ throw new IllegalStateException("Cannot perform this operation on a TopicSession");
+ }
+ }
+
+ public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
+ throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ if(!(topic instanceof TopicImpl))
+ {
+ throw new InvalidDestinationException("invalid destination " + topic);
+ }
+ final TopicSubscriberImpl messageConsumer;
+ synchronized(_session.getEndpoint().getLock())
+ {
+ messageConsumer = new TopicSubscriberImpl(name, true, (org.apache.qpid.amqp_1_0.jms.Topic) topic, this,
+ selector,
+ noLocal);
+ addConsumer(messageConsumer);
+ if(_connection.isStarted())
+ {
+ messageConsumer.start();
+ }
+ }
+ return messageConsumer;
+ }
+
+ public TopicPublisher createPublisher(final Topic topic) throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ return createProducer(topic);
+ }
+
+ public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ checkValidDestination(queue);
+ return createBrowser(queue, null);
+ }
+
+ public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ checkValidDestination(queue);
+
+ return new QueueBrowserImpl((QueueImpl) queue, selector, this);
+
+ }
+
+ public TemporaryQueueImpl createTemporaryQueue() throws JMSException
+ {
+ checkClosed();
+ checkNotTopicSession();
+ try
+ {
+ Sender send = _session.createTemporaryQueueSender();
+
+ TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send, this);
+ return tempQ;
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ throw new JMSException("Unable to create temporary queue");
+ }
+ }
+
+ public TemporaryTopicImpl createTemporaryTopic() throws JMSException
+ {
+ checkClosed();
+ checkNotQueueSession();
+ try
+ {
+ Sender send = _session.createTemporaryQueueSender();
+
+ TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send, this);
+ return tempQ;
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ throw new JMSException("Unable to create temporary queue");
+ }
+ }
+
+ public void unsubscribe(final String s) throws JMSException
+ {
+ checkClosed();
+
+ checkNotQueueSession();
+
+ Target target = new Target();
+ target.setAddress(UUID.randomUUID().toString());
+
+ try
+ {
+ Receiver receiver = new Receiver(getClientSession(), s, target, null,
+ org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
+ receiver.close();
+ }
+ catch(AmqpErrorException e)
+ {
+ if(e.getError().getCondition() == AmqpError.NOT_FOUND)
+ {
+ throw new InvalidDestinationException(s);
+ }
+ else
+ {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ //TODO
+ }
+
+ void stop()
+ {
+ //TODO
+ }
+
+ void start()
+ {
+ _dispatcher.start();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.start();
+ }
+ }
+
+ org.apache.qpid.amqp_1_0.client.Session getClientSession()
+ {
+ return _session;
+ }
+
+ public MessageFactory getMessageFactory()
+ {
+ return _messageFactory;
+ }
+
+ void acknowledgeAll() throws IllegalStateException
+ {
+ synchronized(_session.getEndpoint().getLock())
+ {
+ checkClosed();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.acknowledgeAll();
+ }
+ }
+ }
+
+ void messageListenerSet(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.updateMessageListener(messageConsumer);
+ }
+
+ public void messageArrived(final MessageConsumerImpl messageConsumer)
+ {
+ _dispatcher.messageArrivedAtConsumer(messageConsumer);
+ }
+
+ MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
+ {
+ MessageImpl replacementMessage;
+
+ if(message instanceof BytesMessage)
+ {
+ replacementMessage = convertBytesMessage((BytesMessage) message);
+ }
+ else
+ {
+ if(message instanceof MapMessage)
+ {
+ replacementMessage = convertMapMessage((MapMessage) message);
+ }
+ else
+ {
+ if(message instanceof ObjectMessage)
+ {
+ replacementMessage = convertObjectMessage((ObjectMessage) message);
+ }
+ else
+ {
+ if(message instanceof StreamMessage)
+ {
+ replacementMessage = convertStreamMessage((StreamMessage) message);
+ }
+ else
+ {
+ if(message instanceof TextMessage)
+ {
+ replacementMessage = convertTextMessage((TextMessage) message);
+ }
+ else
+ {
+ replacementMessage = createMessage();
+ }
+ }
+ }
+ }
+ }
+
+ convertMessageProperties(message, replacementMessage);
+
+ return replacementMessage;
+ }
+
+
+ private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
+ throws JMSException
+ {
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ replacementMessage.setObjectProperty(propertyName, value);
+ }
+ }
+
+
+ replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+ if (message.getJMSReplyTo() != null)
+ {
+ replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+
+ replacementMessage.setJMSType(message.getJMSType());
+
+ replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+
+ private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
+ {
+ MapMessageImpl mapMessage = createMapMessage();
+
+ Enumeration mapNames = message.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ mapMessage.setObject(name, message.getObject(name));
+ }
+
+ return mapMessage;
+ }
+
+ private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
+ {
+ BytesMessageImpl bytesMessage = createBytesMessage();
+
+ message.reset();
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = message.readBytes(buf)) != -1)
+ {
+ bytesMessage.writeBytes(buf, 0, len);
+ }
+
+ return bytesMessage;
+ }
+
+ private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
+ {
+ ObjectMessageImpl objectMessage = createObjectMessage();
+ objectMessage.setObject(message.getObject());
+ return objectMessage;
+ }
+
+ private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
+ {
+ StreamMessageImpl streamMessage = createStreamMessage();
+
+ try
+ {
+ message.reset();
+ while (true)
+ {
+ streamMessage.writeObject(message.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ // we're at the end so don't mind the exception
+ }
+
+ return streamMessage;
+ }
+
+ private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
+ {
+ return createTextMessage(message.getText());
+ }
+
+ ConnectionImpl getConnection()
+ {
+ return _connection;
+ }
+
+ Transaction getTxn()
+ {
+ return _txn;
+ }
+
+
+ private class Dispatcher implements Runnable
+ {
+
+ private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
+
+ private boolean _closed;
+ private boolean _started;
+
+ private Message _recoveredMessage;
+ private MessageConsumerImpl _recoveredConsumer;
+ private MessageConsumerImpl _currentConsumer;
+ private Message _currentMessage;
+
+ public void run()
+ {
+ synchronized(getLock())
+ {
+ while(!_closed)
+ {
+ while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))
+ {
+ try
+ {
+ getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ return;
+ }
+ }
+ while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))
+ {
+ Message msg;
+
+ MessageConsumerImpl consumer;
+
+ boolean recoveredMessage = _recoveredMessage != null;
+ if(recoveredMessage)
+ {
+ consumer = _recoveredConsumer;
+ msg = _recoveredMessage;
+ _recoveredMessage = null;
+ _recoveredConsumer = null;
+ }
+ else
+ {
+ consumer = _messageConsumerList.remove(0);
+ msg = consumer.receive0(0L);
+ }
+
+
+ MessageListener listener = consumer._messageListener;
+
+ MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
+
+ if(message != null)
+ {
+ _currentConsumer = consumer;
+ _currentMessage = msg;
+ try
+ {
+ listener.onMessage(message);
+ }
+ finally
+ {
+ _currentConsumer = null;
+ _currentMessage = null;
+ }
+
+ if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
+ || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+ {
+ consumer.acknowledge(msg);
+ }
+ }
+
+ }
+
+
+ }
+ }
+ }
+
+ private Object getLock()
+ {
+ return _session.getEndpoint().getLock();
+ }
+
+ public void messageArrivedAtConsumer(MessageConsumerImpl impl)
+ {
+ synchronized (getLock())
+ {
+ _messageConsumerList.add(impl);
+ getLock().notifyAll();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (getLock())
+ {
+ _closed = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void updateMessageListener(final MessageConsumerImpl messageConsumer)
+ {
+ synchronized (getLock())
+ {
+ getLock().notifyAll();
+ }
+ }
+
+ public void start()
+ {
+ synchronized (getLock())
+ {
+ _started = true;
+ getLock().notifyAll();
+ }
+ }
+
+ public void stop()
+ {
+ synchronized (getLock())
+ {
+ _started = false;
+ getLock().notifyAll();
+ }
+ }
+
+ public void doRecover()
+ {
+ _recoveredConsumer = _currentConsumer;
+ _recoveredMessage = _currentMessage;
+ }
+ }
+
+ void setQueueSession(final boolean queueSession)
+ {
+ _isQueueSession = queueSession;
+ }
+
+ void setTopicSession(final boolean topicSession)
+ {
+ _isTopicSession = topicSession;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
new file mode 100644
index 0000000000..8275de884e
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.StreamMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import java.io.EOFException;
+import java.util.*;
+
+public class StreamMessageImpl extends MessageImpl implements StreamMessage
+{
+ private List _list;
+ private boolean _readOnly;
+ private int _position = -1;
+ private int _offset = -1;
+
+
+
+ protected StreamMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List list,
+ Footer footer, SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _list = list;
+ }
+
+ StreamMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(),
+ new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ _list = new ArrayList();
+ }
+
+ public StreamMessageImpl(final Header header,
+ final MessageAnnotations messageAnnotations,
+ final Properties properties,
+ final ApplicationProperties appProperties,
+ final List amqpListSection, final Footer footer)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, null);
+ _list = amqpListSection;
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Boolean)
+ {
+ return (Boolean) obj;
+ }
+ if(obj instanceof String || obj == null)
+ {
+ return Boolean.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _list.clear();
+ _position = -1;
+ _offset = -1;
+ }
+
+ public byte readByte() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Byte.valueOf((String)obj);
+ }
+ catch(RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ }
+
+ private void backup()
+ {
+ _position--;
+ }
+
+ public short readShort() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Short.valueOf((String)obj);
+ }
+ catch(RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+
+ }
+
+ public char readChar() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Character)
+ {
+ return (Character) obj;
+ }
+ if(obj == null)
+ {
+ backup();
+ throw new NullPointerException();
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+ }
+
+ }
+
+ public int readInt() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Integer)
+ {
+ return (Integer) obj;
+ }
+ else if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Integer.valueOf((String)obj);
+ }
+ catch (RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ }
+
+ public long readLong() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Long)
+ {
+ return (Long) obj;
+ }
+ else if(obj instanceof Integer)
+ {
+ return (Integer) obj;
+ }
+ else if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Long.valueOf((String)obj);
+ }
+ catch (RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ }
+
+ public float readFloat() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Float)
+ {
+ return (Float) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Float.valueOf((String)obj);
+ }
+ catch (RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ }
+
+ public double readDouble() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Double)
+ {
+ return (Double) obj;
+ }
+ else if(obj instanceof Float)
+ {
+ return (Float) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ try
+ {
+ return Double.valueOf((String)obj);
+ }
+ catch (RuntimeException e)
+ {
+ backup();
+ throw e;
+ }
+ }
+ else
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ }
+
+ public String readString() throws JMSException
+ {
+ Object obj = readObject();
+ if(obj instanceof Binary)
+ {
+ backup();
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ return String.valueOf(obj);
+ }
+
+ public int readBytes(final byte[] bytes) throws JMSException
+ {
+ Object obj = readObject();
+ if(!(obj instanceof Binary))
+ {
+ backup();
+ if(_position > -1 && _list.get(_position) instanceof Binary)
+ {
+ return -1;
+ }
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ Binary binary = (Binary) obj;
+ if(bytes.length >= binary.getLength())
+ {
+ System.arraycopy(binary.getArray(),binary.getArrayOffset(),bytes,0,binary.getLength());
+ return binary.getLength();
+ }
+ return -1;
+ }
+
+ public Object readObject() throws JMSException
+ {
+ checkReadable();
+ if(_offset == -1)
+ {
+ try
+ {
+ return _list.get(++_position);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new MessageEOFException("No more data in message stream");
+ }
+ }
+ else
+ {
+ return null; //TODO
+ }
+ }
+
+ public void writeBoolean(final boolean b) throws JMSException
+ {
+ checkWritable();
+ _list.add(b);
+ }
+
+ public void writeByte(final byte b) throws JMSException
+ {
+ checkWritable();
+ _list.add(b);
+ }
+
+ public void writeShort(final short i) throws JMSException
+ {
+ checkWritable();
+ _list.add(i);
+ }
+
+ public void writeChar(final char c) throws JMSException
+ {
+ checkWritable();
+ _list.add(c);
+ }
+
+ public void writeInt(final int i) throws JMSException
+ {
+ checkWritable();
+ _list.add(i);
+ }
+
+ public void writeLong(final long l) throws JMSException
+ {
+ checkWritable();
+ _list.add(l);
+ }
+
+ public void writeFloat(final float v) throws JMSException
+ {
+ checkWritable();
+ _list.add(v);
+ }
+
+ public void writeDouble(final double v) throws JMSException
+ {
+ checkWritable();
+ _list.add(v);
+ }
+
+ public void writeString(final String s) throws JMSException
+ {
+ checkWritable();
+ _list.add(s);
+ }
+
+ public void writeBytes(final byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ writeBytes(bytes, 0, bytes.length);
+ }
+
+ public void writeBytes(final byte[] bytes, final int offset, final int size) throws JMSException
+ {
+ checkWritable();
+
+ if(!_list.isEmpty() && _list.get(_list.size()-1) instanceof byte[])
+ {
+ Binary oldVal = (Binary) _list.get(_list.size()-1);
+ byte[] allBytes = new byte[oldVal.getLength() + size];
+ System.arraycopy(oldVal.getArray(),oldVal.getArrayOffset(),allBytes,0,oldVal.getLength());
+ System.arraycopy(bytes, offset, allBytes, oldVal.getLength(), size);
+ _list.set(_list.size()-1, allBytes);
+ }
+ else
+ {
+ byte[] dup = new byte[size];
+ System.arraycopy(bytes,offset,dup,0,size);
+ _list.add(new Binary(dup));
+ }
+ }
+
+ public void writeObject(final Object o) throws JMSException
+ {
+ checkWritable();
+ if(o == null || _supportedClasses.contains(o.getClass()))
+ {
+ _list.add(o);
+ }
+ }
+
+ public void reset() throws JMSException
+ {
+ super.reset();
+ _position = -1;
+ _offset = -1;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.add(new AmqpValue(_list));
+ sections.add(getFooter());
+ return sections;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
new file mode 100644
index 0000000000..76608c421b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.TemporaryQueue;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
+{
+ private Sender _sender;
+ private SessionImpl _session;
+ private final Set<MessageConsumer> _consumers =
+ Collections.synchronizedSet(new HashSet<MessageConsumer>());
+
+ protected TemporaryQueueImpl(String address, Sender sender, SessionImpl session)
+ {
+ super(address);
+ _sender = sender;
+ _session = session;
+ _session.getConnection().addOnCloseTask(new ConnectionImpl.CloseTask()
+ {
+ public void onClose() throws JMSException
+ {
+ synchronized (TemporaryQueueImpl.this)
+ {
+ close();
+ }
+ }
+ });
+ }
+
+ public synchronized void delete() throws JMSException
+ {
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot delete destination as it has consumers");
+ }
+ }
+
+ private void close() throws JMSException
+ {
+ if(_sender != null)
+ {
+ try
+ {
+ _sender.close();
+ _sender = null;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ }
+
+ public SessionImpl getSession()
+ {
+ return _session;
+ }
+
+ public void addConsumer(MessageConsumer consumer)
+ {
+ _consumers.add(consumer);
+ }
+
+ public void removeConsumer(MessageConsumer consumer)
+ {
+ _consumers.remove(consumer);
+ }
+
+ public boolean isDeleted()
+ {
+ return _sender == null;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
new file mode 100644
index 0000000000..8e0d07e78b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.TemporaryTopic;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
+{
+ private Sender _sender;
+ private SessionImpl _session;
+ private final Set<MessageConsumer> _consumers =
+ Collections.synchronizedSet(new HashSet<MessageConsumer>());
+
+ protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session)
+ {
+ super(address);
+ _sender = sender;
+ _session = session;
+
+ _session.getConnection().addOnCloseTask(new ConnectionImpl.CloseTask()
+ {
+ public void onClose() throws JMSException
+ {
+ synchronized (TemporaryTopicImpl.this)
+ {
+ close();
+ }
+ }
+ });
+ }
+
+ public void delete() throws JMSException
+ {
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot delete destination as it has consumers");
+ }
+
+ }
+
+
+ private void close() throws JMSException
+ {
+ if(_sender != null)
+ {
+ try
+ {
+
+ _sender.close();
+ _sender = null;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ }
+
+ public SessionImpl getSession()
+ {
+ return _session;
+ }
+
+
+ public void addConsumer(MessageConsumer consumer)
+ {
+ _consumers.add(consumer);
+ }
+
+ public void removeConsumer(MessageConsumer consumer)
+ {
+ _consumers.remove(consumer);
+ }
+
+ public boolean isDeleted()
+ {
+ return _sender == null;
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
new file mode 100644
index 0000000000..5d9172229c
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.TextMessage;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import java.util.*;
+
+public class TextMessageImpl extends MessageImpl implements TextMessage
+{
+ private String _text;
+
+ protected TextMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
+ Properties properties,
+ ApplicationProperties appProperties,
+ String text,
+ Footer footer,
+ SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _text = text;
+ }
+
+ protected TextMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ }
+
+ public void setText(final String text) throws MessageNotWriteableException
+ {
+ if(isReadOnly())
+ {
+ throw new MessageNotWriteableException("Cannot set object, message is in read only mode");
+ }
+
+ _text = text;
+ }
+
+ public String getText() throws JMSException
+ {
+ return _text;
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _text = null;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ AmqpValue section = new AmqpValue(_text);
+ sections.add(section);
+ sections.add(getFooter());
+ return sections;
+ }
+
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicConnectionImpl.java
new file mode 100644
index 0000000000..de456532cb
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicConnectionImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.TopicConnection;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+
+public class TopicConnectionImpl extends ConnectionImpl implements TopicConnection
+{
+ TopicConnectionImpl(String host, int port, String username, String password, String clientId)
+ throws JMSException
+ {
+ super(host, port, username, password, clientId);
+ }
+
+ public TopicSessionImpl createTopicSession(final boolean b, final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Topic topic,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
new file mode 100644
index 0000000000..e54a660963
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Topic;
+
+import java.util.WeakHashMap;
+
+public class TopicImpl extends DestinationImpl implements Topic
+{
+ private static final WeakHashMap<String, TopicImpl> TOPIC_CACHE =
+ new WeakHashMap<String, TopicImpl>();
+
+
+ public TopicImpl(String address)
+ {
+ super(address);
+ }
+
+ public String getTopicName()
+ {
+ return getAddress();
+ }
+
+ public static synchronized TopicImpl createTopic(final String address)
+ {
+ TopicImpl topic = TOPIC_CACHE.get(address);
+ if(topic == null)
+ {
+ topic = new TopicImpl(address);
+ TOPIC_CACHE.put(address, topic);
+ }
+ return topic;
+ }
+
+ public static TopicImpl valueOf(String address)
+ {
+ return address == null ? null : createTopic(address);
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicPublisherImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicPublisherImpl.java
new file mode 100644
index 0000000000..a2d2f34043
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicPublisherImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+
+public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher
+{
+ protected TopicPublisherImpl(final Destination destination, final SessionImpl session)
+ throws JMSException
+ {
+ super(destination, session);
+ }
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
new file mode 100644
index 0000000000..052a3f2a6b
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.TopicSession;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+public class TopicSessionImpl extends SessionImpl implements TopicSession
+{
+ protected TopicSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
+ {
+ super(connection, acknowledgeMode);
+ setTopicSession(true);
+ }
+
+ public TopicSubscriberImpl createSubscriber(final Topic topic) throws JMSException
+ {
+ return createSubscriber(topic,null, false);
+ }
+
+ public TopicSubscriberImpl createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
+ {
+
+ final TopicSubscriberImpl messageConsumer;
+ synchronized(getClientSession().getEndpoint().getLock())
+ {
+ messageConsumer = new TopicSubscriberImpl((TopicImpl) topic, this, selector, noLocal);
+ addConsumer(messageConsumer);
+ }
+ return messageConsumer;
+ }
+
+ public TopicPublisherImpl createPublisher(final Topic topic) throws JMSException
+ {
+ return null; //TODO
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
new file mode 100644
index 0000000000..374a4c1ef8
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import javax.jms.*;
+import java.util.Map;
+
+public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
+{
+
+ TopicSubscriberImpl(String name,
+ boolean durable,
+ final Topic destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal)
+ throws JMSException
+ {
+ super(destination, session, selector, noLocal, name, durable);
+ setTopicSubscriber(true);
+ }
+
+ TopicSubscriberImpl(final Topic destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal)
+ throws JMSException
+ {
+ super(destination, session, selector, noLocal);
+ setTopicSubscriber(true);
+ }
+
+ public TopicImpl getTopic() throws JMSException
+ {
+ return (TopicImpl) getDestination();
+ }
+
+
+ protected Receiver createClientReceiver() throws JMSException
+ {
+ try
+ {
+ String address = getDestination().getAddress();
+ Receiver receiver = getSession().getClientSession().createReceiver(address,
+ StdDistMode.COPY, AcknowledgeMode.ALO,
+ getLinkName(), isDurable(), getFilters(),
+ null);
+ String actualAddress = receiver.getAddress();
+
+ @SuppressWarnings("unchecked")
+ Map<Symbol, Filter> actualFilters = (Map<Symbol, Filter>) receiver.getFilter();
+
+ if(!address.equals(actualAddress) || !filtersEqual(getFilters(), actualFilters))
+ {
+ receiver.close();
+ receiver = getSession().getClientSession().createReceiver(address,
+ StdDistMode.COPY, AcknowledgeMode.ALO,
+ getLinkName(), isDurable(), getFilters(),
+ null);
+ }
+
+
+ return receiver;
+ }
+ catch (AmqpErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition())
+ && error.getInfo() != null && Symbol.valueOf("filter").equals(error.getInfo().get(Symbol.valueOf
+ ("field"))))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+ }
+
+ }
+ }
+
+ private boolean filtersEqual(Map<Symbol, Filter> filters, Map<Symbol, Filter> actualFilters)
+ {
+ if(filters == null || filters.isEmpty())
+ {
+ return actualFilters == null || actualFilters.isEmpty();
+ }
+ else
+ {
+ return actualFilters != null && filters.equals(actualFilters);
+ }
+
+ }
+
+
+ protected void closeUnderlyingReceiver(Receiver receiver)
+ {
+ receiver.detach();
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/NameParserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/NameParserImpl.java
new file mode 100644
index 0000000000..7c3857f6c9
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/NameParserImpl.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.jndi;
+
+import javax.naming.CompositeName;
+import javax.naming.Name;
+import javax.naming.NameParser;
+import javax.naming.NamingException;
+
+/**
+ * A default implementation of {@link NameParser}
+ * <p/>
+ * Based on class from ActiveMQ.
+ */
+public class NameParserImpl implements NameParser
+{
+ public Name parse(String name) throws NamingException
+ {
+ return new CompositeName(name);
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java
new file mode 100644
index 0000000000..31030a7d30
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.jms.jndi;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.DestinationImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
+
+public class PropertiesFileInitialContextFactory implements InitialContextFactory
+{
+
+ private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
+ private String DESTINATION_PREFIX = "destination.";
+ private String QUEUE_PREFIX = "queue.";
+ private String TOPIC_PREFIX = "topic.";
+
+ public Context getInitialContext(Hashtable environment) throws NamingException
+ {
+ Map data = new ConcurrentHashMap();
+
+ String file = null;
+ try
+ {
+
+
+ if (environment.containsKey(Context.PROVIDER_URL))
+ {
+ file = (String) environment.get(Context.PROVIDER_URL);
+ }
+ else
+ {
+ file = System.getProperty(Context.PROVIDER_URL);
+ }
+
+ if (file != null)
+ {
+
+ // Load the properties specified
+ BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+ Properties p = new Properties();
+ try
+ {
+ p.load(inputStream);
+ }
+ finally
+ {
+ inputStream.close();
+ }
+
+
+ for (Map.Entry me : p.entrySet())
+ {
+ String key = (String) me.getKey();
+ String value = (String) me.getValue();
+ environment.put(key, value);
+ if (System.getProperty(key) == null)
+ {
+ System.setProperty(key, value);
+ }
+ }
+ }
+ else
+ {
+ throw new NamingException("No Provider URL specified.");
+ }
+ }
+ catch (IOException ioe)
+ {
+ NamingException ne = new NamingException("Unable to load property file:" + file +".");
+ ne.setRootCause(ioe);
+ throw ne;
+ }
+
+ try
+ {
+ createConnectionFactories(data, environment);
+ }
+ catch (MalformedURLException e)
+ {
+ NamingException ne = new NamingException();
+ ne.setRootCause(e);
+ throw ne;
+ }
+
+ createDestinations(data, environment);
+
+ createQueues(data, environment);
+
+ createTopics(data, environment);
+
+ return createContext(data, environment);
+ }
+
+ protected ReadOnlyContext createContext(Map data, Hashtable environment)
+ {
+ return new ReadOnlyContext(environment, data);
+ }
+
+ protected void createConnectionFactories(Map data, Hashtable environment) throws MalformedURLException
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(CONNECTION_FACTORY_PREFIX))
+ {
+ String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length());
+ ConnectionFactory cf = createFactory(entry.getValue().toString().trim());
+ if (cf != null)
+ {
+ data.put(jndiName, cf);
+ }
+ }
+ }
+ }
+
+ protected void createDestinations(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(DESTINATION_PREFIX))
+ {
+ String jndiName = key.substring(DESTINATION_PREFIX.length());
+ Destination dest = createDestination(entry.getValue().toString().trim());
+ if (dest != null)
+ {
+ data.put(jndiName, dest);
+ }
+ }
+ }
+ }
+
+ protected void createQueues(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(QUEUE_PREFIX))
+ {
+ String jndiName = key.substring(QUEUE_PREFIX.length());
+ Queue q = createQueue(entry.getValue().toString().trim());
+ if (q != null)
+ {
+ data.put(jndiName, q);
+ }
+ }
+ }
+ }
+
+ protected void createTopics(Map data, Hashtable environment)
+ {
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(TOPIC_PREFIX))
+ {
+ String jndiName = key.substring(TOPIC_PREFIX.length());
+ Topic t = createTopic(entry.getValue().toString().trim());
+ if (t != null)
+ {
+ data.put(jndiName, t);
+ }
+ }
+ }
+ }
+
+
+ private ConnectionFactory createFactory(String url) throws MalformedURLException
+ {
+ return ConnectionFactoryImpl.createFromURL(url);
+ }
+
+ private DestinationImpl createDestination(String str)
+ {
+ return DestinationImpl.createDestination(str);
+ }
+
+ private QueueImpl createQueue(String address)
+ {
+ return QueueImpl.createQueue(address);
+ }
+
+ private TopicImpl createTopic(String address)
+ {
+ return TopicImpl.createTopic(address);
+ }
+
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/ReadOnlyContext.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/ReadOnlyContext.java
new file mode 100644
index 0000000000..4e0f994b94
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/ReadOnlyContext.java
@@ -0,0 +1,527 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.jms.jndi;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.naming.Binding;
+import javax.naming.CompositeName;
+import javax.naming.Context;
+import javax.naming.LinkRef;
+import javax.naming.Name;
+import javax.naming.NameClassPair;
+import javax.naming.NameNotFoundException;
+import javax.naming.NameParser;
+import javax.naming.NamingEnumeration;
+import javax.naming.NamingException;
+import javax.naming.NotContextException;
+import javax.naming.OperationNotSupportedException;
+import javax.naming.Reference;
+import javax.naming.spi.NamingManager;
+
+/**
+ * Based on class from ActiveMQ.
+ * A read-only Context
+ * <p/>
+ * This version assumes it and all its subcontext are read-only and any attempt
+ * to modify (e.g. through bind) will result in an OperationNotSupportedException.
+ * Each Context in the tree builds a cache of the entries in all sub-contexts
+ * to optimise the performance of lookup.
+ * </p>
+ * <p>This implementation is intended to optimise the performance of lookup(String)
+ * to about the level of a HashMap get. It has been observed that the scheme
+ * resolution phase performed by the JVM takes considerably longer, so for
+ * optimum performance lookups should be coded like:</p>
+ * <code>
+ * Context componentContext = (Context)new InitialContext().lookup("java:comp");
+ * String envEntry = (String) componentContext.lookup("env/myEntry");
+ * String envEntry2 = (String) componentContext.lookup("env/myEntry2");
+ * </code>
+ */
+public class ReadOnlyContext implements Context, Serializable
+{
+ private static final long serialVersionUID = -5754338187296859149L;
+ protected static final NameParser nameParser = new NameParserImpl();
+
+ protected final Hashtable environment; // environment for this context
+ protected final Map bindings; // bindings at my level
+ protected final Map treeBindings; // all bindings under me
+
+ private boolean frozen = false;
+ private String nameInNamespace = "";
+ public static final String SEPARATOR = "/";
+
+ public ReadOnlyContext()
+ {
+ environment = new Hashtable();
+ bindings = new HashMap();
+ treeBindings = new HashMap();
+ }
+
+ public ReadOnlyContext(Hashtable env)
+ {
+ if (env == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(env);
+ }
+
+ this.bindings = Collections.EMPTY_MAP;
+ this.treeBindings = Collections.EMPTY_MAP;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings)
+ {
+ if (environment == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(environment);
+ }
+
+ this.bindings = bindings;
+ treeBindings = new HashMap();
+ frozen = true;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace)
+ {
+ this(environment, bindings);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env)
+ {
+ this.bindings = clone.bindings;
+ this.treeBindings = clone.treeBindings;
+ this.environment = new Hashtable(env);
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace)
+ {
+ this(clone, env);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ public void freeze()
+ {
+ frozen = true;
+ }
+
+ boolean isFrozen()
+ {
+ return frozen;
+ }
+
+ /**
+ * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses.
+ * It binds every possible lookup into a map in each context. To do this, each context
+ * strips off one name segment and if necessary creates a new context for it. Then it asks that context
+ * to bind the remaining name. It returns a map containing all the bindings from the next context, plus
+ * the context it just created (if it in fact created it). (the names are suitably extended by the segment
+ * originally lopped off).
+ *
+ * @param name
+ * @param value
+ * @return
+ * @throws javax.naming.NamingException
+ */
+ protected Map internalBind(String name, Object value) throws NamingException
+ {
+ assert (name != null) && (name.length() > 0);
+ assert !frozen;
+
+ Map newBindings = new HashMap();
+ int pos = name.indexOf('/');
+ if (pos == -1)
+ {
+ if (treeBindings.put(name, value) != null)
+ {
+ throw new NamingException("Something already bound at " + name);
+ }
+
+ bindings.put(name, value);
+ newBindings.put(name, value);
+ }
+ else
+ {
+ String segment = name.substring(0, pos);
+ assert segment != null;
+ assert !segment.equals("");
+ Object o = treeBindings.get(segment);
+ if (o == null)
+ {
+ o = newContext();
+ treeBindings.put(segment, o);
+ bindings.put(segment, o);
+ newBindings.put(segment, o);
+ }
+ else if (!(o instanceof ReadOnlyContext))
+ {
+ throw new NamingException("Something already bound where a subcontext should go");
+ }
+
+ ReadOnlyContext readOnlyContext = (ReadOnlyContext) o;
+ String remainder = name.substring(pos + 1);
+ Map subBindings = readOnlyContext.internalBind(remainder, value);
+ for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String subName = segment + "/" + (String) entry.getKey();
+ Object bound = entry.getValue();
+ treeBindings.put(subName, bound);
+ newBindings.put(subName, bound);
+ }
+ }
+
+ return newBindings;
+ }
+
+ protected ReadOnlyContext newContext()
+ {
+ return new ReadOnlyContext();
+ }
+
+ public Object addToEnvironment(String propName, Object propVal) throws NamingException
+ {
+ return environment.put(propName, propVal);
+ }
+
+ public Hashtable getEnvironment() throws NamingException
+ {
+ return (Hashtable) environment.clone();
+ }
+
+ public Object removeFromEnvironment(String propName) throws NamingException
+ {
+ return environment.remove(propName);
+ }
+
+ public Object lookup(String name) throws NamingException
+ {
+ if (name.length() == 0)
+ {
+ return this;
+ }
+
+ Object result = treeBindings.get(name);
+ if (result == null)
+ {
+ result = bindings.get(name);
+ }
+
+ if (result == null)
+ {
+ int pos = name.indexOf(':');
+ if (pos > 0)
+ {
+ String scheme = name.substring(0, pos);
+ Context ctx = NamingManager.getURLContext(scheme, environment);
+ if (ctx == null)
+ {
+ throw new NamingException("scheme " + scheme + " not recognized");
+ }
+
+ return ctx.lookup(name);
+ }
+ else
+ {
+ // Split out the first name of the path
+ // and look for it in the bindings map.
+ CompositeName path = new CompositeName(name);
+
+ if (path.size() == 0)
+ {
+ return this;
+ }
+ else
+ {
+ String first = path.get(0);
+ Object obj = bindings.get(first);
+ if (obj == null)
+ {
+ throw new NameNotFoundException(name);
+ }
+ else if ((obj instanceof Context) && (path.size() > 1))
+ {
+ Context subContext = (Context) obj;
+ obj = subContext.lookup(path.getSuffix(1));
+ }
+
+ return obj;
+ }
+ }
+ }
+
+ if (result instanceof LinkRef)
+ {
+ LinkRef ref = (LinkRef) result;
+ result = lookup(ref.getLinkName());
+ }
+
+ if (result instanceof Reference)
+ {
+ try
+ {
+ result = NamingManager.getObjectInstance(result, null, null, this.environment);
+ }
+ catch (NamingException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw (NamingException) new NamingException("could not look up : " + name).initCause(e);
+ }
+ }
+
+ if (result instanceof ReadOnlyContext)
+ {
+ String prefix = getNameInNamespace();
+ if (prefix.length() > 0)
+ {
+ prefix = prefix + SEPARATOR;
+ }
+
+ result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name);
+ }
+
+ return result;
+ }
+
+ public Object lookup(Name name) throws NamingException
+ {
+ return lookup(name.toString());
+ }
+
+ public Object lookupLink(String name) throws NamingException
+ {
+ return lookup(name);
+ }
+
+ public Name composeName(Name name, Name prefix) throws NamingException
+ {
+ Name result = (Name) prefix.clone();
+ result.addAll(name);
+
+ return result;
+ }
+
+ public String composeName(String name, String prefix) throws NamingException
+ {
+ CompositeName result = new CompositeName(prefix);
+ result.addAll(new CompositeName(name));
+
+ return result.toString();
+ }
+
+ public NamingEnumeration list(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ListEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).list("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public NamingEnumeration listBindings(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ListBindingEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).listBindings("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public Object lookupLink(Name name) throws NamingException
+ {
+ return lookupLink(name.toString());
+ }
+
+ public NamingEnumeration list(Name name) throws NamingException
+ {
+ return list(name.toString());
+ }
+
+ public NamingEnumeration listBindings(Name name) throws NamingException
+ {
+ return listBindings(name.toString());
+ }
+
+ public void bind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void bind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void close() throws NamingException
+ {
+ // ignore
+ }
+
+ public Context createSubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public Context createSubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public String getNameInNamespace() throws NamingException
+ {
+ return nameInNamespace;
+ }
+
+ public NameParser getNameParser(Name name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public NameParser getNameParser(String name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public void rebind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rebind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(Name oldName, Name newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(String oldName, String newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ private abstract class LocalNamingEnumeration implements NamingEnumeration
+ {
+ private Iterator i = bindings.entrySet().iterator();
+
+ public boolean hasMore() throws NamingException
+ {
+ return i.hasNext();
+ }
+
+ public boolean hasMoreElements()
+ {
+ return i.hasNext();
+ }
+
+ protected Map.Entry getNext()
+ {
+ return (Map.Entry) i.next();
+ }
+
+ public void close() throws NamingException
+ { }
+ }
+
+ private class ListEnumeration extends LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+
+ return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName());
+ }
+ }
+
+ private class ListBindingEnumeration extends LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+
+ return new Binding((String) entry.getKey(), entry.getValue());
+ }
+ }
+}