From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- java/amqp-1-0-client-jms/build.xml | 3 + java/amqp-1-0-client-jms/example/build.xml | 28 +++ java/amqp-1-0-client-jms/resources/LICENSE | 204 +++++++++++++++++++++ java/amqp-1-0-client-jms/resources/NOTICE | 5 + java/amqp-1-0-client-jms/resources/README.txt | 7 + .../amqp_1_0/jms/impl/ConnectionFactoryImpl.java | 79 +++++++- .../qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 172 ++++++++++++++--- .../qpid/amqp_1_0/jms/impl/DecodedDestination.java | 47 +++++ .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 6 +- .../apache/qpid/amqp_1_0/jms/impl/MessageImpl.java | 183 +++++++++++++++--- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 5 +- .../qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java | 20 +- .../qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java | 151 ++++++++++----- .../qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java | 2 +- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 20 +- .../amqp_1_0/jms/impl/TopicSubscriberImpl.java | 2 +- .../jndi/PropertiesFileInitialContextFactory.java | 3 + 17 files changed, 818 insertions(+), 119 deletions(-) create mode 100644 java/amqp-1-0-client-jms/example/build.xml create mode 100644 java/amqp-1-0-client-jms/resources/LICENSE create mode 100644 java/amqp-1-0-client-jms/resources/NOTICE create mode 100644 java/amqp-1-0-client-jms/resources/README.txt create mode 100644 java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java (limited to 'java/amqp-1-0-client-jms') diff --git a/java/amqp-1-0-client-jms/build.xml b/java/amqp-1-0-client-jms/build.xml index cfc6e0babe..d501be1d8d 100644 --- a/java/amqp-1-0-client-jms/build.xml +++ b/java/amqp-1-0-client-jms/build.xml @@ -24,6 +24,9 @@ + + + diff --git a/java/amqp-1-0-client-jms/example/build.xml b/java/amqp-1-0-client-jms/example/build.xml new file mode 100644 index 0000000000..cb9ab0994c --- /dev/null +++ b/java/amqp-1-0-client-jms/example/build.xml @@ -0,0 +1,28 @@ + + + + + + + + + diff --git a/java/amqp-1-0-client-jms/resources/LICENSE b/java/amqp-1-0-client-jms/resources/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/java/amqp-1-0-client-jms/resources/NOTICE b/java/amqp-1-0-client-jms/resources/NOTICE new file mode 100644 index 0000000000..8d1c3f3122 --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/NOTICE @@ -0,0 +1,5 @@ +Apache Qpid +Copyright 2006-2012 Apache Software Foundation +This product includes software developed at +Apache Software Foundation (http://www.apache.org/) + diff --git a/java/amqp-1-0-client-jms/resources/README.txt b/java/amqp-1-0-client-jms/resources/README.txt new file mode 100644 index 0000000000..35d25050fe --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/README.txt @@ -0,0 +1,7 @@ + +Documentation +-------------- +All of our user documentation can be accessed at: + +http://qpid.apache.org/documentation.html + diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index d9e6dfe36d..4856a7c491 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -20,8 +20,12 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.net.URLConnection; +import java.net.URLDecoder; +import java.net.URLStreamHandler; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; @@ -39,6 +43,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _remoteHost; private boolean _ssl; + private String _queuePrefix; + private String _topicPrefix; public ConnectionFactoryImpl(final String host, final int port, @@ -86,36 +92,70 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection() throws JMSException { - return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl); + return createConnection(_username, _password); } public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + connection.setQueuePrefix(_queuePrefix); + connection.setTopicPrefix(_topicPrefix); + return connection; } public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException { - URL url = new URL(urlString); + URL url = new URL(null, urlString, new URLStreamHandler() + { + @Override + protected URLConnection openConnection(URL u) throws IOException + { + throw new UnsupportedOperationException(); + } + }); + String protocol = url.getProtocol(); + if(protocol == null || "".equals(protocol)) + { + protocol = "amqp"; + } + else if(!protocol.equals("amqp") && !protocol.equals("amqps")) + { + throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); + } String host = url.getHost(); int port = url.getPort(); + + boolean ssl = false; + if(port == -1) { - port = 5672; + if("amqps".equals(protocol)) + { + port = 5671; + ssl = true; + } + else + { + port = 5672; + } } + else if("amqps".equals(protocol)) + { + ssl = true; + } + String userInfo = url.getUserInfo(); String username = null; String password = null; String clientId = null; String remoteHost = null; - boolean ssl = false; if(userInfo != null) { String[] components = userInfo.split(":",2); - username = components[0]; + username = URLDecoder.decode(components[0]); if(components.length == 2) { - password = components[1]; + password = URLDecoder.decode(components[1]); } } String query = url.getQuery(); @@ -139,6 +179,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection } } + if(remoteHost == null) + { + remoteHost = host; + } + return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); } @@ -170,4 +215,24 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection connection.setTopicConnection(true); return connection; } + + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queuePrefix) + { + _queuePrefix = queuePrefix; + } } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 587b12b51a..be1c2d6514 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*; import javax.jms.IllegalStateException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import javax.jms.Queue; +import java.util.*; public class ConnectionImpl implements Connection, QueueConnection, TopicConnection { @@ -43,16 +42,26 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private boolean _isQueueConnection; private boolean _isTopicConnection; private final Collection _closeTasks = new ArrayList(); + private final String _host; + private final int _port; + private final String _username; + private final String _password; + private final String _remoteHost; + private final boolean _ssl; + private String _clientId; + private String _queuePrefix; + private String _topicPrefix; private static enum State { + UNCONNECTED, STOPPED, STARTED, CLOSED } - private volatile State _state = State.STOPPED; + private volatile State _state = State.UNCONNECTED; public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException { @@ -66,20 +75,52 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException { - Container container = clientId == null ? new Container() : new Container(clientId); - // TODO - authentication, containerId, clientId, ssl?, etc - try + _host = host; + _port = port; + _username = username; + _password = password; + _clientId = clientId; + _remoteHost = remoteHost; + _ssl = ssl; + } + + private void connect() throws JMSException + { + synchronized(_lock) { - _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container, remoteHost, ssl); - // TODO - retrieve negotiated AMQP version - _connectionMetaData = new ConnectionMetaDataImpl(1,0,0); + // already connected? + if( _state == State.UNCONNECTED ) + { + _state = State.STOPPED; + + Container container = _clientId == null ? new Container() : new Container(_clientId); + // TODO - authentication, containerId, clientId, ssl?, etc + try + { + _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host, + _port, _username, _password, container, _remoteHost, _ssl); + // TODO - retrieve negotiated AMQP version + _connectionMetaData = new ConnectionMetaDataImpl(1,0,0); + } + catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e) + { + JMSException jmsEx = new JMSException(e.getMessage()); + jmsEx.setLinkedException(e); + jmsEx.initCause(e); + throw jmsEx; + } + } } - catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e) + } + + private void checkNotConnected(String msg) throws IllegalStateException + { + synchronized(_lock) { - JMSException jmsEx = new JMSException(e.getMessage()); - jmsEx.setLinkedException(e); - jmsEx.initCause(e); - throw jmsEx; + if( _state != State.UNCONNECTED ) + { + throw new IllegalStateException(msg); + } } } @@ -111,7 +152,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { throw new IllegalStateException("Cannot create a session on a closed connection"); } - + connect(); SessionImpl session = new SessionImpl(this, acknowledgeMode); session.setQueueSession(_isQueueConnection); session.setTopicSession(_isTopicConnection); @@ -125,14 +166,19 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public String getClientID() throws JMSException { checkClosed(); - return _conn.getEndpoint().getContainer().getId(); + return _clientId; } - public void setClientID(final String s) throws JMSException + public void setClientID(final String value) throws JMSException { - throw new IllegalStateException("Cannot set client-id to \"" - + s - + "\"; client-id must be set on connection creation"); + checkNotConnected("Cannot set client-id to \"" + + value + + "\"; client-id must be set before the connection is used"); + if( _clientId !=null ) + { + throw new IllegalStateException("client-id has already been set"); + } + _clientId = value; } public ConnectionMetaData getMetaData() throws JMSException @@ -158,6 +204,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect synchronized(_lock) { checkClosed(); + connect(); if(_state == State.STOPPED) { // TODO @@ -187,6 +234,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { session.stop(); } + case UNCONNECTED: _state = State.STOPPED; break; case CLOSED: @@ -235,7 +283,9 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { task.onClose(); } - _conn.close(); + if(_conn != null && _state != State.UNCONNECTED ) { + _conn.close(); + } _state = State.CLOSED; } @@ -282,6 +332,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect final int i) throws JMSException { checkClosed(); + if (_isQueueConnection) + { + throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources."); + } return null; //TODO } @@ -326,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { _isTopicConnection = topicConnection; } + + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queueprefix) + { + _queuePrefix = queueprefix; + } + + DecodedDestination toDecodedDestination(DestinationImpl dest) + { + String address = dest.getAddress(); + Set kind = null; + Class clazz = dest.getClass(); + if( clazz==QueueImpl.class ) + { + kind = MessageImpl.JMS_QUEUE_ATTRIBUTES; + if( _queuePrefix!=null ) + { + // Avoid double prefixing.. + if( !address.startsWith(_queuePrefix) ) + { + address = _queuePrefix+address; + } + } + } + else if( clazz==TopicImpl.class ) + { + kind = MessageImpl.JMS_TOPIC_ATTRIBUTES; + if( _topicPrefix!=null ) + { + // Avoid double prefixing.. + if( !address.startsWith(_topicPrefix) ) + { + address = _topicPrefix+address; + } + } + } + else if( clazz==TemporaryQueueImpl.class ) + { + kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES; + } + else if( clazz==TemporaryTopicImpl.class ) + { + kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES; + } + return new DecodedDestination(address, kind); + } + + DecodedDestination toDecodedDestination(String address, Set kind) + { + if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix)) + { + return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES); + } + if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix)) + { + return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES); + } + return new DecodedDestination(address, kind); + } + } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java new file mode 100644 index 0000000000..74e98c2163 --- /dev/null +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.jms.impl; + +import java.util.Set; + +/** +* @author Hiram Chirino +*/ +class DecodedDestination +{ + private final String _address; + private final Set _attributes; + + DecodedDestination(String address, Set kind) + { + _address = address; + _attributes = kind; + } + + public String getAddress() + { + return _address; + } + + public Set getAttributes() + { + return _attributes; + } +} diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index e0402cd0a7..3c15c74d6f 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -127,7 +127,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { try { - return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO, + return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, _linkName, _durable, getFilters(), null); } catch (AmqpErrorException e) @@ -316,9 +316,9 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi _lastUnackedMessage = deliveryTag; } - void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException + void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) { - final int acknowledgeMode = _session.getAcknowledgeMode(); + int acknowledgeMode = _session.getAckModeEnum().ordinal(); if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index f1056b94fd..fba50c5477 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -50,14 +50,24 @@ public abstract class MessageImpl implements Message static final Set _supportedClasses = new HashSet(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class, String.class, byte[].class)); - private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type"); + static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type"); + + static final String QUEUE_ATTRIBUTE = "queue"; + static final String TOPIC_ATTRIBUTE = "topic"; + static final String TEMPORARY_ATTRIBUTE = "temporary"; + + static final Set JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE); + static final Set JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE); + static final Set JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE); + static final Set JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE); private Header _header; private Properties _properties; private ApplicationProperties _applicationProperties; private Footer _footer; - public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - private SessionImpl _sessionImpl; + private final SessionImpl _sessionImpl; private boolean _readOnly; private MessageAnnotations _messageAnnotations; @@ -171,45 +181,53 @@ public abstract class MessageImpl implements Message public DestinationImpl getJMSReplyTo() throws JMSException { - return DestinationImpl.valueOf(getReplyTo()); + return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE))); } public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException { - if(destination == null) + if( destination==null ) { setReplyTo(null); - } - else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination) - { - setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress()); + messageAnnotationMap().remove(REPLY_TO_TYPE); } else { - throw new NonAMQPDestinationException(destination); + DecodedDestination dd = toDecodedDestination(destination); + setReplyTo(dd.getAddress()); + messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes())); } } public DestinationImpl getJMSDestination() throws JMSException { - return _isFromQueue ? QueueImpl.valueOf(getTo()) - : _isFromTopic ? TopicImpl.valueOf(getTo()) - : DestinationImpl.valueOf(getTo()); + Set type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE)); + if( type==null ) + { + if( _isFromQueue ) + { + type = JMS_QUEUE_ATTRIBUTES; + } + else if( _isFromTopic ) + { + type = JMS_TOPIC_ATTRIBUTES; + } + } + return toDestination(getTo(), type); } public void setJMSDestination(Destination destination) throws NonAMQPDestinationException { - if(destination == null) + if( destination==null ) { setTo(null); - } - else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination) - { - setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress()); + messageAnnotationMap().remove(TO_TYPE); } else { - throw new NonAMQPDestinationException(destination); + DecodedDestination dd = toDecodedDestination(destination); + setTo(dd.getAddress()); + messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes())); } } @@ -264,22 +282,13 @@ public abstract class MessageImpl implements Message public String getJMSType() throws JMSException { - Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); - final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE); - + final Object attrValue = getMessageAnnotation(JMS_TYPE); return attrValue instanceof String ? attrValue.toString() : null; } public void setJMSType(String s) throws JMSException { - Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); - if(messageAttrs == null) - { - messageAttrs = new HashMap(); - _messageAnnotations = new MessageAnnotations(messageAttrs); - } - - messageAttrs.put(JMS_TYPE, s); + messageAnnotationMap().put(JMS_TYPE, s); } public long getJMSExpiration() throws JMSException @@ -1206,4 +1215,118 @@ public abstract class MessageImpl implements Message } abstract Collection
getSections(); + + DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException + { + if(destination == null) + { + return null; + } + if (destination instanceof DestinationImpl) + { + return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination); + } + throw new NonAMQPDestinationException(destination); + } + + DestinationImpl toDestination(String address, Set kind) + { + if( address == null ) + { + return null; + } + + // If destination prefixes are in play, we have to strip the the prefix, and we might + // be able to infer the kind, if we don't know it yet. + DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind); + address = decoded.getAddress(); + kind = decoded.getAttributes(); + + if( kind == null ) + { + return DestinationImpl.valueOf(address); + } + if( kind.contains(QUEUE_ATTRIBUTE) ) + { + if( kind.contains(TEMPORARY_ATTRIBUTE) ) + { + return new TemporaryQueueImpl(address, null, _sessionImpl); + } + else + { + return QueueImpl.valueOf(address); + } + } + else if ( kind.contains(TOPIC_ATTRIBUTE) ) + { + if( kind.contains(TEMPORARY_ATTRIBUTE) ) + { + return new TemporaryTopicImpl(address, null, _sessionImpl); + } + else + { + return TopicImpl.valueOf(address); + } + } + + return DestinationImpl.valueOf(address); + } + + private Object getMessageAnnotation(Symbol key) + { + Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); + return messageAttrs == null ? null : messageAttrs.get(key); + } + + private Map messageAnnotationMap() + { + Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); + if(messageAttrs == null) + { + messageAttrs = new HashMap(); + _messageAnnotations = new MessageAnnotations(messageAttrs); + } + return messageAttrs; + } + + Set splitCommaSeparateSet(String value) + { + if( value == null ) + { + return null; + } + HashSet rc = new HashSet(); + for( String x: value.split("\\s*,\\s*") ) + { + rc.add(x); + } + return rc; + } + + private static Set set(String ...args) + { + HashSet s = new HashSet(); + for (String arg : args) + { + s.add(arg); + } + return Collections.unmodifiableSet(s); + } + + static final String join(String sep, Iterable items) + { + StringBuilder result = new StringBuilder(); + + for (Object o : items) + { + if (result.length() > 0) + { + result.append(sep); + } + result.append(o.toString()); + } + + return result.toString(); + } + } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 5bb8845eb7..badc20472b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.MessageProducer; -import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; @@ -61,7 +60,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { try { - _sender = _session.getClientSession().createSender(_destination.getAddress()); + _sender = _session.getClientSession().createSender(_session.toAddress(_destination)); } catch (Sender.SenderCreationException e) { @@ -297,7 +296,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP try { _destination = (DestinationImpl) destination; - _sender = _session.getClientSession().createSender(_destination.getAddress()); + _sender = _session.getClientSession().createSender(_session.toAddress(_destination)); send(message, deliveryMode, priority, ttl); diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java index 2a52b0557a..95c1497d07 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java @@ -40,7 +40,25 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage { static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object"); - private Data _objectData; + static final Data NULL_OBJECT_DATA; + static + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try + { + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(null); + oos.flush(); + oos.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray())); + } + + private Data _objectData = NULL_OBJECT_DATA; protected ObjectMessageImpl(Header header, MessageAnnotations messageAnnotations, diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java index 527e82eaed..8fab315b10 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java @@ -18,9 +18,7 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; +import java.util.*; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; @@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.QueueBrowser; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.messaging.Filter; import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter; import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; @@ -39,49 +38,27 @@ public class QueueBrowserImpl implements QueueBrowser private static final String JMS_SELECTOR = "jms-selector"; private QueueImpl _queue; private String _selector; - private Receiver _receiver; - private Message _nextElement; - private MessageEnumeration _enumeration; + private final SessionImpl _session; + private Map _filters; + private HashSet _enumerations = new HashSet(); + private boolean _closed; QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException { _queue = queue; _selector = selector; + _session = session; - Map filters; if(selector == null || selector.trim().equals("")) { - filters = null; + _filters = null; } else { - filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector)); - } - - - try - { - _receiver = session.getClientSession().createReceiver(queue.getAddress(), - StdDistMode.COPY, - AcknowledgeMode.AMO,null, - false, - filters, null); - _nextElement = _receiver.receive(0L); - _enumeration = new MessageEnumeration(); - } - catch(AmqpErrorException e) - { - org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError(); - if(AmqpError.INVALID_FIELD.equals(error.getCondition())) - { - throw new InvalidSelectorException(e.getMessage()); - } - else - { - throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); - } - + _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector)); + // We do this just to have the server validate the filter.. + new MessageEnumeration().close(); } } @@ -97,42 +74,124 @@ public class QueueBrowserImpl implements QueueBrowser public Enumeration getEnumeration() throws JMSException { - if(_enumeration == null) + if(_closed) { throw new IllegalStateException("Browser has been closed"); } - return _enumeration; + return new MessageEnumeration(); } public void close() throws JMSException { - _receiver.close(); - _enumeration = null; + _closed = true; + for(MessageEnumeration me : new ArrayList(_enumerations)) + { + me.close(); + } } - private final class MessageEnumeration implements Enumeration + private final class MessageEnumeration implements Enumeration { + private Receiver _receiver; + private MessageImpl _nextElement; + private boolean _needNext = true; + + MessageEnumeration() throws JMSException + { + try + { + _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue), + StdDistMode.COPY, + AcknowledgeMode.AMO, null, + false, + _filters, null); + _receiver.setCredit(UnsignedInteger.valueOf(100), true); + } + catch(AmqpErrorException e) + { + org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError(); + if(AmqpError.INVALID_FIELD.equals(error.getCondition())) + { + throw new InvalidSelectorException(e.getMessage()); + } + else + { + throw new JMSException(e.getMessage(), error.getCondition().getValue().toString()); + } + + } + _enumerations.add(this); + + } + + public void close() + { + _enumerations.remove(this); + _receiver.close(); + _receiver = null; + } @Override public boolean hasMoreElements() { + if( _receiver == null ) + { + return false; + } + if( _needNext ) + { + _needNext = false; + _nextElement = createJMSMessage(_receiver.receive(0L)); + if( _nextElement == null ) + { + // Drain to verify there really are no more messages. + _receiver.drain(); + _receiver.drainWait(); + _nextElement = createJMSMessage(_receiver.receive(0L)); + if( _nextElement == null ) + { + close(); + } + else + { + // there are still more messages, open up the credit window again.. + _receiver.clearDrain(); + } + } + } return _nextElement != null; } @Override - public Message nextElement() + public MessageImpl nextElement() { - - Message message = _nextElement; - if(message == null) + if( hasMoreElements() ) { - message = _receiver.receive(0l); + MessageImpl message = _nextElement; + _nextElement = null; + _needNext = true; + return message; } - if(message != null) + else { - _nextElement = _receiver.receive(0l); + throw new NoSuchElementException(); } + } + } + + MessageImpl createJMSMessage(final Message msg) + { + if(msg != null) + { + final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg); + message.setFromQueue(true); + message.setFromTopic(false); return message; } + else + { + return null; + } } + } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java index d46ed7183f..67b597f5cf 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java @@ -41,7 +41,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei { try { - return getSession().getClientSession().createMovingReceiver(getDestination().getAddress()); + return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination())); } catch (AmqpErrorException e) { diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index e321245a0e..58b7d4f625 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -766,7 +766,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession { while(!_closed) { - while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())) + while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))) { try { @@ -777,7 +777,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession return; } } - while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())) + while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))) { Message msg; @@ -804,6 +804,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(message != null) { + if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE) + { + consumer.setLastUnackedMessage(msg.getDeliveryTag()); + } _currentConsumer = consumer; _currentMessage = msg; try @@ -816,11 +820,11 @@ public class SessionImpl implements Session, QueueSession, TopicSession _currentMessage = null; } - if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE - || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE)) + if(_recoveredMessage == null) { - consumer.acknowledge(msg); + consumer.preReceiveAction(msg); } + } } @@ -895,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession { _isTopicSession = topicSession; } + + String toAddress(DestinationImpl dest) + { + return _connection.toDecodedDestination(dest).getAddress(); + } + } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 52d8c412ec..f267794796 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub { try { - String address = getDestination().getAddress(); + String address = getSession().toAddress(getDestination()); Receiver receiver = getSession().getClientSession().createReceiver(address, StdDistMode.COPY, AcknowledgeMode.ALO, getLinkName(), isDurable(), getFilters(), diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java index 31030a7d30..091ab41304 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java @@ -57,6 +57,9 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor Map data = new ConcurrentHashMap(); String file = null; + String fileName = (environment.containsKey(Context.PROVIDER_URL)) + ? (String)environment.get(Context.PROVIDER_URL) : System.getProperty(Context.PROVIDER_URL); + try { -- cgit v1.2.1