From e83af1b169f5d2ccb6e6be87ba495049497cb13e Mon Sep 17 00:00:00 2001
From: Arnaud Simon The retuned session is suspended
- * (i.e. this session is not attached with an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
- * @return A Newly created (suspended) session.
- */
- public Session createSession(long expiryInSeconds);
-
- /**
- * Create a DtxSession for this connection.
- * A Dtx Session must be used when resources have to be manipulated as
- * part of a global transaction.
- * The retuned DtxSession is suspended
- * (i.e. this session is not attached with an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
- * @return A Newly created (suspended) DtxSession.
- */
- public DtxSession createDTXSession(int expiryInSeconds);
-
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- *
- * @param exceptionListner The execptionListener
- */
-
- public void setExceptionListener(ExceptionListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
deleted file mode 100644
index 6fe019c3a1..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import org.apache.qpidity.transport.DtxCoordinationCommitResult;
-import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
-import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
-import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
-import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
-import org.apache.qpidity.transport.DtxDemarcationEndResult;
-import org.apache.qpidity.transport.DtxDemarcationStartResult;
-import org.apache.qpidity.transport.Future;
-import org.apache.qpidity.transport.Option;
-
-/**
- * This session�s resources are control under the scope of a distributed transaction.
- */
-public interface DtxSession extends Session
-{
-
- /**
- * This method is called when messages should be produced and consumed on behalf a transaction
- * branch identified by xid.
- * possible options are:
- * The sequence of event for transferring a message is as follows:
- * A session is associated with a connection.
- * When created a Session is not attached with an underlying channel.
- * Session is single threaded The session timer will start to tick in suspend.
- * When a session is suspend any operation of this session and of the associated resources are unavailable.
- */
- public void sessionSuspend();
-
- //------------------------------------------------------
- // Messaging methods
- // Producer
- //------------------------------------------------------
- /**
- * Transfer the given message to a specified exchange.
- * This is a convinience method for providing a complete message
- * using a single method which internaly is mapped to messageTransfer(), headers() followed
- * by data() and an endData().
- * This method should only be used by small messages This is a convinience method for streaming a message using pull semantics
- * using a single method as opposed to doing a push using messageTransfer(), headers() followed
- * by a series of data() and an endData(). Internally data will be pulled from Message object(which wrap's a data stream) using it's read()
- * and pushed using messageTransfer(), headers() followed by a series of data() and an endData().
- *
- *
- *
- * @param xid Specifies the xid of the transaction branch to be started.
- * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
- * @return Confirms to the client that the transaction branch is started or specify the error condition.
- */
- public Future
- *
- *
- * @param xid Specifies the xid of the transaction branch to be ended.
- * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
- * @return Confirms to the client that the transaction branch is ended or specify the error condition.
- */
- public Future
- *
- *
- * @param xid Specifies the xid of the transaction branch to be committed.
- * @param options Available option is: {@link Option#ONE_PHASE}
- * @return Confirms to the client that the transaction branch is committed or specify the error condition.
- */
- public Futureconnection, it
- * informs the connection's ExceptionListener
- */
-public interface ExceptionListener
-{
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- *
- * @param exception The exception comming from the communication layer.
- * @see Connection
- */
- public void onException(QpidException exception);
-}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
deleted file mode 100644
index 42dad7329e..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.qpidity.nclient;
-
-import org.apache.qpidity.njms.ConnectionFactoryImpl;
-import org.apache.qpidity.njms.TopicImpl;
-
-public class JMSTestCase
-{
- public static void main(String[] args)
- {
- try
- {
- javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection();
- con.start();
-
- javax.jms.Session ssn = con.createSession(false, 1);
-
- javax.jms.Destination dest = new TopicImpl("myTopic");
- javax.jms.MessageProducer prod = ssn.createProducer(dest);
- javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
-
- javax.jms.BytesMessage msg = ssn.createBytesMessage();
- msg.writeInt(123);
- prod.send(msg);
-
- javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
- System.out.println("Data : " + m.readInt());
-
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
deleted file mode 100644
index ecdd2d7952..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpidity.transport.Header;
-
-/**
- * Assembles message parts.
- *
- *
- * This is up to the implementation to assembled the message when the different parts
- * are transferred.
- */
-public interface MessagePartListener
-{
- /**
- * Indicates the Message transfer has started.
- *
- * @param transferId
- */
- public void messageTransfer(long transferId);
-
- /**
- * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
- * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
- *
- * @param headers Either DeliveryProperties or ApplicationProperties
- */
- public void messageHeader(Header header);
-
- /**
- * Add the following byte array to the content of the message being received
- *
- * @param data Data to be added or streamed.
- */
- public void data(ByteBuffer src);
-
- /**
- * Indicates that the message has been fully received.
- */
- public void messageReceived();
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
deleted file mode 100644
index 88d85e823b..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.api.Message;
-
-/**
- * command1 will be synchronously invoked using the following sequence:
- *
- *
- */
-public interface Session
-{
- public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
- public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
- public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
- public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
- public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
- public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
- public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
- public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
- public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
- public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
- public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
- public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
-
- //------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
-
- /**
- * Sync method will block until all outstanding commands
- * are executed.
- */
- public void sync();
-
- /**
- * Close this session and any associated resources.
- */
- public void sessionClose();
-
- /**
- * Suspend this session resulting in interrupting the traffic with the broker.
- * session.command1()
- * session.sync()
- * off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required, once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred
- *
- *
- * @param acquireMode
- *
- * @throws java.io.IOException If transferring a message fails due to some internal communication error.
- */
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
- throws IOException;
-
- /**
- *
This method should only be used by large messages
- * There are two convinience Message classes provided to facilitate this.
- *
- *
- * You could also implement a the {@link org.apache.qpidity.nclient.util.FileMessage}
- * {@link org.apache.qpidity.nclient.util.StreamingMessage}
- * Message interface to and wrap any
- * data stream you want.
- *
This way of transferring messages is useful when streaming large messages - *
In the interval [messageTransfer endData] any attempt to call a method other than - * {@link Session#header}, {@link Session#endData} ore {@link Session#sessionClose} - * will result in an exception being thrown. - * - * @param destination The exchange the message is being sent. - * @param confirmMode
{@link org.apache.qpidity.transport.DeliveryProperties}
- * or {@link org.apache.qpidity.transport.MessageProperties}
- * @see org.apache.qpidity.transport.DeliveryProperties
- * @see org.apache.qpidity.transport.MessageProperties
- */
- public void header(Struct... headers);
-
- /**
- * Add the following byte array to the content of the message being sent.
- *
- * @param data Data to be added.
- */
- public void data(byte[] data);
-
- /**
- * Add the following ByteBuffer to the content of the message being sent.
- * Note that only the data between the buffer current position and the - * buffer limit is added. - * It is therefore recommended to flip the buffer before adding it to the message, - * - * @param buf Data to be added. - */ - public void data(ByteBuffer buf); - - /** - * Add the following String to the content of the message being sent. - * - * @param str String to be added. - */ - public void data(String str); - - /** - * Signals the end of data for the message. - */ - public void endData(); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - *
The destination is bound to a queue and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and may not be handled). - *
Following are valid options: - *
If the no-local field is set the server will not send - * messages to the connection that - * published them. - *
Request exclusive subscription access, meaning only this - * ubscription can access the queue. - *
Has no effect as it represents an �empty� option. - *
Only one listerner per destination is allowed. This means - * that the previous message listener is replaced. This is done gracefully i.e. the message - * listener is replaced once it return from the processing of a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - *
With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - *
Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode
The broker's credit will always be zero when - * this method completes. - * - * @param destination The destination to call flush on. - */ - public void messageFlush(String destination); - - /** - * On receipt of this method, the brokers MUST set his credit to zero for the given - * destination. This obeys the generic semantics of command completion, i.e. when confirmation - * is issued credit MUST be zero and no further messages will be sent until such a time as - * further credit is received. - * - * @param destination The destination to stop. - */ - public void messageStop(String destination); - - /** - * Acknowledge the receipt of ranges of messages. - *
Message must have been previously acquired either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of acknowledged messages. - */ - public void messageAcknowledge(RangeSet ranges); - - /** - * Reject ranges of acquired messages. - *
The broker MUST deliver rejected messages to the - * alternate-exchange on the queue from which it was delivered. If no alternate-exchange is - * defined for that queue the broker MAY discard the message. - * - * @param ranges Range of rejected messages. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, int code, String text); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - *
Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - *
A recommended invocation sequence would be: - *
As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - *
This method should only be called on non-acquired messages. - * - * @param mode One of:
Note that {@link Session#messageAcquire} and this methods are asynchronous therefore for accessing to the - * previously acquired messages this method must be invoked in conjunction with {@link Session#sync()}. - *
A recommended invocation sequence would be: - *
Released messages are re-enqueued. - * - * @param ranges Ranges of messages to be released. - */ - public void messageRelease(RangeSet ranges); - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - */ - public void txSelect(); - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws IllegalStateException If this session is not transacted. - */ - public void txCommit() throws IllegalStateException; - - /** - * Rollback the receipt and the delivery of all messages exchanged by this session resources. - * - * @throws IllegalStateException If this session is not transacted. - */ - public void txRollback() throws IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - *
Following are the valid options: - *
If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * If this field is set and the exclusive field is not set the queue is deleted when all - * the consumers have finished using it. - *
If set when creating a new queue, - * the queue will be marked as durable. Durable queues - * remain active when a server restarts. Non-durable queues (transient queues) are purged - * if/when a server restarts. Note that durable queues do not necessarily hold persistent - * messages, although it does not make sense to send persistent messages to a transient - * queue. - *
Exclusive queues can only be used from one connection at a time. - * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the - * declaring connection closes. - *
If set, the server will not create the queue. - * This field allows the client to assert the presence of a queue without modifying the server state. - *
Has no effect as it represents an �empty� option. - *
In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - *
Following are the valid options: - *
If set, the server will only delete the queue if it has no messages. - *
If set, the server will only delete the queue if it has no consumers. - * If the queue has consumers the server does does not delete it but raises a channel exception instead. - *
Has no effect as it represents an �empty� option. - *
In the absence of a particular option, the defaul value is false for each option
- * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NO_OPTION}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * This method creates an exchange if it does not already exist, and if the exchange exists, - * verifies that it is of the correct and expected class. - *Following are the valid options: - *
If set, the exchange is deleted when all queues have finished using it. - *
If set when creating a new exchange, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged if/when a server restarts. - *
If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - *
Has no effect as it represents an �empty� option. - *
In the absence of a particular option, the defaul value is false for each option
- * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NO_OPTION}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, MapFollowing are the valid options: - *
If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - *
Has no effect as it represents an �empty� option. - *
In the absence of a particular option, the defaul value is false for each option
- *
- * @param exchangeName The name of exchange to be deleted.
- * @param options Set of options (valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION})
- * @see Option
- */
- public void exchangeDelete(String exchangeName, Option... options);
-
-
- /**
- * This method is used to request information on a particular exchange.
- *
- * @param exchangeName The name of the exchange for which information is requested. If not specified explicitly
- * the default exchange is implied.
- * @result Information on the specified exchange.
- */
- public Future A Simple implementation of the message interface
- * for small messages. When the readData methods are called
- * we assume the message is complete. i.e there want be any
- * appendData operations after that. If you need large message support please see
- * In all the implementations in our code base
- * when we create a Reference we pass in In our construction of the Reference the last param. is null,
- * we could put a url to a jar that contains our {@link ObjectFactory} so that
- * any of our objects stored in JNDI can be recreated without even having
- * the classes locally. As it is the The connection is created in stopped mode. No messages
- * will be delivered until the The connection is created in stopped mode. No messages
- * will be delivered until the The queueConnection is created in stopped mode. No messages
- * will be delivered until the The queueConnection is created in stopped mode. No messages
- * will be delivered until the The topicConnection is created in stopped mode. No messages
- * will be delivered until the The topicConnection is created in stopped mode. No messages
- * will be delivered until the The XAConnection is created in stopped mode. No messages
- * will be delivered until the The XAConnection is created in stopped mode. No messages
- * will be delivered until the The XATopicConnection is created in stopped mode. No messages
- * will be delivered until the The XATopicConnection is created in stopped mode. No messages
- * will be delivered until the The XAQueueConnection is created in stopped mode. No messages
- * will be delivered until the The XAQueueConnection is created in stopped mode. No messages
- * will be delivered until the It is either preconfigured as a JNDI property or assigned dynamically by the application
- * by calling the The preferred way to assign a JMS client's client identifier is for
- * it to be configured in a client-specific In Qpid it is not possible to change the client ID. If one is not specified
- * upon connection construction, an id is generated automatically. Therefore
- * we can always throw an exception.
- * TODO: Make sure that the client identifier can be set on the The JMS specification says:
- * If a JMS provider detects a serious problem with a connection, it
- * informs the connection's A connection serializes execution of its
- * A JMS provider should attempt to resolve connection problems
- * itself before it notifies the client of them.
- *
- * @param exceptionListener The connection listener.
- * @throws JMSException If the connection is closed.
- */
- public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
- {
- checkNotClosed();
- _exceptionListener = exceptionListener;
- _qpidExceptionListener.setJMSExceptionListner(_exceptionListener);
- }
-
- /**
- * Starts (or restarts) a connection's delivery of incoming messages.
- * A call to start on a connection that has already been
- * started is ignored.
- *
- * @throws JMSException In case of a problem due to some internal error.
- */
- public synchronized void start() throws JMSException
- {
- checkNotClosed();
- if (!_started)
- {
- // start all the sessions
- for (SessionImpl session : _sessions)
- {
- try
- {
- session.start();
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
- _started = true;
- }
- }
-
- /**
- * Temporarily stops a connection's delivery of incoming messages.
- * The JMS specification says:
- * Delivery can be restarted using the connection's This call blocks until receives and/or message listeners in progress have completed.
- *
- * @throws JMSException In case of a problem due to some internal error.
- */
- public synchronized void stop() throws JMSException
- {
- checkNotClosed();
- if (_started)
- {
- // stop all the sessions
- for (SessionImpl session : _sessions)
- {
- try
- {
- session.stop();
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
- _started = false;
- }
- }
-
- /**
- * Closes the connection.
- * The JMS specification says:
- * Since a provider typically allocates significant resources outside
- * the JVM on behalf of a connection, clients should close these resources
- * when they are not needed. Relying on garbage collection to eventually
- * reclaim these resources may not be timely enough.
- * There is no need to close the sessions, producers, and consumers of a closed connection.
- * Closing a connection causes all temporary destinations to be deleted.
- * When this method is invoked, it should not return until message
- * processing has been shut down in an orderly fashion.
- *
- * @throws JMSException In case of a problem due to some internal error.
- */
- public synchronized void close() throws JMSException
- {
- checkNotClosed();
- if (!_isClosed)
- {
- _isClosed = true;
- _started = false;
- // close all the sessions
- for (SessionImpl session : _sessions)
- {
- session.close();
- }
- // close the underlaying Qpid connection
- try
- {
- _qpidConnection.close();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
- }
-
- /**
- * Creates a connection consumer for this connection (optional operation).
- * This is an expert facility for App server integration.
- *
- * @param destination The destination to access.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param sessionPool The session pool to associate with this connection consumer.
- * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
- * @return Null for the moment.
- * @throws JMSException In case of a problem due to some internal error.
- */
- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages)
- throws JMSException
- {
- checkNotClosed();
- return null;
- }
-
- /**
- * Create a durable connection consumer for this connection (optional operation).
- *
- * @param topic The topic to access.
- * @param subscriptionName Durable subscription name.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param sessionPool The server session pool to associate with this durable connection consumer.
- * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
- * @return Null for the moment.
- * @throws JMSException In case of a problem due to some internal error.
- */
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
- {
- checkNotClosed();
- return null;
- }
-
- //-------------- QueueConnection API
-
- /**
- * Create a QueueSession.
- *
- * @param transacted Indicates whether the session is transacted.
- * @param acknowledgeMode Indicates whether the consumer or the
- * client will acknowledge any messages it receives; ignored if the session
- * is transacted. Legal values are If the MessageActor is closed, throw a javax.njms.IllegalStateException.
- * The method is not synchronized, since MessageProducers can only be used by a single thread.
- *
- * @throws IllegalStateException if the MessageActor is closed
- */
- protected void checkNotClosed() throws IllegalStateException
- {
- if (_isClosed || _session == null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Actor " + this + " is already closed");
- }
- throw new IllegalStateException("Actor " + this + " is already closed");
- }
- _session.checkNotClosed();
- }
-
- /**
- * Closes a MessageActor.
- * This method is invoked when the session is closing or when this
- * messageActor is closing.
- *
- * @throws JMSException If the MessaeActor cannot be closed due to some internal error.
- */
- protected void closeMessageActor() throws JMSException
- {
- if (!_isClosed)
- {
- getSession().getQpidSession().messageCancel(getMessageActorID());
- _isClosed = true;
- }
- }
-
- /**
- * Get the associated session object.
- *
- * @return This Actor's Session.
- */
- public SessionImpl getSession()
- {
- return _session;
- }
-
- /**
- * Get the ID of this actor within its session.
- *
- * @return This actor ID.
- */
- protected String getMessageActorID()
- {
- return _messageActorID;
- }
-
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
deleted file mode 100644
index 7f55dcbd67..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.nclient.MessagePartListener;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.exchange.ExchangeDefaults;
-import org.apache.qpidity.filter.JMSSelectorFilter;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.njms.message.MessageFactory;
-import org.apache.qpidity.njms.message.QpidMessage;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.transport.RangeSet;
-
-/**
- * Implementation of JMS message consumer
- */
-public class MessageConsumerImpl extends MessageActor
- implements MessageConsumer, org.apache.qpidity.nclient.util.MessageListener
-{
- // we can receive up to 100 messages for an asynchronous listener
- public static final int MAX_MESSAGE_TRANSFERRED = 100;
-
- /**
- * This MessageConsumer's messageselector.
- */
- private String _messageSelector = null;
-
- /**
- * The message selector filter associated with this consumer message selector
- */
- private MessageFilter _filter = null;
-
- /**
- * NoLocal
- * If true, and the destination is a topic then inhibits the delivery of messages published
- * by its own connection. The behavior for NoLocal is not specified if the destination is a queue.
- */
- protected boolean _noLocal;
-
- /**
- * The subscription name
- */
- protected String _subscriptionName;
-
- /**
- * Indicates whether this consumer receives pre-acquired messages
- */
- private boolean _preAcquire = true;
-
- /**
- * A MessagePartListener set up for this consumer.
- */
- private MessageListener _messageListener;
-
- /**
- * A lcok on the syncrhonous message
- */
- private final Object _incomingMessageLock = new Object();
-
-
- /**
- * Number of mesages received asynchronously
- * Nether exceed MAX_MESSAGE_TRANSFERRED
- */
- private int _messageAsyncrhonouslyReceived = 0;
-
- private LinkedBlockingQueue The JMS specification says:
- * Setting the message listener to null is the equivalent of
- * unsetting the message listener for the message consumer.
- * The effect of calling This call blocks indefinitely until a message is produced or until this message consumer is closed.
- *
- * @return The next message produced for this message consumer, or
- * null if this message consumer is concurrently closed
- * @throws JMSException If receiving the next message fails due to some internal error.
- */
- public Message receive() throws JMSException
- {
- // Check if we can get a message immediately
- Message result;
- result = receiveNoWait();
- if (result != null)
- {
- return result;
- }
- try
- {
- // Now issue a credit and wait for the broker to send a message
- // IMO no point doing a credit() flush() and sync() in a loop.
- // This will only overload the broker. After the initial try we can wait
- // for the broker to send a message when it gets one
- requestCredit(1);
- return (Message) _queue.take();
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Receive the next message that arrives within the specified timeout interval.
- * This call blocks until a message arrives, the timeout expires, or this message consumer
- * is closed.
- * A timeout of zero never expires, and the call blocks indefinitely.
- * A timeout less than 0 throws a JMSException.
- *
- * @param timeout The timeout value (in milliseconds)
- * @return The next message that arrives within the specified timeout interval.
- * @throws JMSException If receiving the next message fails due to some internal error.
- */
- public Message receive(long timeout) throws JMSException
- {
- checkClosed();
- checkIfListenerSet();
- if (timeout < 0)
- {
- throw new JMSException("Invalid timeout value: " + timeout);
- }
-
- Message result;
- try
- {
- // first check if we have any in the queue already
- result = (Message) _queue.poll();
- if (result == null)
- {
- requestCredit(1);
- requestFlush();
- // We shouldn't do a sync(). Bcos the timeout can happen
- // before the sync() returns
- return (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS);
- }
- else
- {
- return result;
- }
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Receive the next message if one is immediately available.
- *
- * @return the next message or null if one is not available.
- * @throws JMSException If receiving the next message fails due to some internal error.
- */
- public Message receiveNoWait() throws JMSException
- {
- checkClosed();
- checkIfListenerSet();
- Message result;
- try
- {
- // first check if we have any in the queue already
- result = (Message) _queue.poll();
- if (result == null)
- {
- requestCredit(1);
- requestFlush();
- requestSync();
- return (Message) _queue.poll();
- }
- else
- {
- return result;
- }
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- // not public methods
- /**
- * Upon receipt of this method, the broker adds "value"
- * number of messages to the available credit balance for this consumer.
- *
- * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
- */
- private void requestCredit(int value)
- {
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, value);
- }
-
- /**
- * Forces the broker to exhaust its credit supply.
- * The broker's credit will always be zero when
- * this method completes.
- */
- private void requestFlush()
- {
- getSession().getQpidSession().messageFlush(getMessageActorID());
- }
-
- /**
- * Sync method will block until all outstanding broker
- * commands
- * are executed.
- */
- private void requestSync()
- {
- getSession().getQpidSession().sync();
- }
-
- /**
- * Check whether this consumer is closed.
- *
- * @throws JMSException If this consumer is closed.
- */
- private void checkClosed() throws JMSException
- {
- if (_isStopped)
- {
- throw new JMSException("Session is closed");
- }
- }
-
- /**
- * Stop the delivery of messages to this consumer.
- * For asynchronous receiver, this operation blocks until the message listener
- * finishes processing the current message,
- *
- * @throws Exception If the consumer cannot be stopped due to some internal error.
- */
- protected void stop() throws Exception
- {
- getSession().getQpidSession().messageStop(getMessageActorID());
- _isStopped = true;
- }
-
- /**
- * Start the delivery of messages to this consumer.
- *
- * @throws Exception If the consumer cannot be started due to some internal error.
- */
- protected void start() throws Exception
- {
- synchronized (_incomingMessageLock)
- {
- _isStopped = false;
- }
- }
-
- /**
- * This method notifies this consumer that a message has been delivered
- * @param message The received message.
- */
- public void onMessage(org.apache.qpidity.api.Message message)
- {
- try
- {
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- if (checkPreConditions(jmsMessage))
- {
- preApplicationProcessing(jmsMessage);
-
- if (_messageListener == null)
- {
- _queue.offer(jmsMessage);
- }
- else
- {
- // I still think we don't need that additional thread in SessionImpl
- // if the Application blocks on a message thats fine
- // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
- notifyMessageListener(jmsMessage);
- }
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-
-
- public void notifyMessageListener(QpidMessage message) throws RuntimeException
- {
- try
- {
- _messageAsyncrhonouslyReceived++;
- if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
- {
- // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
- resetAsynchMessageReceived();
- }
-
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
-
- _messageListener.onMessage((Message) message);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
-
-
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-
- /**
- * Check whether this consumer is asynchronous
- *
- * @throws javax.jms.IllegalStateException If this consumer is asynchronous.
- */
- private void checkIfListenerSet() throws javax.jms.IllegalStateException
- {
-
- if (_messageListener != null)
- {
- throw new javax.jms.IllegalStateException("A listener has already been set.");
- }
- }
-
- /**
- * pre process a received message.
- *
- * @param message The message to pre-process.
- * @throws Exception If the message cannot be pre-processed due to some internal error.
- */
- private void preApplicationProcessing(QpidMessage message) throws Exception
- {
- getSession().preProcessMessage(message);
- // If the session is transacted we need to ack the message first
- // This is because a message is associated with its tx only when acked
- if (getSession().getTransacted())
- {
- getSession().acknowledgeMessage(message);
- }
- message.afterMessageReceive();
- }
-
- /**
- * Check whether a message can be delivered to this consumer.
- *
- * @param message The message to be checked.
- * @return true if the message matches the selector and can be acquired, false otherwise.
- * @throws QpidException If the message preConditions cannot be checked due to some internal error.
- */
- private boolean checkPreConditions(QpidMessage message) throws QpidException
- {
- boolean messageOk = true;
- if (_messageSelector != null)
- {
- messageOk = _filter.matches((Message) message);
- }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("messageOk " + messageOk);
- _logger.debug("_preAcquire " + _preAcquire);
- }
- if (!messageOk && _preAcquire)
- {
- // this is the case for topics
- // We need to ack this message
- if (_logger.isDebugEnabled())
- {
- _logger.debug("filterMessage - trying to ack message");
- }
- acknowledgeMessage(message);
- }
- else if (!messageOk)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message not OK, releasing");
- }
- releaseMessage(message);
- }
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("filterMessage - trying to acquire message");
- }
- messageOk = acquireMessage(message);
- }
- return messageOk;
- }
-
- /**
- * Release a message
- *
- * @param message The message to be released
- * @throws QpidException If the message cannot be released due to some internal error.
- */
- private void releaseMessage(QpidMessage message) throws QpidException
- {
- if (_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
- getSession().getQpidSession().messageRelease(ranges);
- getSession().testQpidException();
- }
- }
-
- /**
- * Acquire a message
- *
- * @param message The message to be acquired
- * @return true if the message has been acquired, false otherwise.
- * @throws QpidException If the message cannot be acquired due to some internal error.
- */
- private boolean acquireMessage(QpidMessage message) throws QpidException
- {
- boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
-
- getSession().getQpidSession()
- .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
- getSession().getQpidSession().sync();
- RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
- if (acquired.size() > 0)
- {
- result = true;
- }
- getSession().testQpidException();
- }
- return result;
- }
-
- /**
- * Acknowledge a message
- *
- * @param message The message to be acknowledged
- * @throws QpidException If the message cannot be acquired due to some internal error.
- */
- private void acknowledgeMessage(QpidMessage message) throws QpidException
- {
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
- getSession().getQpidSession().messageAcknowledge(ranges);
- getSession().testQpidException();
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
deleted file mode 100644
index a61f4f1431..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.njms.message.MessageHelper;
-import org.apache.qpidity.njms.message.MessageImpl;
-import org.apache.qpidity.QpidException;
-
-import javax.jms.*;
-import java.util.UUID;
-import java.io.IOException;
-
-/**
- * Implements MessageProducer
- */
-public class MessageProducerImpl extends MessageActor implements MessageProducer
-{
- /**
- * If true, messages will not get a timestamp.
- */
- private boolean _disableTimestamps = false;
-
- /**
- * Priority of messages created by this producer.
- */
- private int _messagePriority = Message.DEFAULT_PRIORITY;
-
- /**
- * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- */
- private long _timeToLive;
-
- /**
- * Delivery mode used for this producer.
- */
- private int _deliveryMode = DeliveryMode.PERSISTENT;
-
- /**
- * Speicify whether the messageID is disable
- */
- private boolean _disableMessageId = false;
-
- //-- constructors
- public MessageProducerImpl(SessionImpl session, DestinationImpl destination)
- {
- super(session, destination,"");
- }
-
- //--- Interface javax.njms.MessageProducer
- /**
- * Sets whether message IDs are disabled.
- *
- * @param value Specify whether the MessageID must be disabled
- * @throws JMSException If disabling messageID fails due to some internal error.
- */
- public void setDisableMessageID(boolean value) throws JMSException
- {
- checkNotClosed();
- _disableMessageId = value;
- }
-
- /**
- * Gets an indication of whether message IDs are disabled.
- *
- * @return true is messageID is disabled, false otherwise
- * @throws JMSException If getting whether messagID is disabled fails due to some internal error.
- */
- public boolean getDisableMessageID() throws JMSException
- {
- checkNotClosed();
- return _disableMessageId;
- }
-
- /**
- * Sets whether message timestamps are disabled.
- * JMS spec says:
- * Since timestamps take some effort to create and increase a
- * message's size, some JMS providers may be able to optimize message
- * overhead if they are given a hint that the timestamp is not used by an
- * application....
- * these messages must have the timestamp set to zero; if the provider
- * ignores the hint, the timestamp must be set to its normal value.
- * Message timestamps are enabled by default.
- *
- * @param value Indicates if message timestamps are disabled
- * @throws JMSException if disabling the timestamps fails due to some internal error.
- */
- public void setDisableMessageTimestamp(boolean value) throws JMSException
- {
- checkNotClosed();
- _disableTimestamps = value;
- }
-
- /**
- * Gets an indication of whether message timestamps are disabled.
- *
- * @return an indication of whether message timestamps are disabled
- * @throws JMSException if getting whether timestamps are disabled fails due to some internal error.
- */
- public boolean getDisableMessageTimestamp() throws JMSException
- {
- checkNotClosed();
- return _disableTimestamps;
- }
-
- /**
- * Sets the producer's default delivery mode.
- * JMS specification says:
- * Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default.
- *
- * @param deliveryMode The message delivery mode for this message producer; legal
- * values are {@link DeliveryMode#NON_PERSISTENT}
- * and {@link DeliveryMode#PERSISTENT}.
- * @throws JMSException if setting the delivery mode fails due to some internal error.
- */
- public void setDeliveryMode(int deliveryMode) throws JMSException
- {
- checkNotClosed();
- if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT))
- {
- throw new JMSException(
- "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal");
- }
- _deliveryMode = deliveryMode;
- }
-
- /**
- * Gets the producer's delivery mode.
- *
- * @return The message delivery mode for this message producer
- * @throws JMSException If getting the delivery mode fails due to some internal error.
- */
- public int getDeliveryMode() throws JMSException
- {
- checkNotClosed();
- return _deliveryMode;
- }
-
- /**
- * Sets the producer's message priority.
- * The njms spec says:
- * The JMS API defines ten levels of priority value, with 0 as the
- * lowest priority and 9 as the highest. Clients should consider priorities
- * 0-4 as gradations of normal priority and priorities 5-9 as gradations
- * of expedited priority.
- * Priority is set to 4 by default.
- *
- * @param priority The message priority for this message producer; must be a value between 0 and 9
- * @throws JMSException if setting this producer priority fails due to some internal error.
- */
- public void setPriority(int priority) throws JMSException
- {
- checkNotClosed();
- if ((priority < 0) || (priority > 9))
- {
- throw new IllegalArgumentException(
- "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = priority;
- }
-
- /**
- * Gets the producer's message priority.
- *
- * @return The message priority for this message producer.
- * @throws JMSException If getting this producer message priority fails due to some internal error.
- */
- public int getPriority() throws JMSException
- {
- checkNotClosed();
- return _messagePriority;
- }
-
- /**
- * Sets the default length of time in milliseconds from its dispatch time
- * that a produced message should be retained by the message system.
- * The JMS spec says that time to live must be set to zero by default.
- *
- * @param timeToLive The message time to live in milliseconds; zero is unlimited
- * @throws JMSException If setting the default time to live fails due to some internal error.
- */
- public void setTimeToLive(long timeToLive) throws JMSException
- {
- checkNotClosed();
- if (timeToLive < 0)
- {
- throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
- }
- _timeToLive = timeToLive;
- }
-
- /**
- * Gets the default length of time in milliseconds from its dispatch time
- * that a produced message should be retained by the message system.
- *
- * @return The default message time to live in milliseconds; zero is unlimited
- * @throws JMSException if getting the default time to live fails due to some internal error.
- * @see javax.jms.MessageProducer#setTimeToLive
- */
- public long getTimeToLive() throws JMSException
- {
- checkNotClosed();
- return _timeToLive;
- }
-
- /**
- * Gets the destination associated with this producer.
- *
- * @return This producer's destination.
- * @throws JMSException If getting the destination for this producer fails
- * due to some internal error.
- */
- public Destination getDestination() throws JMSException
- {
- checkNotClosed();
- return _destination;
- }
-
- /**
- * Sends a message using the producer's default delivery mode, priority, destination
- * and time to live.
- *
- * @param message the message to be sent
- * @throws JMSException If sending the message fails due to some internal error.
- * @throws MessageFormatException If an invalid message is specified.
- * @throws InvalidDestinationException If this producer destination is invalid.
- * @throws java.lang.UnsupportedOperationException
- * If a client uses this method with a producer that did
- * not specify a destination at creation time.
- */
- public void send(Message message) throws JMSException
- {
- send(message, _deliveryMode, _messagePriority, _timeToLive);
- }
-
- /**
- * Sends a message to this producer default destination, specifying delivery mode,
- * priority, and time to live.
- *
- * @param message The message to send
- * @param deliveryMode The delivery mode to use
- * @param priority The priority for this message
- * @param timeToLive The message's lifetime (in milliseconds)
- * @throws JMSException If sending the message fails due to some internal error.
- * @throws MessageFormatException If an invalid message is specified.
- * @throws InvalidDestinationException If this producer's destination is invalid.
- * @throws java.lang.UnsupportedOperationException
- * If a client uses this method with a producer that did
- * not specify a destination at creation time.
- */
- public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
- {
- send(_destination, message, deliveryMode, priority, timeToLive);
- }
-
- /**
- * Sends a message to a specified destination using this producer's default
- * delivery mode, priority and time to live.
- * Typically, a message producer is assigned a destination at creation
- * time; however, the JMS API also supports unidentified message producers,
- * which require that the destination be supplied every time a message is
- * sent.
- *
- * @param destination The destination to send this message to
- * @param message The message to send
- * @throws JMSException If sending the message fails due to some internal error.
- * @throws MessageFormatException If an invalid message is specified.
- * @throws InvalidDestinationException If an invalid destination is specified.
- */
- public void send(Destination destination, Message message) throws JMSException
- {
- send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
- }
-
- /**
- * Sends a message to a destination specifying delivery mode, priority and time to live.
- *
- * @param destination The destination to send this message to.
- * @param message The message to be sent.
- * @param deliveryMode The delivery mode to use.
- * @param priority The priority for this message.
- * @param timeToLive The message's lifetime (in milliseconds)
- * @throws JMSException If sending the message fails due to some internal error.
- * @throws MessageFormatException If an invalid message is specified.
- * @throws InvalidDestinationException If an invalid destination is specified.
- */
- public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
- throws JMSException
- {
- checkNotClosed();
- getSession().checkDestination(destination);
- // Do not allow negative timeToLive values
- if (timeToLive < 0)
- {
- throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive);
- }
- // Only get current time if required
- long currentTime = Long.MIN_VALUE;
- if (!((timeToLive == 0) && _disableTimestamps))
- {
- currentTime = System.currentTimeMillis();
- }
- // the messae UID
- String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
- MessageImpl qpidMessage;
- // check that the message is not a foreign one
- try
- {
- qpidMessage = (MessageImpl) message;
- }
- catch (ClassCastException cce)
- {
- // this is a foreign message
- qpidMessage = MessageHelper.transformMessage(message);
- // set message's properties in case they are queried after send.
- message.setJMSDestination(destination);
- message.setJMSDeliveryMode(deliveryMode);
- message.setJMSPriority(priority);
- message.setJMSMessageID(uid);
- if (timeToLive != 0)
- {
- message.setJMSExpiration(timeToLive + currentTime);
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
- }
- else
- {
- message.setJMSExpiration(timeToLive);
- }
- message.setJMSTimestamp(currentTime);
- }
- // set the message properties
- qpidMessage.setJMSDestination(destination);
- qpidMessage.setJMSMessageID(uid);
- qpidMessage.setJMSDeliveryMode(deliveryMode);
- qpidMessage.setJMSPriority(priority);
- if (timeToLive != 0)
- {
- qpidMessage.setJMSExpiration(timeToLive + currentTime);
- }
- else
- {
- qpidMessage.setJMSExpiration(timeToLive);
- }
- qpidMessage.setJMSTimestamp(currentTime);
- qpidMessage.setRoutingKey(((DestinationImpl) destination).getDestinationName());
- qpidMessage.setExchangeName(((DestinationImpl) destination).getExchangeName());
- // call beforeMessageDispatch
- try
- {
- qpidMessage.beforeMessageDispatch();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- try
- {
- getSession().getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
- qpidMessage.getQpidityMessage(),
- org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
- }
- catch (IOException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
deleted file mode 100644
index 59e772db85..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.util.MessageListener;
-
-/**
- * This listener idspatches messaes to its browser.
- */
-public class QpidBrowserListener implements MessageListener
-{
- /**
- * Used for debugging.
- */
- private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
-
- /**
- * This message listener's browser.
- */
- QueueBrowserImpl _browser = null;
-
- //---- constructor
- /**
- * Create a message listener wrapper for a given browser
- *
- * @param browser The browser of this listener
- */
- public QpidBrowserListener(QueueBrowserImpl browser)
- {
- _browser = browser;
- }
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- try
- {
- //convert this message into a JMS one
- javax.jms.Message jmsMessage = null; // todo
- _browser.receiveMessage(jmsMessage);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java
deleted file mode 100644
index 0b6068f05d..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.JMSException;
-
-/**
- * An exception listner
- */
-public class QpidExceptionListenerImpl //implements ExceptionListener
-{
- private javax.jms.ExceptionListener _jmsExceptionListener;
-
- public QpidExceptionListenerImpl()
- {
- }
-
- void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener)
- {
- _jmsExceptionListener = jmsExceptionListener;
- }
- //----- ExceptionListener API
-
- public void onException(QpidException exception)
- {
- // convert this exception in a JMS exception
- JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception);
- // propagate to the njms exception listener
- if (_jmsExceptionListener != null)
- {
- _jmsExceptionListener.onException(jmsException);
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
deleted file mode 100644
index 4478048528..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-
-import org.apache.qpidity.nclient.MessagePartListener;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
-import org.apache.qpidity.filter.MessageFilter;
-
-/**
- * Implementation of the JMS QueueBrowser interface
- */
-public class QueueBrowserImpl extends MessageActor implements QueueBrowser
-{
- /**
- * The browsers MessageSelector.
- */
- private String _messageSelector = null;
-
- /**
- * The message selector filter associated with this browser
- */
- private MessageFilter _filter = null;
-
- /**
- * The batch of messages to browse.
- */
- private Message[] _messages;
-
- /**
- * The number of messages read from current batch.
- */
- private int _browsed = 0;
-
- /**
- * The number of messages received from current batch.
- */
- private int _received = 0;
-
- /**
- * Indicates whether the last message has been received.
- */
- private int _batchLength;
-
- /**
- * The batch max size
- */
- private final int _maxbatchlength = 10;
-
- //--- constructor
-
- /**
- * Create a QueueBrowser for a specific queue and a given message selector.
- *
- * @param session The session of this browser.
- * @param queue The queue name for this browser
- * @param messageSelector only messages with properties matching the message selector expression are delivered.
- * @throws Exception In case of internal problem when creating this browser.
- */
- protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
- {
- super(session, (DestinationImpl) queue,consumerTag);
- // this is an array representing a batch of messages for this browser.
- _messages = new Message[_maxbatchlength];
- if (messageSelector != null)
- {
- _messageSelector = messageSelector;
- _filter = new JMSSelectorFilter(messageSelector);
- }
- MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidBrowserListener(this));
- // this is a queue we expect that this queue exists
- getSession().getQpidSession()
- .messageSubscribe(queue.getQueueName(), getMessageActorID(),
- org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- // We do not acquire those messages
- org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
-
- }
-
- //--- javax.njms.QueueBrowser API
- /**
- * Get an enumeration for browsing the current queue messages in the order they would be received.
- *
- * @return An enumeration for browsing the messages
- * @throws JMSException If getting the enumeration for this browser fails due to some internal error.
- */
- public Enumeration getEnumeration() throws JMSException
- {
- requestMessages();
- return new MessageEnumeration();
- }
-
-
- /**
- * Get the queue associated with this queue browser.
- *
- * @return The queue associated with this queue browser.
- * @throws JMSException If getting the queue associated with this browser failts due to some internal error.
- */
- public Queue getQueue() throws JMSException
- {
- checkNotClosed();
- return (Queue) _destination;
- }
-
- /**
- * Get this queue browser's message selector expression.
- *
- * @return This queue browser's message selector, or null if no message selector exists.
- * @throws JMSException if getting the message selector for this browser fails due to some internal error.
- */
- public String getMessageSelector() throws JMSException
- {
- checkNotClosed();
- return _messageSelector;
- }
-
- //-- overwritten methods.
- /**
- * Closes the browser and deregister it from its session.
- *
- * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
- */
- public void close() throws JMSException
- {
- synchronized (_messages)
- {
- _received = 0;
- _browsed = 0;
- _batchLength = 0;
- _messages.notify();
- }
- super.close();
- }
-
- //-- nonpublic methods
- /**
- * Request _maxbatchlength messages
- *
- * @throws JMSException If requesting more messages fails due to some internal error.
- */
- private void requestMessages() throws JMSException
- {
- _browsed = 0;
- _received = 0;
- // request messages
- int received = 0;
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- _maxbatchlength);
- _batchLength = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
- }
-
- /**
- * This method is invoked by the listener when a message is dispatched to this browser.
- *
- * @param m A received message
- */
- protected void receiveMessage(Message m)
- {
- synchronized (_messages)
- {
- _messages[_received] = m;
- _received++;
- _messages.notify();
- }
- }
-
- //-- inner class
- /**
- * This is an implementation of the Enumeration interface.
- */
- private class MessageEnumeration implements Enumeration
- {
- /*
- * Whether this enumeration has any more elements.
- *
- * @return True if there any more elements.
- */
- public boolean hasMoreElements()
- {
- boolean result = false;
- // Try to work out whether there are any more messages available.
- try
- {
- if (_browsed >= _maxbatchlength)
- {
- requestMessages();
- }
- synchronized (_messages)
- {
- while (_received == _browsed && _batchLength > _browsed)
- {
- // we expect more messages
- _messages.wait();
- }
- if (_browsed < _received && _batchLength != _browsed)
- {
- result = true;
- }
- }
- }
- catch (Exception e)
- {
- // If no batch could be returned, the result should be false, therefore do nothing
- }
- return result;
- }
-
- /**
- * Get the next message element
- *
- * @return The next element.
- */
- public Object nextElement()
- {
- if (hasMoreElements())
- {
- synchronized (_messages)
- {
- Message message = _messages[_browsed];
- _browsed = _browsed + 1;
- return message;
- }
- }
- else
- {
- throw new NoSuchElementException();
- }
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java
deleted file mode 100644
index cab683b0a4..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.njms.ConnectionImpl;
-import org.apache.qpidity.QpidException;
-
-import javax.jms.QueueConnection;
-
-/**
- *
- * Implements javax.njms.QueueConnection
- */
-public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
-{
- //-- constructor
- public QueueConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException
- {
- super(host, port, virtualHost, username, password);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
deleted file mode 100644
index 43e8e9e43f..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.url.BindingURL;
-import org.apache.qpidity.exchange.ExchangeDefaults;
-
-import javax.jms.Queue;
-import javax.jms.JMSException;
-
-/**
- * Implementation of the JMS Queue interface
- */
-public class QueueImpl extends DestinationImpl implements Queue
-{
- //--- Constructor
- /**
- * Create a new QueueImpl with a given name.
- *
- * @param name The name of this queue.
- * @param session The session used to create this queue.
- * @throws QpidException If the queue name is not valid
- */
- protected QueueImpl(SessionImpl session, String name) throws QpidException
- {
- super(name);
- _queueName = name;
- _destinationName = name;
- _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
- _isAutoDelete = false;
- _isDurable = true;
- _isExclusive = false;
- registerQueue(session, false);
- }
-
- /**
- * Create a destiantion from a binding URL
- *
- * @param session The session used to create this queue.
- * @param binding The URL
- * @throws QpidException If the URL is not valid
- */
- protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
- {
- super(binding);
- registerQueue(session, false);
- }
-
- /**
- * Create a destiantion from a binding URL
- *
- * @param binding The URL
- * @throws QpidException If the URL is not valid
- */
- public QueueImpl(BindingURL binding) throws QpidException
- {
- super(binding);
- }
-
- /**
- * Create a new QueueImpl with a given name.
- *
- * @param name The name of this queue.
- * @throws QpidException If the queue name is not valid
- */
- public QueueImpl(String name) throws QpidException
- {
- super(name);
- _queueName = name;
- _destinationName = name;
- _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
- _isAutoDelete = false;
- _isDurable = true;
- _isExclusive = false;
- }
-
- //---- Interface javax.njms.Queue
- /**
- * Gets the name of this queue.
- *
- * @return This queue's name.
- */
- public String getQueueName() throws JMSException
- {
- return _queueName;
- }
-
- //---Private methods
- /**
- * Check that this queue exists and declare it if required.
- *
- * @param session The session used to create this destination
- * @param declare Specify whether the queue should be declared
- * @throws QpidException If this queue does not exists on the broker.
- */
- protected void registerQueue(SessionImpl session, boolean declare) throws QpidException
- {
- // test if this exchange exist on the broker
- //todo we can also specify if the excahnge is autodlete and durable
- session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
- // wait for the broker response
- session.getQpidSession().sync();
- // If this exchange does not exist then we will get an Expection 404 does notexist
- //todo check for an execption
- // now check if the queue exists
- session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION,
- _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
- _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION,
- declare ? Option.PASSIVE : Option.NO_OPTION);
- // wait for the broker response
- session.getQpidSession().sync();
- // If this queue does not exist then we will get an Expection 404 does notexist
- session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null);
- // we don't have to sync as we don't expect an error
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
deleted file mode 100644
index 67db7e5001..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.QueueReceiver;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-/**
- * Implements javax.njms.QueueReceiver
- */
-public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
-{
- //--- Constructor
- /**
- * create a new QueueReceiverImpl.
- *
- * @param session The session from which the QueueReceiverImpl is instantiated.
- * @param queue The default queue for this QueueReceiverImpl.
- * @param messageSelector the message selector for this QueueReceiverImpl.
- * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
- */
- protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector,String consumerTag) throws Exception
- {
- super(session, (DestinationImpl) queue, messageSelector, false, null,consumerTag);
- }
-
- //--- Interface QueueReceiver
- /**
- * Get the Queue associated with this queue receiver.
- *
- * @return this receiver's Queue
- * @throws JMSException If getting the queue for this queue receiver fails due to some internal error.
- */
- public Queue getQueue() throws JMSException
- {
- checkNotClosed();
- return (QueueImpl) _destination;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java
deleted file mode 100644
index d03bb3a6b7..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.QueueSender;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Message;
-
-/**
- * Implements javax.njms.QueueSender
- */
-public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
-{
- //--- Constructor
- /**
- * Create a new QueueSenderImpl.
- *
- * @param session the session from which the QueueSenderImpl is instantiated
- * @param queue the default queue for this QueueSenderImpl
- * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error.
- */
- protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException
- {
- super(session, queue);
- }
-
- //--- Interface javax.njms.QueueSender
- /**
- * Get the queue associated with this QueueSender.
- *
- * @return This QueueSender's queue
- * @throws JMSException If getting the queue for this QueueSender fails due to some internal error.
- */
- public Queue getQueue() throws JMSException
- {
- return (Queue) getDestination();
- }
-
- /**
- * Sends a message to the queue. Uses the The acknowledgement mode is set at the time that the session is created.
- * If the session is transacted, the acknowledgement mode is ignored.
- *
- * @return If the session is not transacted, returns the current acknowledgement mode for the session.
- * else returns SESSION_TRANSACTED.
- * @throws JMSException if geting the acknowledgement mode fails due to some internal error.
- */
- public int getAcknowledgeMode() throws JMSException
- {
- checkNotClosed();
- return _acknowledgeMode;
- }
-
- /**
- * Commits all messages done in this transaction.
- *
- * @throws JMSException If committing the transaction fails due to some internal error.
- * @throws TransactionRolledBackException If the transaction is rolled back due to some internal error during commit.
- * @throws javax.jms.IllegalStateException
- * If the method is not called by a transacted session.
- */
- public void commit() throws JMSException
- {
- checkNotClosed();
- //make sure the Session is a transacted one
- if (!_transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot commit non-transacted session, throwing IllegalStateException");
- }
- throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted");
- }
- // commit the underlying Qpid Session
- _qpidSession.txCommit();
- try
- {
- testQpidException();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Rolls back any messages done in this transaction.
- *
- * @throws JMSException If rolling back the session fails due to some internal error.
- * @throws javax.jms.IllegalStateException
- * If the method is not called by a transacted session.
- */
- public void rollback() throws JMSException
- {
- checkNotClosed();
- //make sure the Session is a transacted one
- if (!_transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Cannot rollback non-transacted session, throwing IllegalStateException");
- }
- throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted");
- }
- // rollback the underlying Qpid Session
- _qpidSession.txRollback();
- try
- {
- testQpidException();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Closes this session.
- * The JMS specification says
- * This call will block until a Closing a transacted session must roll back the transaction in progress.
- * This method is the only Invoking any other Closing a closed session must not throw an exception.
- *
- * @throws JMSException If closing the session fails due to some internal error.
- */
- public synchronized void close() throws JMSException
- {
- if (!_isClosed)
- {
- _messageDispatcherThread.interrupt();
- if (!_isClosing)
- {
- _isClosing = true;
- // if the session is stopped then restart it before notifying on the lock
- // that will stop the sessionThread
- if (_isStopped)
- {
- startDispatchThread();
- }
- //notify the sessionThread
- synchronized (_incomingAsynchronousMessages)
- {
- _incomingAsynchronousMessages.notifyAll();
- }
-
- try
- {
- _messageDispatcherThread.join();
- _messageDispatcherThread = null;
- }
- catch (InterruptedException ie)
- {
- /* ignore */
- }
- }
- // from now all the session methods will throw a IllegalStateException
- _isClosed = true;
- // close all the actors
- closeAllMessageActors();
- _messageActors.clear();
- // We may have a thread trying to add a message
- synchronized (_incomingAsynchronousMessages)
- {
- _incomingAsynchronousMessages.clear();
- _incomingAsynchronousMessages.notifyAll();
- }
- // close the underlaying QpidSession
- _qpidSession.sessionClose();
- try
- {
- testQpidException();
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
- }
-
- /**
- * Stops message delivery in this session, and restarts message delivery with
- * the oldest unacknowledged message.
- * Recovering a session causes it to take the following actions:
- * This is an expert facility used only by Application Servers.
- * This is an optional operation that is not yet supported
- *
- * @return The message listener associated with this session.
- * @throws JMSException If getting the message listener fails due to an internal error.
- */
- public MessageListener getMessageListener() throws JMSException
- {
- checkNotClosed();
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "Getting session's distinguished message listener, not supported," + " throwing UnsupportedOperationException");
- }
- throw new UnsupportedOperationException();
- }
-
- /**
- * Sets the session's distinguished message listener.
- * This is an expert facility used only by Application Servers.
- * This is an optional operation that is not yet supported
- *
- * @param messageListener The message listener to associate with this session
- * @throws JMSException If setting the message listener fails due to an internal error.
- */
- public void setMessageListener(MessageListener messageListener) throws JMSException
- {
- checkNotClosed();
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "Setting the session's distinguished message listener, not supported," + " throwing UnsupportedOperationException");
- }
- throw new UnsupportedOperationException();
- }
-
- /**
- * Optional operation, intended to be used only by Application Servers,
- * not by ordinary JMS clients.
- * This is an optional operation that is not yet supported
- */
- public void run()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Running this session, not supported," + " throwing UnsupportedOperationException");
- }
- throw new UnsupportedOperationException();
- }
-
- /**
- * Creates a MessageProducer to send messages to the specified destination.
- *
- * @param destination the Destination to send messages to, or null if this is a producer
- * which does not have a specified destination.
- * @return A new MessageProducer
- * @throws JMSException If the session fails to create a MessageProducer
- * due to some internal error.
- * @throws InvalidDestinationException If an invalid destination is specified.
- */
- public MessageProducer createProducer(Destination destination) throws JMSException
- {
- checkNotClosed();
- MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
- // register this actor with the session
- _messageActors.put(producer.getMessageActorID(), producer);
- return producer;
- }
-
- /**
- * Creates a MessageConsumer for the specified destination.
- *
- * @param destination The This method can specify whether messages published by its own connection should
- * be delivered to it, if the destination is a topic.
- * In some cases, a connection may both publish and subscribe to a topic. The consumer
- * NoLocal attribute allows a consumer to inhibit the delivery of messages published by its
- * own connection. The default value for this attribute is False.
- *
- * @param destination The This facility is provided for the rare cases where clients need to
- * dynamically manipulate queue identity. It allows the creation of a
- * queue identity with a provider-specific name. Clients that depend
- * on this ability are not portable.
- * Note that this method is not for creating the physical queue.
- * The physical creation of queues is an administrative task and is not
- * to be initiated by the JMS API. The one exception is the
- * creation of temporary queues, which is accomplished with the
- * This facility is provided for the rare cases where clients need to
- * dynamically manipulate queue identity. It allows the creation of a
- * queue identity with a provider-specific name. Clients that depend
- * on this ability are not portable.
- * Note that this method is not for creating the physical queue.
- * The physical creation of queues is an administrative task and is not
- * to be initiated by the JMS API. The one exception is the
- * creation of temporary queues, which is accomplished with the
- * A client can change an existing durable subscription by creating a durable This method deletes the state being maintained on behalf of the
- * subscriber by its provider.
- * It is erroneous for a client to delete a durable subscription
- * while there is an active This method is called when an actor is independently closed.
- *
- * @param messageActor The closed actor.
- */
- protected void closeMessageActor(MessageActor messageActor)
- {
- _messageActors.remove(messageActor.getMessageActorID());
- }
-
- /**
- * Idincates whether this session is stopped.
- *
- * @return True is this session is stopped, false otherwise.
- */
- protected boolean isStopped()
- {
- return _isStopped;
- }
-
- /**
- * Start the flow of message to this session.
- *
- * @throws Exception If starting the session fails due to some communication error.
- */
- protected synchronized void start() throws Exception
- {
- if (_isStopped)
- {
- // start all the MessageActors
- for (MessageActor messageActor : _messageActors.values())
- {
- messageActor.start();
- }
- startDispatchThread();
- }
- }
-
- /**
- * Restart delivery of asynch messages
- */
- private void startDispatchThread()
- {
- synchronized (_stoppingLock)
- {
- _isStopped = false;
- _stoppingLock.notify();
- }
- synchronized (_stoppingJoin)
- {
- _hasStopped = false;
- }
- }
-
- /**
- * Stop the flow of message to this session.
- *
- * @throws Exception If stopping the session fails due to some communication error.
- */
- protected synchronized void stop() throws Exception
- {
- if (!_isClosing && !_isStopped)
- {
- // stop all the MessageActors
- for (MessageActor messageActor : _messageActors.values())
- {
- messageActor.stop();
- }
- synchronized (_incomingAsynchronousMessages)
- {
- _isStopped = true;
- // unlock the sessionThread that will then wait on _stoppingLock
- _incomingAsynchronousMessages.notifyAll();
- }
- // wait for the sessionThread to stop processing messages
- synchronized (_stoppingJoin)
- {
- while (!_hasStopped)
- {
- try
- {
- _stoppingJoin.wait();
- }
- catch (InterruptedException e)
- {
- /* ignore */
- }
- }
- }
- }
- }
-
- /**
- * Notify this session that a message is processed
- *
- * @param message The processed message.
- */
- protected void preProcessMessage(QpidMessage message)
- {
- _inRecovery = false;
- }
-
- /**
- * Dispatch this message to this session asynchronous consumers
- *
- * @param consumerID The consumer ID.
- * @param message The message to be dispatched.
- */
- public void dispatchMessage(String consumerID, QpidMessage message)
- {
- synchronized (_incomingAsynchronousMessages)
- {
- _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message));
- _incomingAsynchronousMessages.notifyAll();
- }
- }
-
- /**
- * Indicate whether this session is recovering .
- *
- * @return true if this session is recovering.
- */
- protected boolean isInRecovery()
- {
- return _inRecovery;
- }
-
- /**
- * Validate that the Session is not closed.
- * We can use an ack heuristic for dups ok mode where bunch of messages are ack.
- * This has to be done.
- *
- * @param message The message to be acknowledged.
- * @throws JMSException If the message cannot be acknowledged due to an internal error.
- */
- protected void acknowledgeMessage(QpidMessage message) throws JMSException
- {
- if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- // messages will be acknowldeged by the client application.
- // store this message for acknowledging it afterward
- synchronized (_unacknowledgedMessages)
- {
- _unacknowledgedMessages.add(message);
- }
- }
- else
- {
- // acknowledge this message
- RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
- getQpidSession().messageAcknowledge(ranges);
- }
- //tobedone: Implement DUPS OK heuristic
- }
-
- /**
- * This method is called when a message is acked.
- * Acknowledgment of a message automatically acknowledges all
- * messages previously received by the session. Clients may
- * individually acknowledge messages or they may choose to acknowledge
- * messages in application defined groups (which is done by acknowledging
- * the last received message in the group).
- *
- * @throws JMSException If this method is called on a closed session.
- */
- public void acknowledge() throws JMSException
- {
- checkNotClosed();
- if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- synchronized (_unacknowledgedMessages)
- {
- for (QpidMessage message : _unacknowledgedMessages)
- {
- // acknowledge this message
- RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
- getQpidSession().messageAcknowledge(ranges);
- }
- //empty the list of unack messages
- _unacknowledgedMessages.clear();
- }
- }
- //else there is no effect
- }
-
- /**
- * Access to the underlying Qpid Session
- *
- * @return The associated Qpid Session.
- */
- protected org.apache.qpidity.nclient.Session getQpidSession()
- {
- return _qpidSession;
- }
-
- /**
- * Get this session's conneciton
- *
- * @return This session's connection
- */
- protected ConnectionImpl getConnection()
- {
- return _connection;
- }
-
- /**
- * sync and return the potential exception
- *
- * @throws QpidException If an exception has been thrown by the broker.
- */
- protected void testQpidException() throws QpidException
- {
- //_qpidSession.sync();
- QpidException qe = getCurrentException();
- if (qe != null)
- {
- throw qe;
- }
- }
-
- //------ Private Methods
- /**
- * Close the producer and the consumers of this session
- *
- * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error.
- */
- private void closeAllMessageActors() throws JMSException
- {
- for (MessageActor messageActor : _messageActors.values())
- {
- messageActor.closeMessageActor();
- }
- }
-
- /**
- * create and start the MessageDispatcherThread.
- */
- private synchronized void initMessageDispatcherThread()
- {
- // Create and start a MessageDispatcherThread
- // This thread is dispatching messages to the async consumers
- _messageDispatcherThread = new MessageDispatcherThread();
- _messageDispatcherThread.start();
- }
-
- //------ Inner classes
-
- /**
- * Lstener for qpid protocol exceptions
- */
- private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener
- {
- public void onException(QpidException exception)
- {
- synchronized (this)
- {
- //todo check the error code for finding out if we need to notify the
- // JMS connection exception listener
- _currentException = exception;
- }
- }
- }
-
- /**
- * Convenient class for storing incoming messages associated with a consumer ID.
- * Those messages are enqueued in _incomingAsynchronousMessages
- */
- private class IncomingMessage
- {
- // The consumer ID
- private String _consumerId;
- // The message
- private QpidMessage _message;
-
- //-- constructor
- /**
- * Creat a new incoming message
- *
- * @param consumerId The consumer ID
- * @param message The message to be delivered
- */
- IncomingMessage(String consumerId, QpidMessage message)
- {
- _consumerId = consumerId;
- _message = message;
- }
-
- // Getters
- /**
- * Get the consumer ID
- *
- * @return The consumer ID for this message
- */
- public String getConsumerId()
- {
- return _consumerId;
- }
-
- /**
- * Get the message.
- *
- * @return The message.
- */
- public QpidMessage getMessage()
- {
- return _message;
- }
- }
-
- /**
- * A MessageDispatcherThread is attached to every SessionImpl.
- * Messages have to be dispatched serially.
- */
- private class MessageDispatcherThread extends Thread
- {
- //--- Constructor
- /**
- * Create a Deamon thread for dispatching messages to this session listeners.
- */
- MessageDispatcherThread()
- {
- super("MessageDispatcher");
- // this thread is Deamon
- setDaemon(true);
- }
-
- /**
- * Use to run this thread.
- */
- public void run()
- {
- IncomingMessage message = null;
- // deliver messages to asynchronous consumers until the stop flag is set.
- do
- {
- // When this session is not closing and and stopped
- // then this thread needs to wait until messages are delivered.
- synchronized (_incomingAsynchronousMessages)
- {
- while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty())
- {
- try
- {
- _incomingAsynchronousMessages.wait();
- }
- catch (InterruptedException ie)
- {
- /* ignore */
- }
- }
- }
- // If this session is stopped then we need to wait on the stoppingLock
- synchronized (_stoppingLock)
- {
- try
- {
- while (_isStopped)
- {
- // if the session is stopped we have to notify the stopper thread
- synchronized (_stoppingJoin)
- {
- _hasStopped = true;
- _stoppingJoin.notify();
- }
- _stoppingLock.wait();
- }
- }
- catch (Exception ie)
- {
- /* ignore */
- }
- }
- synchronized (_incomingAsynchronousMessages)
- {
- if (!_isClosing && !_incomingAsynchronousMessages.isEmpty())
- {
- message = _incomingAsynchronousMessages.getFirst();
- }
- }
-
- if (message != null)
- {
- MessageConsumerImpl mc;
- synchronized (_messageActors)
- {
- mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId());
- }
- if (mc != null)
- {
- try
- {
- // mc.onMessage(message.getMessage());
- mc.notifyMessageListener(message.getMessage());
- }
- catch (RuntimeException t)
- {
- // the JMS specification tells us to flag that to the client!
- _logger.error(
- "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
- }
- }
- }
- message = null;
- }
- while (!_isClosing); // repeat as long as this session is not closing
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java
deleted file mode 100644
index 34d2cbd1b1..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.JMSException;
-
-/**
- * Interface to abstract functionalities of temporary destinations.
- */
-public interface TemporaryDestination
-{
- /**
- * Delete this temporary destination.
- *
- * @throws javax.jms.JMSException If the temporary destination cannot be deleted due to some internal error.
- */
- public void delete() throws JMSException;
-
- /**
- * Indicate whether this temporary destination is deleted
- * @return True is this temporary destination is deleted, false otherwise
- */
- public boolean isdeleted();
-
-}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java
deleted file mode 100644
index 88dae74cc1..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.TemporaryQueue;
-import javax.jms.JMSException;
-import java.util.UUID;
-
-/**
- * Implements TemporaryQueue
- */
-public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination
-{
- /**
- * Indicates whether this temporary queue is deleted.
- */
- private boolean _isDeleted;
-
- /**
- * The session used to create this destination
- */
- private SessionImpl _session;
-
- //--- constructor
- /**
- * Create a new TemporaryQueueImpl.
- *
- * @param session The session used to create this TemporaryQueueImpl.
- * @throws QpidException If creating the TemporaryQueueImpl fails due to some error.
- */
- protected TemporaryQueueImpl(SessionImpl session) throws QpidException
- {
- super("TempQueue-" + UUID.randomUUID());
- // temporary destinations do not have names
- _isAutoDelete = false;
- _isDurable = false;
- _isExclusive = false;
- _isDeleted = false;
- _session = session;
- // we must create this queue
- registerQueue(session, true);
- }
-
- //-- TemporaryDestination Interface
- /**
- * Specify whether this temporary destination is deleted.
- *
- * @return true is this temporary destination is deleted.
- */
- public boolean isdeleted()
- {
- return _isDeleted;
- }
-
- //-- TemporaryQueue Interface
- /**
- * Delete this temporary destinaiton
- *
- * @throws JMSException If deleting this temporary queue fails due to some error.
- */
- public void delete() throws JMSException
- {
- if (!_isDeleted)
- {
- _session.getQpidSession().queueDelete(_queueName);
- }
- _isDeleted = true;
- }
-
-}
-
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java
deleted file mode 100644
index 63fdc68c65..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.TemporaryTopic;
-import javax.jms.JMSException;
-import java.util.UUID;
-
-
-/**
- * Implements TemporaryTopic
- */
-public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, TemporaryDestination
-{
- /**
- * Indicates whether this temporary topic is deleted.
- */
- private boolean _isDeleted = false;
-
- /**
- * The session used to create this destination
- */
- private SessionImpl _session;
-
- //--- constructor
- /**
- * Create a new TemporaryTopicImpl with a given name.
- *
- * @param session The session used to create this TemporaryTopicImpl.
- * @throws QpidException If creating the TemporaryTopicImpl fails due to some error.
- */
- protected TemporaryTopicImpl(SessionImpl session) throws QpidException
- {
- // temporary destinations do not have names.
- super(session, "TemporayTopic-" + UUID.randomUUID());
- _session = session;
- }
-
- //-- TemporaryDestination Interface
- public boolean isdeleted()
- {
- return _isDeleted;
- }
-
- //-- TemporaryTopic Interface
- public void delete() throws JMSException
- {
- if (!_isDeleted)
- {
- _session.getQpidSession().queueDelete(_queueName);
- }
- _isDeleted = true;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java
deleted file mode 100644
index 4174c6a5d5..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.TopicConnection;
-
-/**
- * Implements javax.njms.TopicConnection
- */
-public class TopicConnectionImpl extends ConnectionImpl implements TopicConnection
-{
- //-- constructor
- public TopicConnectionImpl(String host, int port, String virtualHost, String username, String password)
- throws QpidException
- {
- super(host, port, virtualHost, username, password);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
deleted file mode 100644
index fc6ea08897..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.exchange.ExchangeDefaults;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.url.BindingURL;
-
-import javax.jms.Topic;
-import java.util.UUID;
-
-/**
- * Implementation of the javax.njms.Topic interface.
- */
-public class TopicImpl extends DestinationImpl implements Topic
-{
- //--- Constructor
- /**
- * Create a new TopicImpl with a given name.
- *
- * @param name The name of this topic
- * @param session The session used to create this queue.
- * @throws QpidException If the topic name is not valid
- */
- protected TopicImpl(SessionImpl session, String name) throws QpidException
- {
- super(name);
- _queueName = "Topic-" + UUID.randomUUID();
- _routingKey = name;
- _destinationName = name;
- _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
- _isAutoDelete = true;
- _isDurable = false;
- _isExclusive = true;
- checkTopicExists(session);
- }
-
- /**
- * Create a new TopicImpl with a given name.
- *
- * @param name The name of this topic
- * @throws QpidException If the topic name is not valid
- */
- public TopicImpl(String name) throws QpidException
- {
- super(name);
- _queueName = "Topic-" + UUID.randomUUID();
- _routingKey = name;
- _destinationName = name;
- _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
- _isAutoDelete = true;
- _isDurable = false;
- _isExclusive = true;
- }
-
- /**
- * Create a TopicImpl from a binding URL
- *
- * @param session The session used to create this Topic.
- * @param binding The URL
- * @throws QpidException If the URL is not valid
- */
- protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException
- {
- super(binding);
- checkTopicExists(session);
- }
-
-
- /**
- * Create a TopicImpl from a binding URL
- *
- * @param binding The URL
- * @throws QpidException If the URL is not valid
- */
- public TopicImpl(BindingURL binding) throws QpidException
- {
- super(binding);
- }
-
- //--- javax.jsm.Topic Interface
- /**
- * Gets the name of this topic.
- *
- * @return This topic's name.
- */
- public String getTopicName()
- {
- return _destinationName;
- }
-
- /**
- * Check that this exchange exists
- *
- * @param session The session used to create this Topic.
- * @throws QpidException If this exchange does not exists on the broker.
- */
- private void checkTopicExists(SessionImpl session) throws QpidException
- {
- // test if this exchange exist on the broker
- session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
- // wait for the broker response
- System.out.println("Checking for exchange");
-
- session.getQpidSession().sync();
-
- System.out.println("Calling sync()");
- // todo get the exception
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java
deleted file mode 100644
index 7f65261486..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.*;
-
-/**
- * Implements TopicPublisher
- */
-public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher
-{
- //--- Constructor
- /**
- * Create a TopicPublisherImpl.
- *
- * @param session The session from which the TopicPublisherImpl is instantiated
- * @param topic The default topic for this TopicPublisherImpl
- * @throws JMSException If the TopicPublisherImpl cannot be created due to some internal error.
- */
- protected TopicPublisherImpl(SessionImpl session, Topic topic) throws JMSException
- {
- super(session, (DestinationImpl) topic);
- }
-
- //--- Interface javax.njms.TopicPublisher
- /**
- * Get the topic associated with this TopicPublisher.
- *
- * @return This publisher's topic
- * @throws JMSException If getting the topic fails due to some internal error.
- */
- public Topic getTopic() throws JMSException
- {
- return (Topic) getDestination();
- }
-
-
- /**
- * Publish a message to the topic using the default delivery mode, priority and time to live.
- *
- * @param message The message to publish
- * @throws JMSException If publishing the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If an invalid topic is specified.
- * @throws java.lang.UnsupportedOperationException
- * If that publisher topic was not specified at creation time.
- */
- public void publish(Message message) throws JMSException
- {
- super.send(message);
- }
-
- /**
- * Publish a message to the topic, specifying delivery mode, priority and time to live.
- *
- * @param message The message to publish
- * @param deliveryMode The delivery mode to use
- * @param priority The priority for this message
- * @param timeToLive The message's lifetime (in milliseconds)
- * @throws JMSException If publishing the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If an invalid topic is specified.
- * @throws java.lang.UnsupportedOperationException
- * If that publisher topic was not specified at creation time.
- */
- public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
- {
- super.send(message, deliveryMode, priority, timeToLive);
- }
-
-
- /**
- * Publish a message to a topic for an unidentified message producer.
- * Uses this TopicPublisher's default delivery mode, priority and time to live.
- *
- * @param topic The topic to publish this message to
- * @param message The message to publish
- * @throws JMSException If publishing the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If an invalid topic is specified.
- */
- public void publish(Topic topic, Message message) throws JMSException
- {
- super.send(topic, message);
- }
-
- /**
- * Publishes a message to a topic for an unidentified message
- * producer, specifying delivery mode, priority and time to live.
- *
- * @param topic The topic to publish this message to
- * @param message The message to publish
- * @param deliveryMode The delivery mode
- * @param priority The priority for this message
- * @param timeToLive The message's lifetime (in milliseconds)
- * @throws JMSException If publishing the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If an invalid topic is specified.
- */
- public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws
- JMSException
- {
- super.send(topic, message, deliveryMode, priority, timeToLive);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
deleted file mode 100644
index 74c5a2c0f7..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * Implements TopicSession
- */
-public class TopicSessionImpl extends SessionImpl implements TopicSession
-{
- //-- constructor
- /**
- * Create a new TopicSessionImpl.
- *
- * @param connection The ConnectionImpl object from which the Session is created.
- * @param transacted Specifiy whether this session is transacted?
- * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
- * {@link javax.jms.Session#SESSION_TRANSACTED} if the The value returned is the entire length of the message
- * body, regardless of where the pointer for reading the message
- * is currently located.
- *
- * @return Number of bytes in the message
- * @throws JMSException If reading the message body length fails due to some error.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public long getBodyLength() throws JMSException
- {
- isReadable();
- return getMessageData().capacity();
- }
-
- /**
- * Reads a boolean.
- *
- * @return The boolean value read
- * @throws JMSException If reading fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public boolean readBoolean() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readBoolean();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a signed 8-bit.
- *
- * @return The signed 8-bit read
- * @throws JMSException If reading a signed 8-bit fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public byte readByte() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readByte();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads an unsigned 8-bit.
- *
- * @return The signed 8-bit read
- * @throws JMSException If reading an unsigned 8-bit fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public int readUnsignedByte() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readUnsignedByte();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a short.
- *
- * @return The short read
- * @throws JMSException If reading a short fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public short readShort() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readShort();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads an unsigned short.
- *
- * @return The unsigned short read
- * @throws JMSException If reading an unsigned short fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public int readUnsignedShort() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readUnsignedShort();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a char.
- *
- * @return The char read
- * @throws JMSException If reading a char fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public char readChar() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readChar();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads an int.
- *
- * @return The int read
- * @throws JMSException If reading an int fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public int readInt() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readInt();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a long.
- *
- * @return The long read
- * @throws JMSException If reading a long fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public long readLong() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readLong();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Read a float.
- *
- * @return The float read
- * @throws JMSException If reading a float fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public float readFloat() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readFloat();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Read a double.
- *
- * @return The double read
- * @throws JMSException If reading a double fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public double readDouble() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readDouble();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a string that has been encoded using a modified UTF-8 format.
- *
- * @return The String read
- * @throws JMSException If reading a String fails due to some error.
- * @throws MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public String readUTF() throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.readUTF();
- }
- catch (EOFException e)
- {
- throw new MessageEOFException("Reach end of data when reading message data");
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a byte array from the bytes message data.
- * JMS sepc says:
- * If the length of array If the number of bytes remaining in the stream is less than the
- * length of
- * array The JMS spec says
- * If the length of array If the number of bytes remaining in the stream is less than the
- * length of array If This method works only for the objectified primitive
- * object types Integer, Double, Long, String and byte
- * arrays.
- *
- * @param val The short value to be written
- * @throws JMSException If writting a short fails due to some error.
- * @throws NullPointerException if the parameter val is null.
- * @throws MessageFormatException If the object is of an invalid type.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeObject(Object val) throws JMSException
- {
- if (val == null)
- {
- throw new NullPointerException("Cannot write null value to message");
- }
- if (val instanceof byte[])
- {
- writeBytes((byte[]) val);
- }
- else if (val instanceof String)
- {
- writeUTF((String) val);
- }
- else if (val instanceof Boolean)
- {
- writeBoolean(((Boolean) val).booleanValue());
- }
- else if (val instanceof Number)
- {
- if (val instanceof Byte)
- {
- writeByte(((Byte) val).byteValue());
- }
- else if (val instanceof Short)
- {
- writeShort(((Short) val).shortValue());
- }
- else if (val instanceof Integer)
- {
- writeInt(((Integer) val).intValue());
- }
- else if (val instanceof Long)
- {
- writeLong(((Long) val).longValue());
- }
- else if (val instanceof Float)
- {
- writeFloat(((Float) val).floatValue());
- }
- else if (val instanceof Double)
- {
- writeDouble(((Double) val).doubleValue());
- }
- else
- {
- throw new MessageFormatException("Trying to write an invalid obejct type: " + val);
- }
- }
- else if (val instanceof Character)
- {
- writeChar(((Character) val).charValue());
- }
- else
- {
- throw new MessageFormatException("Trying to write an invalid obejct type: " + val);
- }
- }
-
- /**
- * Puts the message body in read-only mode and repositions the stream of
- * bytes to the beginning.
- *
- * @throws JMSException If resetting the message fails due to some internal error.
- * @throws MessageFormatException If the message has an invalid format.
- */
- public void reset() throws JMSException
- {
- _readOnly = true;
- if (_dataIn == null)
- {
- // We were writting on this messsage so now read it
- _dataIn = new DataInputStream(new ByteArrayInputStream(_storedData.toByteArray()));
- }
- else
- {
- // We were reading so reset it
- try
- {
- _dataIn.reset();
- }
- catch (IOException e)
- {
- if (_logger.isDebugEnabled())
- {
- // we log this exception as this should not happen
- _logger.debug("Problem when resetting message: ", e);
- }
- throw new JMSException("Problem when resetting message: " + e.getLocalizedMessage());
- }
- }
- }
-
- //-- overwritten methods
- /**
- * Clear out the message body. Clearing a message's body does not clear
- * its header values or property entries.
- * If this message body was read-only, calling this method leaves
- * the message body is in the same state as an empty body in a newly
- * created message.
- *
- * @throws JMSException If clearing this message body fails to due to some error.
- */
- public void clearBody() throws JMSException
- {
- super.clearBody();
- _dataIn = null;
- _storedData = new ByteArrayOutputStream();
- _dataOut = new DataOutputStream(_storedData);
- }
-
-
- /**
- * This method is invoked before a message dispatch operation.
- *
- * @throws org.apache.qpidity.QpidException
- * If the destination is not set
- */
- public void beforeMessageDispatch() throws QpidException
- {
- if (_dataOut.size() > 0)
- {
- setMessageData(ByteBuffer.wrap(_storedData.toByteArray()));
- }
- super.beforeMessageDispatch();
- }
-
- /**
- * This method is invoked after this message is received.
- *
- * @throws QpidException
- */
- @Override
- public void afterMessageReceive() throws QpidException
- {
- super.afterMessageReceive();
- ByteBuffer messageData = getMessageData();
- if (messageData != null)
- {
- try
- {
- _dataIn = new DataInputStream(asInputStream());
- }
- catch (Exception e)
- {
- throw new QpidException("Cannot retrieve data from message ", null, e);
- }
- }
- }
-
- //-- helper mehtods
- /**
- * Test whether this message is readable by throwing a MessageNotReadableException if this
- * message cannot be read.
- *
- * @throws MessageNotReadableException If this message cannot be read.
- */
- protected void isReadable() throws MessageNotReadableException
- {
- if (_dataIn == null)
- {
- throw new MessageNotReadableException("Cannot read this message");
- }
- }
-}
-
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
deleted file mode 100644
index 26012e63b3..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.MapMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Vector;
-import java.io.*;
-import java.nio.ByteBuffer;
-
-/**
- * Implements javax.njms.MapMessage
- */
-public class MapMessageImpl extends MessageImpl implements MapMessage
-{
-
- /**
- * The MapMessage's payload.
- */
- private Map When a message is sent this value is ignored. After completion
- * of the send method it holds the destination specified by the send.
- * When a message is received, its destination value must be
- * equivalent to the value assigned when it was sent.
- */
- private Destination _destination;
-
- /**
- * Indicates whether the message properties are in writeable status.
- */
- protected boolean _readOnly = false;
-
- /**
- * Indicate whether the message properties are in writeable status.
- */
- protected boolean _propertiesReadOnly = false;
-
- /**
- * The message consumer through which this message was received.
- */
- private MessageConsumerImpl _messageConsumer;
-
- //--- Constructor
- /**
- * Constructor used by SessionImpl.
- */
- public MessageImpl()
- {
- super();
- setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_MESSAGE));
- }
-
- /**
- * Constructor used by MessageFactory
- *
- * @param message The new qpid message.
- * @throws QpidException In case of IO problem when reading the received message.
- */
- protected MessageImpl(org.apache.qpidity.api.Message message) throws QpidException
- {
- super(message);
- }
-
- //---- javax.njms.Message interface
- /**
- * Get the message ID.
- * The JMS sprec says:
- * The messageID header field contains a value that uniquely
- * identifies each message sent by a provider.
- * When a message is sent, messageID can be ignored. When
- * the send method returns it contains a provider-assigned value.
- * All JMSMessageID values must start with the prefix `ID:'.
- * Uniqueness of message ID values across different providers is
- * not required.
- *
- * @return The message ID
- * @throws JMSException If getting the message Id fails due to internal error.
- */
- public String getJMSMessageID() throws JMSException
- {
- String messageID = super.getMessageID();
-
- if (messageID != null)
- {
- messageID = "ID:" + messageID;
- }
- return messageID;
- }
-
- /**
- * Set the message ID.
- * The JMS spec says:
- * Providers set this field when a message is sent. This operation
- * can be used to change the value of a message that's been received.
- *
- * @param messageID The ID of the message
- * @throws JMSException If setting the message Id fails due to internal error.
- */
- public void setJMSMessageID(String messageID) throws JMSException
- {
- String qpidmessageID = null;
- if (messageID != null)
- {
- if (messageID.substring(0, 3).equals("ID:"))
- {
- qpidmessageID = messageID.substring(3, messageID.length());
- }
- }
- super.setMessageID(qpidmessageID);
- }
-
- /**
- * Get the message timestamp.
- * The JMS sepc says:
- * The JMSTimestamp header field contains the time a message was
- * handed off to a provider to be sent. It is not the time the
- * message was actually transmitted because the actual send may occur
- * later due to transactions or other client side queueing of messages.
- * When a message is sent, JMSTimestamp is ignored. When the send
- * method returns it contains a a time value somewhere in the interval
- * between the call and the return. It is in the format of a normal
- * Java millis time value.
- * Since timestamps take some effort to create and increase a
- * message's size, some JMS providers may be able to optimize message
- * overhead if they are given a hint that timestamp is not used by an
- * application. JMS message Producers provide a hint to disable
- * timestamps. When a client sets a producer to disable timestamps
- * they are saying that they do not depend on the value of timestamp
- * for the messages it produces. These messages must either have
- * timestamp set to null or, if the hint is ignored, timestamp must
- * be set to its normal value.
- *
- * @return the message timestamp
- * @throws JMSException If getting the Timestamp fails due to internal error.
- */
- public long getJMSTimestamp() throws JMSException
- {
- return super.getTimestamp();
- }
-
- /**
- * Set the message timestamp.
- * The JMS spec says:
- * Providers set this field when a message is sent. This operation
- * can be used to change the value of a message that's been received.
- *
- * @param timestamp The timestamp for this message
- * @throws JMSException If setting the timestamp fails due to some internal error.
- */
- public void setJMSTimestamp(long timestamp) throws JMSException
- {
- super.setTimestamp(timestamp);
- }
-
- /**
- * Get the correlation ID as an array of bytes for the message.
- * JMS spec says:
- * The use of a byte[] value for JMSCorrelationID is non-portable.
- *
- * @return the correlation ID of a message as an array of bytes.
- * @throws JMSException If getting correlationId fails due to some internal error.
- */
- public byte[] getJMSCorrelationIDAsBytes() throws JMSException
- {
- String correlationID = getJMSCorrelationID();
- if (correlationID != null)
- {
- return correlationID.getBytes();
- }
- return null;
- }
-
- /**
- * Set the correlation ID as an array of bytes for the message.
- * JMS spec says:
- * If a provider supports the native concept of correlation id, a
- * JMS client may need to assign specific JMSCorrelationID values to
- * match those expected by non-JMS clients. JMS providers without native
- * correlation id values are not required to support this (and the
- * corresponding get) method; their implementation may throw
- * java.lang.UnsupportedOperationException).
- * The use of a byte[] value for JMSCorrelationID is non-portable.
- *
- * @param correlationID The correlation ID value as an array of bytes.
- * @throws JMSException If setting correlationId fails due to some internal error.
- */
- public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
- {
- setJMSCorrelationID(new String(correlationID));
- }
-
- /**
- * Set the correlation ID for the message.
- * JMS spec says:
- * A client can use the JMSCorrelationID header field to link one
- * message with another. A typically use is to link a response message
- * with its request message.
- * Since each message sent by a JMS provider is assigned a message ID
- * value it is convenient to link messages via message ID. All message ID
- * values must start with the `ID:' prefix.
- * In some cases, an application (made up of several clients) needs to
- * use an application specific value for linking messages. For instance,
- * an application may use JMSCorrelationID to hold a value referencing
- * some external information. Application specified values must not start
- * with the `ID:' prefix; this is reserved for provider-generated message
- * ID values.
- *
- * @param correlationID The message ID of a message being referred to.
- * @throws JMSException If setting the correlationId fails due to some internal error.
- */
- public void setJMSCorrelationID(String correlationID) throws JMSException
- {
- super.setCorrelationID(correlationID);
-
- }
-
- /**
- * Get the correlation ID for the message.
- *
- * @return The correlation ID of a message as a String.
- * @throws JMSException If getting the correlationId fails due to some internal error.
- */
- public String getJMSCorrelationID() throws JMSException
- {
- return super.getCorrelationID();
- }
-
- /**
- * Get where a reply to this message should be sent.
- *
- * @return The destination where a reply to this message should be sent.
- * @throws JMSException If getting the ReplyTo Destination fails due to some internal error.
- */
- public Destination getJMSReplyTo() throws JMSException
- {
- return _replyTo;
- }
-
- /**
- * Set where a reply to this message should be sent.
- * The JMS spec says:
- * The replyTo header field contains the destination where a reply
- * to the current message should be sent. If it is null no reply is
- * expected. The destination may be either a Queue or a Topic.
- * Messages with a null replyTo value are called JMS datagrams.
- * Datagrams may be a notification of some change in the sender (i.e.
- * they signal a sender event) or they may just be some data the sender
- * thinks is of interest.
- * Messages with a replyTo value are typically expecting a response.
- * A response may be optional, it is up to the client to decide. These
- * messages are called JMS requests. A message sent in response to a
- * request is called a reply.
- *
- * @param destination The destination where a reply to this message should be sent.
- * @throws JMSException If setting the ReplyTo Destination fails due to some internal error.
- */
- public void setJMSReplyTo(Destination destination) throws JMSException
- {
- _replyTo = destination;
- }
-
- /**
- * Get the destination for this message.
- * The JMS spec says:
- * The destination field contains the destination to which the
- * message is being sent.
- * When a message is sent this value is ignored. After completion
- * of the send method it holds the destination specified by the send.
- * When a message is received, its destination value must be
- * equivalent to the value assigned when it was sent.
- *
- * @return The destination of this message.
- * @throws JMSException If getting the JMS Destination fails due to some internal error.
- */
- public Destination getJMSDestination() throws JMSException
- {
- return _destination;
- }
-
- /**
- * Set the destination for this message.
- * The JMS spec says:
- * Providers set this field when a message is sent. This operation
- * can be used to change the value of a message that's been received.
- *
- * @param deliveryMode the delivery mode for this message.
- * @throws JMSException If setting the JMS DeliveryMode fails due to some internal error.
- */
- public void setJMSDeliveryMode(int deliveryMode) throws JMSException
- {
- short amqpDeliveryMode = DELIVERY_MODE_PERSISTENT;
- if (deliveryMode == DeliveryMode.NON_PERSISTENT)
- {
- amqpDeliveryMode = DELIVERY_MODE_NON_PERSISTENT;
- }
- else if (deliveryMode != DeliveryMode.PERSISTENT)
- {
- throw new JMSException(
- "Problem when setting message delivery mode, " + deliveryMode + " is not a valid mode");
- }
- try
- {
- super.setDeliveryMode(amqpDeliveryMode);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Get an indication of whether this message is being redelivered.
- * The JMS spec says:
- * If a client receives a message with the redelivered indicator set,
- * it is likely, but not guaranteed, that this message was delivered to
- * the client earlier but the client did not acknowledge its receipt at
- * that earlier time.
- *
- * @return true if this message is being redelivered, false otherwise
- * @throws JMSException If getting the JMS Redelivered fails due to some internal error.
- */
- public boolean getJMSRedelivered() throws JMSException
- {
- return super.getRedelivered();
- }
-
- /**
- * Indicate whether this message is being redelivered.
- * The JMS spec says:
- * This field is set at the time the message is delivered. This
- * operation can be used to change the value of a message that's
- * been received.
- *
- * @param redelivered true indicates that the message is being redelivered.
- * @throws JMSException If setting the JMS Redelivered fails due to some internal error.
- */
- public void setJMSRedelivered(boolean redelivered) throws JMSException
- {
- super.setRedelivered(redelivered);
- }
-
- /**
- * Get the message type.
- * The JMS spec says:
- * Some JMS providers use a message repository that contains the
- * definition of messages sent by applications. The type header field
- * contains the name of a message's definition.
- * JMS does not define a standard message definition repository nor
- * does it define a naming policy for the definitions it contains. JMS
- * clients should use symbolic values for type that can be configured
- * at installation time to the values defined in the current providers
- * message repository.
- * JMS clients should assign a value to type whether the application
- * makes use of it or not. This insures that it is properly set for
- * those providers that require it.
- *
- * @return The message type
- * @throws JMSException If getting the JMS message type fails due to some internal error.
- */
- public String getJMSType() throws JMSException
- {
- return getStringProperty(JMS_MESSAGE_TYPE);
- }
-
- /**
- * Set this message type.
- *
- * @param type The type of message.
- * @throws JMSException If setting the JMS message type fails due to some internal error.
- */
- public void setJMSType(String type) throws JMSException
- {
- if (type == null)
- {
- throw new JMSException("Invalid message type null");
- }
- else
- {
- super.setProperty(JMS_MESSAGE_TYPE, type);
- }
- }
-
- /**
- * Get the message's expiration value.
- * The JMS spec says:
- * When a message is sent, expiration is left unassigned. After
- * completion of the send method, it holds the expiration time of the
- * message. This is the sum of the time-to-live value specified by the
- * client and the GMT at the time of the send.
- * If the time-to-live is specified as zero, expiration is set to
- * zero which indicates the message does not expire.
- *
- * @return The time the message expires.
- * @throws JMSException If getting the JMS message expiration fails due to some internal error.
- */
- public long getJMSExpiration() throws JMSException
- {
- return super.getExpiration();
- }
-
- /**
- * Set the message's expiration value.
- *
- * @param expiration the message's expiration time
- * @throws JMSException If setting the JMS message expiration fails due to some internal error.
- */
- public void setJMSExpiration(long expiration) throws JMSException
- {
- super.setExpiration(expiration);
- }
-
-
- /**
- * Get the message priority.
- * The JMS spec says:
- * JMS defines a ten level priority value with 0 as the lowest
- * priority and 9 as the highest. In addition, clients should consider
- * priorities 0-4 as gradations of normal priority and priorities 5-9
- * as gradations of expedited priority.
- *
- * @return The message priority.
- * @throws JMSException If getting the JMS message priority fails due to some internal error.
- */
- public int getJMSPriority() throws JMSException
- {
- return super.getMessagePriority();
- }
-
- /**
- * Set the priority for this message.
- *
- * @param priority The priority of this message.
- * @throws JMSException If setting the JMS message priority fails due to some internal error.
- */
- public void setJMSPriority(int priority) throws JMSException
- {
- super.setMessagePriority((short) priority);
- }
-
- /**
- * Clear the message's properties.
- * The JMS spec says:
- * The setObjectProperty method accepts values of class Boolean, Byte, Short, Integer,
- * Long, Float, Double, and String. An attempt to use any other class must throw a JMSException.
- *
- * @param name the name of the Java object property.
- * @param value the Java object property value to set in the Message.
- * @throws JMSException If setting the property fails due to some internal JMS error.
- * @throws MessageFormatException If the object is invalid
- * @throws MessageNotWriteableException If the message properties are read-only.
- */
- public void setObjectProperty(String name, Object value) throws JMSException
- {
- if (_propertiesReadOnly)
- {
- throw new MessageNotWriteableException("Error the message properties are read only");
- }
- if (!(value instanceof String || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long || value instanceof Float || value instanceof Double || value instanceof Boolean || value == null))
- {
- throw new MessageFormatException("Format of object " + value + " is not supported");
- }
- super.setProperty(name, value);
- }
-
- /**
- * Acknowledgment of a message automatically acknowledges all
- * messages previously received by the session. Clients may
- * individually acknowledge messages or they may choose to acknowledge
- * messages in application defined groups (which is done by acknowledging
- * the last received message in the group).
- *
- * @throws JMSException If this method is called on a closed session.
- */
- public void acknowledge() throws JMSException
- {
- _messageConsumer.getSession().acknowledge();
- }
-
- /**
- * Clear out the message body. Clearing a message's body does not clear
- * its header values or property entries.
- * If this message body was read-only, calling this method leaves
- * the message body in the same state as an empty body in a newly
- * created message.
- *
- * @throws JMSException If clearing this message body fails to due to some error.
- */
- public void clearBody() throws JMSException
- {
- super.clearMessageData();
- _readOnly = false;
- }
-
- //--- Additional public methods
- /**
- * This method is invoked before a message dispatch operation.
- *
- * @throws QpidException If the destination is not set
- */
- public void beforeMessageDispatch() throws QpidException
- {
- if (_destination == null)
- {
- throw new QpidException("Invalid destination null", null, null);
- }
- super.beforeMessageDispatch();
- }
-
- /**
- * This method is invoked after this message is received.
- *
- * @throws QpidException If there is an internal error when procesing this message.
- */
- @Override
- public void afterMessageReceive() throws QpidException
- {
- // recreate a destination object for the encoded destination
- // _destination = // todo
- // recreate a destination object for the encoded ReplyTo destination (if it exists)
- // _replyTo = // todo
-
- _propertiesReadOnly = true;
- _readOnly = true;
- }
-
- /**
- * Test whether this message is readonly by throwing a MessageNotWriteableException if this
- * message is readonly
- *
- * @throws MessageNotWriteableException If this message is readonly
- */
- protected void isWriteable() throws MessageNotWriteableException
- {
- if (_readOnly)
- {
- throw new MessageNotWriteableException("Cannot update message");
- }
- }
-
- /**
- * Set the MessageConsumerImpl through which this message was received.
- * This method is called after a message is received.
- *
- * @param messageConsumer the MessageConsumerImpl reference through which this message was received.
- */
- public void setMessageConsumer(MessageConsumerImpl messageConsumer)
- {
- _messageConsumer = messageConsumer;
- }
-
- /**
- * Returns an {@link java.io.InputStream} that reads the data from this mesage buffer.
- * {@link java.io.InputStream#read()} returns -1 if the buffer position
- * reaches to the limit.
- *
- * @return An {@link java.io.InputStream} that reads the data from this mesage buffer.
- */
- public InputStream asInputStream()
- {
- return new InputStream()
- {
- @Override
- public int available()
- {
- return getMessageData().remaining();
- }
-
- @Override
- public synchronized void mark(int readlimit)
- {
- getMessageData().mark();
- }
-
- @Override
- public boolean markSupported()
- {
- return true;
- }
-
- @Override
- public int read()
- {
- if (getMessageData().hasRemaining())
- {
- return getMessageData().get() & 0xff;
- }
- else
- {
- return -1;
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len)
- {
- int remaining = getMessageData().remaining();
- if (remaining > 0)
- {
- int readBytes = Math.min(remaining, len);
- getMessageData().get(b, off, readBytes);
- return readBytes;
- }
- else
- {
- return -1;
- }
- }
-
- @Override
- public synchronized void reset()
- {
- getMessageData().reset();
- }
-
- @Override
- public long skip(long n)
- {
- int bytes;
- if (n > Integer.MAX_VALUE)
- {
- bytes = getMessageData().remaining();
- }
- else
- {
- bytes = Math.min(getMessageData().remaining(), (int) n);
- }
- getMessageData().position(getMessageData().position() + bytes);
- return bytes;
- }
- };
- }
-
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
deleted file mode 100644
index 70afddc9a3..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.QpidException;
-
-import javax.jms.ObjectMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import java.io.*;
-import java.nio.ByteBuffer;
-
-/**
- * Implemetns javax.njms.ObjectMessage
- */
-public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
-{
-
- /**
- * this ObjectMessageImpl's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageImpl.class);
-
- /**
- * The ObjectMessage's payload.
- */
- private Serializable _object = null;
-
- //--- Constructor
- /**
- * Constructor used by SessionImpl.
- */
- public ObjectMessageImpl()
- {
- super();
- setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_OBJECTMESSAGE));
- }
-
- /**
- * Constructor used by MessageFactory
- *
- * @param message The new qpid message.
- * @throws QpidException In case of IO problem when reading the received message.
- */
- protected ObjectMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
- {
- super(message);
- }
-
- //--- Interface ObjctMessage
- /**
- * Sets the serializable object containing this message's data.
- * JE JMS spec says:
- * It is important to note that an A value written as the row type can be read as the column type.
- * The JMS spec says:
- * To read the field value, readBytes should be successively called until
- * it returns a value less than the length
- * of the read buffer. The value of the bytes in the buffer following the last byte read is undefined.
- *
- * @param value The byte array into which the data is read.
- * @return the total number of bytes read into the array, or -1 if
- * there is no more data because the end of the byte field has been
- * reached.
- * @throws JMSException If reading fails due to some error.
- * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public int readBytes(byte[] value) throws JMSException
- {
- isReadable();
- int result = -1;
- try
- {
- byte type = BYTEARRAY;
- if (_sizeOfByteArray == 0)
- {
- // we are not in the middle of reading this byte array
- _dataIn.mark(10);
- type = _dataIn.readByte();
- }
- switch (type)
- {
- case BYTEARRAY:
- if (_sizeOfByteArray == 0)
- {
- // we need to read the size of this byte array
- _sizeOfByteArray = _dataIn.readInt();
- }
- result = _dataIn.read(value, 0, value.length);
- if (result != -1)
- {
- _sizeOfByteArray = _sizeOfByteArray - result;
- }
- else
- {
- _sizeOfByteArray = 0;
- }
- case NULL:
- // result = -1;
- break;
- default:
- _dataIn.reset();
- throw new MessageFormatException("Invalid Object Type");
- }
- }
- catch (EOFException eof)
- {
- throw new MessageEOFException("End of file Reached when reading message");
- }
- catch (IOException io)
- {
- throw new JMSException("IO exception when reading message");
- }
- return result;
- }
-
- /**
- * Reads an object from the stream message.
- * The JMS spec says:
- * This method can be used to return, in objectified format,
- * an object in the Java programming language ("Java object") that has
- * been written to the stream with the equivalent
- * An attempt to call This method works only for the objectified primitive
- * object types Integer, Double, Long, String and byte
- * arrays.
- *
- * @param val The short value to be written
- * @throws JMSException If writting a short fails due to some error.
- * @throws NullPointerException if the parameter val is null.
- * @throws MessageFormatException If the object is of an invalid type.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeObject(Object val) throws JMSException
- {
- isWriteable();
- try
- {
- if (val == null)
- {
- _dataOut.writeShort(NULL);
- }
- else if (val instanceof Byte)
- {
- writeByte((Byte) val);
- }
- else if (val instanceof Boolean)
- {
- writeBoolean((Boolean) val);
- }
- else if (val instanceof Short)
- {
- writeShort((Short) val);
- }
- else if (val instanceof Integer)
- {
- writeInt((Integer) val);
- }
- else if (val instanceof Long)
- {
- writeLong((Long) val);
- }
- else if (val instanceof Double)
- {
- writeDouble((Double) val);
- }
- else if (val instanceof Float)
- {
- writeFloat((Float) val);
- }
- else if (val instanceof Character)
- {
- writeChar((Character) val);
- }
- else if (val instanceof String)
- {
- writeString((String) val);
- }
- else if (val instanceof byte[])
- {
- writeBytes((byte[]) val);
- }
- else
- {
- throw new MessageFormatException(
- "The data type of the object specified as the value to writeObject " + "was of an invalid type.");
- }
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- //-- overwritten methods
- /**
- * Test whether this message is readable by throwing a MessageNotReadableException if this
- * message cannot be read.
- *
- * @throws javax.jms.MessageNotReadableException
- * If this message cannot be read.
- * @throws javax.jms.MessageFormatException
- * If reading a byte array.
- */
- protected void isReadableAndNotReadingByteArray() throws MessageNotReadableException, MessageFormatException
- {
- if (_dataIn == null)
- {
- throw new MessageNotReadableException("Cannot read this message");
- }
- if (_sizeOfByteArray > 0)
- {
- throw new MessageFormatException(
- "Read of object attempted while incomplete byteArray stored in message " + "- finish reading byte array first.");
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
deleted file mode 100644
index 577b586fe2..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.TextMessage;
-import javax.jms.JMSException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.*;
-import java.io.UnsupportedEncodingException;
-
-/**
- * Implements the interface javax.njms.TextMessage
- */
-public class TextMessageImpl extends MessageImpl implements TextMessage
-{
- /**
- * The character encoding for converting non ASCII characters
- * Default UTF-16
- */
- private static final String CHARACTER_ENCODING = "UTF-16";
-
- /**
- * This message text. The byte form is set when this message is sent
- * the text is set when the message is received.
- */
- private String _messageText;
-
- //--- Constructor
- /**
- * Constructor used by SessionImpl.
- */
- public TextMessageImpl()
- {
- super();
- setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_STREAMMESSAGE));
- }
-
- /**
- * Constructor used by MessageFactory
- *
- * @param message The new qpid message.
- * @throws QpidException In case of IO problem when reading the received message.
- */
- protected TextMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
- {
- super(message);
- }
-
- //--- interface TextMessage
-
- public String getText() throws JMSException
- {
- return _messageText;
- }
-
- /**
- * Set the text (String) of this TextMessage.
- *
- * @param text The String containing the text.
- * @throws JMSException If setting the text fails due some error.
- * @throws javax.jms.MessageNotWriteableException
- * If message in read-only mode.
- */
- public void setText(String text) throws JMSException
- {
- isWriteable();
- _messageText = text;
- }
-
- //-- Overwritten methods
-
- /**
- * This method is invoked before this message is dispatched.
- * This class uses it to convert its text payload into a ByteBuffer
- */
- public void beforeMessageDispatch() throws QpidException
- {
- if (_messageText != null)
- {
- // set this message data
- try
- {
- setMessageData(ByteBuffer.wrap(_messageText.getBytes(CHARACTER_ENCODING)));
- }
- catch (UnsupportedEncodingException e)
- {
- throw new QpidException("Problem when encoding text " + _messageText, null, e);
- }
- }
- super.beforeMessageDispatch();
- }
-
-
- /**
- * This method is invoked after this message has been received.
- */
- @Override
- public void afterMessageReceive() throws QpidException
- {
- super.afterMessageReceive();
- ByteBuffer messageData = getMessageData();
- if (messageData != null)
- {
- try
- {
- _messageText = getString();
- }
- catch (Exception e)
- {
- throw new QpidException("Problem when decoding text", null, e);
- }
- }
- }
-
- /**
- * Clear out the message body. Clearing a message's body does not clear
- * its header values or property entries.
- * If this message body was read-only, calling this method leaves
- * the message body is in the same state as an empty body in a newly
- * created message.
- *
- * @throws JMSException If clearing this message body fails to due to some error.
- */
- public void clearBody() throws JMSException
- {
- super.clearBody();
- _messageText = null;
- }
-
- /**
- * This method is taken from Mina code
- *
- * Reads a FileMessage and StreamingMessage
- * ConnectionFactoryImpl as the
- * factory for creating the objects. This is the factory (or
- * {@link ObjectFactory}) that is used to turn the description in to a real object.
- * ConnectionFactoryImpl must be on the
- * classpath when you do a lookup in a JNDI context.. else you'll get a
- * ClassNotFoundEx.
- */
-public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
- XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory,
- ObjectFactory, Referenceable
-{
- /**
- * this ConnectionFactoryImpl's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(ConnectionFactoryImpl.class);
-
- /**
- * The virtual host on which the broker is deployed.
- */
- private String _host;
- /**
- * The port on which the broker is listening for connection.
- */
- private int _port;
- /**
- * The default user name used of user identification.
- */
- private String _defaultUsername;
- /**
- * The default password used of user identification.
- */
- private String _defaultPassword;
- /**
- * The virtual host on which the broker is deployed.
- */
- private String _virtualHost;
- /**
- * The URL used to build this factory, (not yet supported)
- */
- private QpidURL _qpidURL;
-
- // Undefined at the moment
- public ConnectionFactoryImpl(QpidURL url)
- {
- _qpidURL = url;
- }
-
- public ConnectionFactoryImpl(String url) throws MalformedURLException
- {
- _qpidURL = new QpidURLImpl(url);
- BrokerDetails bd = _qpidURL.getAllBrokerDetails().get(0);
- _host = bd.getHost();
- _port = bd.getPort();
- _defaultUsername = bd.getUserName();
- _defaultPassword = bd.getPassword();
- _virtualHost = bd.getVirtualHost();
- }
-
- /**
- * Create a connection Factory
- *
- * @param host The broker host name.
- * @param port The port on which the broker is listening for connection.
- * @param virtualHost The virtual host on which the broker is deployed.
- * @param defaultUsername The user name used of user identification.
- * @param defaultPassword The password used of user identification.
- */
- public ConnectionFactoryImpl(String host, int port, String virtualHost, String defaultUsername,
- String defaultPassword)
- {
- _host = host;
- _port = port;
- _defaultUsername = defaultUsername;
- _defaultPassword = defaultPassword;
- _virtualHost = virtualHost;
- }
-
- //-- Interface ConnectionFactory
-
- /**
- * Creates a connection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created connection.
- * @throws JMSException If creating the connection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public Connection createConnection() throws JMSException
- {
- try
- {
- return new ConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a connection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created connection.
- * @throws JMSException If creating the connection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public Connection createConnection(String username, String password) throws JMSException
- {
- try
- {
- return new ConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- // ----------------------------------------
- // Support for JMS 1.0 classes
- // ----------------------------------------
- //--- Interface QueueConnection
- /**
- * Creates a queueConnection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created queueConnection
- * @throws JMSException If creating the queueConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public QueueConnection createQueueConnection() throws JMSException
- {
- try
- {
- return new QueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a queueConnection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created queueConnection.
- * @throws JMSException If creating the queueConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public QueueConnection createQueueConnection(String username, String password) throws JMSException
- {
- try
- {
- return new QueueConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- //--- Interface TopicConnection
- /**
- * Creates a topicConnection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created topicConnection
- * @throws JMSException If creating the topicConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public TopicConnection createTopicConnection() throws JMSException
- {
- try
- {
- return new TopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a topicConnection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created topicConnection.
- * @throws JMSException If creating the topicConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public TopicConnection createTopicConnection(String username, String password) throws JMSException
- {
- try
- {
- return new TopicConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- // ---------------------------------------------------------------------------------------------------
- // the following methods are provided for XA compatibility
- // ---------------------------------------------------------------------------------------------------
-
- /**
- * Creates a XAConnection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created XAConnection
- * @throws JMSException If creating the XAConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XAConnection createXAConnection() throws JMSException
- {
- try
- {
- return new XAConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a XAConnection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created XAConnection.
- * @throws JMSException If creating the XAConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XAConnection createXAConnection(String username, String password) throws JMSException
- {
- try
- {
- return new XAConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
-
- /**
- * Creates a XATopicConnection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created XATopicConnection
- * @throws JMSException If creating the XATopicConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XATopicConnection createXATopicConnection() throws JMSException
- {
- try
- {
- return new XATopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a XATopicConnection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created XATopicConnection.
- * @throws JMSException If creating the XATopicConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
- {
- try
- {
- return new XATopicConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a XAQueueConnection with the default user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @return A newly created XAQueueConnection
- * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XAQueueConnection createXAQueueConnection() throws JMSException
- {
- try
- {
- return new XAQueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Creates a XAQueueConnection with the specified user identity.
- * Connection.start method
- * is explicitly called.
- *
- * @param username the caller's user name
- * @param password the caller's password
- * @return A newly created XAQueueConnection.
- * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
- * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
- */
- public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
- {
- try
- {
- return new XAQueueConnectionImpl(_host, _port, _virtualHost, username, password);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("PRoblem when creating connection", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- // ----------------------------------------
- // Support for JNDI
- // ----------------------------------------
-
- /**
- * Creates an object using the location or reference information
- * specified.
- *
- * @param obj The possibly null object containing location or reference
- * information that can be used in creating an object.
- * @param name The name of this object relative to nameCtx,
- * or null if no name is specified.
- * @param nameCtx The context relative to which the name
- * parameter is specified, or null if name is
- * relative to the default initial context.
- * @param environment The possibly null environment that is used in
- * creating the object.
- * @return The object created; null if an object cannot be created.
- * @throws Exception if this object factory encountered an exception
- * while attempting to create an object, and no other object factories are
- * to be tried.
- */
- public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable environment) throws Exception
- {
- if (obj instanceof Reference)
- {
- Reference ref = (Reference) obj;
-
- if (ref.getClassName().equals(QueueImpl.class.getName()))
- {
- RefAddr addr = ref.get(QueueImpl.class.getName());
-
- if (addr != null)
- {
- return new QueueImpl(new BindingURLImpl((String) addr.getContent()));
- }
- }
-
- if (ref.getClassName().equals(TopicImpl.class.getName()))
- {
- RefAddr addr = ref.get(TopicImpl.class.getName());
-
- if (addr != null)
- {
- return new TopicImpl(new BindingURLImpl((String) addr.getContent()));
- }
- }
-
- if (ref.getClassName().equals(DestinationImpl.class.getName()))
- {
- RefAddr addr = ref.get(DestinationImpl.class.getName());
-
- if (addr != null)
- {
- return new DestinationImpl(new BindingURLImpl((String) addr.getContent()));
- }
- }
-
- if (ref.getClassName().equals(ConnectionFactoryImpl.class.getName()))
- {
- RefAddr addr = ref.get(ConnectionFactoryImpl.class.getName());
- if (addr != null)
- {
- return new ConnectionFactoryImpl(new QpidURLImpl((String) addr.getContent()));
- }
- }
-
- }
- return null;
- }
-
- //-- interface Reference
- /**
- * Retrieves the Reference of this object.
- *
- * @return The non-null Reference of this object.
- * @throws NamingException If a naming exception was encountered while retrieving the reference.
- */
- public Reference getReference() throws NamingException
- {
- return new Reference(ConnectionFactoryImpl.class.getName(),
- new StringRefAddr(ConnectionFactoryImpl.class.getName(), _qpidURL.getURL()),
- ConnectionFactoryImpl.class.getName(), null);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
deleted file mode 100644
index e1ad6eb991..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
+++ /dev/null
@@ -1,503 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import java.util.Vector;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.url.QpidURL;
-import org.apache.qpidity.nclient.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Implements javax.njms.Connection, javax.njms.QueueConnection and javax.njms.TopicConnection
- */
-public class ConnectionImpl implements Connection
-{
- /**
- * This class's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(ConnectionImpl.class);
-
- /**
- * Maps from session id (Integer) to SessionImpl instance
- */
- protected final VectorSession.AUTO_ACKNOWLEDGE,
- * Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE.
- * @return A newly created session
- * @throws JMSException If the Connection object fails to create a session due to some internal error.
- */
- public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- checkNotClosed();
- SessionImpl session;
- try
- {
- session = new SessionImpl(this, transacted, acknowledgeMode, false);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // add this session with the list of session that are handled by this connection
- _sessions.add(session);
- return session;
- }
-
- /**
- * Gets the client identifier for this connection.
- * setClientID method.
- * ConnectionFactory
- *
- * @return The unique client identifier.
- * @throws JMSException If this connection is closed.
- */
- public String getClientID() throws JMSException
- {
- checkNotClosed();
- return _clientID;
- }
-
- /**
- * Sets the client identifier for this connection.
- * ConnectionFactory
- * object and transparently assigned to the Connection object
- * it creates.
- * ConnectionFactory
- *
- * @param clientID the unique client identifier
- * @throws JMSException Always as clientID is always set at construction time.
- */
- public void setClientID(String clientID) throws JMSException
- {
- checkNotClosed();
- throw new IllegalStateException("Client name cannot be changed after being set");
- }
-
- /**
- * Gets the metadata for this connection.
- *
- * @return The connection metadata
- * @throws JMSException If there ie a problem getting the connection metadata for this connection.
- * @see javax.jms.ConnectionMetaData
- */
- public ConnectionMetaData getMetaData() throws JMSException
- {
- checkNotClosed();
- return ConnectionMetaDataImpl.getInstance();
- }
-
- /**
- * Gets the ExceptionListener object for this connection.
- *
- * @return the ExceptionListener for this connection
- * @throws JMSException In case of unforeseen problem
- */
- public synchronized ExceptionListener getExceptionListener() throws JMSException
- {
- checkNotClosed();
- return _exceptionListener;
- }
-
- /**
- * Sets an exception listener for this connection.
- * ExceptionListener, if one has been
- * registered. It does this by calling the listener's
- * onException method, passing it a JMSException
- * object describing the problem.
- * ExceptionListener.
- * start
- * method. When the connection is stopped, delivery to all the connection's message consumers is inhibited:
- * synchronous receives block, and messages are not delivered to message listeners.
- * Session.AUTO_ACKNOWLEDGE,
- * Session.CLIENT_ACKNOWLEDGE and Session.DUPS_OK_ACKNOWLEDGE.
- * @return A queueSession object/
- * @throws JMSException If creating a QueueSession fails due to some internal error.
- */
- public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- checkNotClosed();
- QueueSessionImpl queueSession;
- try
- {
- queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // add this session to the list of handled sessions.
- _sessions.add(queueSession);
- return queueSession;
- }
-
- /**
- * Creates a connection consumer for this connection (optional operation).
- * This is an expert facility for App server integration.
- *
- * @param queue The queue to access.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param sessionPool The session pool to associate with this connection consumer.
- * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
- * @return Null for the moment.
- * @throws JMSException In case of a problem due to some internal error.
- */
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages)
- throws JMSException
- {
- return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages);
- }
-
- //-------------- TopicConnection API
- /**
- * Create a TopicSession.
- *
- * @param transacted Indicates whether the session is transacted
- * @param acknowledgeMode Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and
- * Session.DUPS_OK_ACKNOWLEDGE.
- * @return a newly created topic session
- * @throws JMSException If creating the session fails due to some internal error.
- */
- public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- checkNotClosed();
- TopicSessionImpl session;
- try
- {
- session = new TopicSessionImpl(this, transacted, acknowledgeMode);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // add the session with this Connection's sessions
- // important for when the Connection is closed.
- _sessions.add(session);
- return session;
- }
-
- /**
- * Creates a connection consumer for this connection (optional operation).
- * This is an expert facility for App server integration.
- *
- * @param topic The topic to access.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param sessionPool The session pool to associate with this connection consumer.
- * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
- * @return Null for the moment.
- * @throws JMSException In case of a problem due to some internal error.
- */
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages)
- throws JMSException
- {
- return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages);
- }
-
- //-------------- protected and private methods
- /**
- * Validate that the Connection is not closed.
- * Connection.
- */
-public class ConnectionMetaDataImpl implements ConnectionMetaData
-{
-
- /**
- * A singleton instance.
- */
- static ConnectionMetaDataImpl _singleton = new ConnectionMetaDataImpl();
-
- // ------------------------ The metadata
- // JMS major version
- private static final int JMS_MAJOR_VERSION = 1;
- // JMS minor version
- private static final int JMS_MINOR_VERSION = 1;
- // JMS version
- private static final String JMS_VERSION = "1.1";
- // Provider name
- private static final String PROVIDER_NAME = "Apache " + QpidProperties.getProductName();
- // Provider major version
- private static final int PROVIDER_MAJOR_VERSION = 0;
- // Provider minor version
- private static final int PROVIDER_MINOR_VERSION = 10;
- // Provider version
- private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )";
-
- /**
- * Prevent instantiation.
- */
- private ConnectionMetaDataImpl()
- {
- }
-
- /**
- * Get the singleton instance of ConnectionMetaDataImpl.
- *
- * @return the singleton instance of ConnectionMetaDataImpl.
- */
- public static ConnectionMetaDataImpl getInstance()
- {
- return _singleton;
- }
-
- //-- Connection MetaData API
-
- /**
- * Gets the JMS API version.
- *
- * @return the JMS API version
- * @throws JMSException Never
- */
- public String getJMSVersion() throws JMSException
- {
- return JMS_VERSION;
- }
-
-
- /**
- * Gets the JMS major version number.
- *
- * @return the JMS API major version number
- * @throws JMSException Never
- */
- public int getJMSMajorVersion() throws JMSException
- {
- return JMS_MAJOR_VERSION;
- }
-
-
- /**
- * Gets the JMS minor version number.
- *
- * @return the JMS API minor version number
- * @throws JMSException Never
- */
- public int getJMSMinorVersion() throws JMSException
- {
- return JMS_MINOR_VERSION;
- }
-
-
- /**
- * Gets Qpid name.
- *
- * @return Qpid name
- * @throws JMSException Never
- */
- public String getJMSProviderName() throws JMSException
- {
- return PROVIDER_NAME;
- }
-
- /**
- * Gets Qpid version.
- *
- * @return Qpid version
- * @throws JMSException Never
- */
- public String getProviderVersion() throws JMSException
- {
- return PROVIDER_VERSION;
- // TODO: We certainly can dynamically get the server version.
- }
-
- /**
- * Gets Qpid major version number.
- *
- * @return Qpid major version number
- * @throws JMSException Never
- */
- public int getProviderMajorVersion() throws JMSException
- {
- return PROVIDER_MAJOR_VERSION;
- }
-
- /**
- * Gets Qpid minor version number.
- *
- * @return Qpid minor version number
- * @throws JMSException Never
- */
- public int getProviderMinorVersion() throws JMSException
- {
- return PROVIDER_MINOR_VERSION;
- }
-
- /**
- * Gets an enumeration of the JMSX property names.
- *
- * @return an Enumeration of JMSX property names
- * @throws JMSException if cannot retrieve metadata due to some internal error.
- */
- public Enumeration getJMSXPropertyNames() throws JMSException
- {
- return CustomJMSXProperty.asEnumeration();
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java
deleted file mode 100644
index c2a3af1c9b..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import java.util.Enumeration;
-import java.util.ArrayList;
-import java.util.Collections;
-
-public enum CustomJMSXProperty
-{
- JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
- JMSXGroupID,
- JMSXGroupSeq;
-
- private static Enumeration _names;
-
- public static synchronized Enumeration asEnumeration()
- {
- if (_names == null)
- {
- CustomJMSXProperty[] properties = values();
- ArrayListMessagePartListener.
- *
- * @return The listener for the MessageConsumer, or null if no listener is set
- * @throws JMSException if getting the message listener fails due to some internal error.
- */
- public MessageListener getMessageListener() throws JMSException
- {
- checkNotClosed();
- return _messageListener;
- }
-
- /**
- * Sets the MessageConsumer's MessagePartListener.
- * MessageConsumer.setMessageListener
- * while messages are being consumed by an existing listener
- * or the consumer is being used to consume messages synchronously
- * is undefined.
- *
- * @param messageListener The listener to which the messages are to be delivered
- * @throws JMSException If setting the message listener fails due to some internal error.
- */
- public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
- {
- // this method is synchronized as onMessage also access _messagelistener
- // onMessage, getMessageListener and this method are the only synchronized methods
- checkNotClosed();
- try
- {
- _messageListener = messageListener;
- if (messageListener != null)
- {
- resetAsynchMessageReceived();
- }
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- }
-
- /**
- * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages
- *
- * @throws QpidException If there is a communication error
- */
- private void resetAsynchMessageReceived() throws QpidException
- {
- if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
- {
- getSession().getQpidSession().messageStop(getMessageActorID());
- }
- _messageAsyncrhonouslyReceived = 0;
- requestCredit(MAX_MESSAGE_TRANSFERRED);
- }
-
- /**
- * Receive the next message produced for this message consumer.
- * QueueSender's default delivery mode, priority,
- * and time to live.
- *
- * @param message The message to send.
- * @throws JMSException if sending the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If the queue is invalid.
- * @throws java.lang.UnsupportedOperationException
- * If invoked on QueueSender that did not specify a queue at creation time.
- */
- public void send(Message message) throws JMSException
- {
- super.send(message);
- }
-
- /**
- * Send a message to the queue, specifying delivery mode, priority, and time to live.
- *
- * @param message The message to send
- * @param deliveryMode The delivery mode to use
- * @param priority The priority for this message
- * @param timeToLive The message's lifetime (in milliseconds)
- *
- * @throws JMSException if sending the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If the queue is invalid.
- * @throws java.lang.UnsupportedOperationException
- * If invoked on QueueSender that did not specify a queue at creation time.
- */
- public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
- {
- super.send(message, deliveryMode, priority, timeToLive);
- }
-
- /**
- * Send a message to a queue for an unidentified message producer.
- * Uses the QueueSender's default delivery mode, priority,
- * and time to live.
- *
- * @param queue The queue to send this message to
- * @param message The message to send
- * @throws JMSException if sending the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If the queue is invalid.
- */
- public void send(Queue queue, Message message) throws JMSException
- {
- super.send(queue, message);
- }
-
- /**
- * Sends a message to a queue for an unidentified message producer,
- * specifying delivery mode, priority and time to live.
- *
- * @param queue The queue to send this message to
- * @param message The message to send
- * @param deliveryMode The delivery mode to use
- * @param priority The priority for this message
- * @param timeToLive The message's lifetime (in milliseconds)
- * @throws JMSException if sending the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If an invalid message is specified.
- * @throws javax.jms.InvalidDestinationException
- * If the queue is invalid.
- */
- public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
- {
- super.send(queue, message, deliveryMode, priority, timeToLive);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
deleted file mode 100644
index 9ae0725f00..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * Implementation of javax.njms.QueueSession
- */
-public class QueueSessionImpl extends SessionImpl implements QueueSession
-{
- //--- constructor
- /**
- * Create a JMS Session
- *
- * @param connection The ConnectionImpl object from which the Session is created.
- * @param transacted Indicates if the session transacted.
- * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
- * {@link javax.jms.Session#SESSION_TRANSACTED} if the transacted
- * parameter is true.
- * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
- * @throws javax.jms.JMSException In case of internal error.
- */
- protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException
- {
- super(connection, transacted, acknowledgeMode,false);
- }
-
- //-- Overwritten methods
- /**
- * Creates a durable subscriber to the specified topic,
- *
- * @param topic The non-temporary Topic to subscribe to.
- * @param name The name used to identify this subscription.
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession");
- }
-
- /**
- * Create a TemporaryTopic.
- *
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public TemporaryTopic createTemporaryTopic() throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession");
- }
-
- /**
- * Creates a topic identity given a Topicname.
- *
- * @param topicName The name of this Topic
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public Topic createTopic(String topicName) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createTopic from QueueSession");
- }
-
- /**
- * Unsubscribes a durable subscription that has been created by a client.
- *
- * @param name the name used to identify this subscription
- * @throws IllegalStateException Always
- */
- @Override
- public void unsubscribe(String name) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession");
- }
-
- //--- Interface javax.njms.QueueSession
- /**
- * Create a QueueReceiver to receive messages from the specified queue.
- *
- * @param queue the Queue to access
- * @return A QueueReceiver
- * @throws JMSException If creating a receiver fails due to some internal error.
- * @throws InvalidDestinationException If an invalid queue is specified.
- */
- public QueueReceiver createReceiver(Queue queue) throws JMSException
- {
- return createReceiver(queue, null);
- }
-
- /**
- * Create a QueueReceiver to receive messages from the specified queue for a given message selector.
- *
- * @param queue the Queue to access
- * @param messageSelector A value of null or an empty string indicates that
- * there is no message selector for the message consumer.
- * @return A QueueReceiver
- * @throws JMSException If creating a receiver fails due to some internal error.
- * @throws InvalidDestinationException If an invalid queue is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
- {
- checkNotClosed();
- checkDestination(queue);
- QueueReceiver receiver;
- try
- {
- receiver = new QueueReceiverImpl(this, queue, messageSelector,String.valueOf(_consumerTag.incrementAndGet()));
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return receiver;
- }
-
- /**
- * Create a QueueSender object to send messages to the specified queue.
- *
- * @param queue the Queue to access, or null if this is an unidentified producer
- * @return A QueueSender
- * @throws JMSException If creating the sender fails due to some internal error.
- * @throws InvalidDestinationException If an invalid queue is specified.
- */
- public QueueSender createSender(Queue queue) throws JMSException
- {
- checkNotClosed();
- // we do not check the destination since unidentified producers are allowed (no default destination).
- return new QueueSenderImpl(this, (QueueImpl) queue);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
deleted file mode 100644
index a7c1d00b72..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ /dev/null
@@ -1,1324 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.njms.message.*;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.RangeSet;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Implementation of the JMS Session interface
- */
-public class SessionImpl implements Session
-{
- /**
- * this session's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
-
- /**
- * A queue for incoming asynch messages.
- */
- private final LinkedListtransacted parameter is true.
- * @param isXA Indicates whether this session is an XA session.
- * @throws QpidException In case of internal error.
- */
- protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode, boolean isXA)
- throws QpidException
- {
- _connection = connection;
- _transacted = transacted;
- // for transacted sessions we ignore the acknowledgeMode and use GenericAckMode.SESSION_TRANSACTED
- if (_transacted)
- {
- acknowledgeMode = Session.SESSION_TRANSACTED;
- }
- _acknowledgeMode = acknowledgeMode;
-
- // create the qpid session with an expiry <= 0 so that the session does not expire
- _qpidSession = _connection.getQpidConnection().createSession(0);
- // set the exception listnere for this session
- _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
- // set transacted if required
- if (_transacted && !isXA)
- {
- _qpidSession.txSelect();
- }
- testQpidException();
- // init the message dispatcher.
- initMessageDispatcherThread();
- }
-
- //--- javax.njms.Session API
- /**
- * Creates a BytesMessage object used to send a message
- * containing a stream of uninterpreted bytes.
- *
- * @return A BytesMessage.
- * @throws JMSException If Creating a BytesMessage object fails due to some internal error.
- */
- public BytesMessage createBytesMessage() throws JMSException
- {
- checkNotClosed();
- return new BytesMessageImpl();
- }
-
- /**
- * Creates a MapMessage object used to send a self-defining set
- * of name-value pairs, where names are Strings and values are primitive values.
- *
- * @return A MapMessage.
- * @throws JMSException If Creating a MapMessage object fails due to some internal error.
- */
- public MapMessage createMapMessage() throws JMSException
- {
- checkNotClosed();
- return new MapMessageImpl();
- }
-
- /**
- * Creates a Message object that holds all the standard message header information.
- * It can be sent when a message containing only header information is sufficient.
- * We simply return a ByteMessage
- *
- * @return A Message.
- * @throws JMSException If Creating a Message object fails due to some internal error.
- */
- public Message createMessage() throws JMSException
- {
- return new MessageImpl();
- }
-
- /**
- * Creates an ObjectMessage used to send a message
- * that contains a serializable Java object.
- *
- * @return An ObjectMessage.
- * @throws JMSException If Creating an ObjectMessage object fails due to some internal error.
- */
- public ObjectMessage createObjectMessage() throws JMSException
- {
- checkNotClosed();
- return new ObjectMessageImpl();
- }
-
- /**
- * Creates an initialized ObjectMessage used to send a message that contains
- * a serializable Java object.
- *
- * @param serializable The object to use to initialize this message.
- * @return An initialised ObjectMessage.
- * @throws JMSException If Creating an initialised ObjectMessage object fails due to some internal error.
- */
- public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException
- {
- ObjectMessage msg = createObjectMessage();
- msg.setObject(serializable);
- return msg;
- }
-
- /**
- * Creates a StreamMessage used to send a
- * self-defining stream of primitive values in the Java programming
- * language.
- *
- * @return A StreamMessage
- * @throws JMSException If Creating an StreamMessage object fails due to some internal error.
- */
- public StreamMessage createStreamMessage() throws JMSException
- {
- checkNotClosed();
- return new StreamMessageImpl();
- }
-
- /**
- * Creates a TextMessage object used to send a message containing a String.
- *
- * @return A TextMessage object
- * @throws JMSException If Creating an TextMessage object fails due to some internal error.
- */
- public TextMessage createTextMessage() throws JMSException
- {
- checkNotClosed();
- return new TextMessageImpl();
- }
-
- /**
- * Creates an initialized TextMessage used to send
- * a message containing a String.
- *
- * @param text The string used to initialize this message.
- * @return An initialized TextMessage
- * @throws JMSException If Creating an initialised TextMessage object fails due to some internal error.
- */
- public TextMessage createTextMessage(String text) throws JMSException
- {
- TextMessage msg = createTextMessage();
- msg.setText(text);
- return msg;
- }
-
- /**
- * Indicates whether the session is in transacted mode.
- *
- * @return true if the session is in transacted mode
- * @throws JMSException If geting the transaction mode fails due to some internal error.
- */
- public boolean getTransacted() throws JMSException
- {
- checkNotClosed();
- return _transacted;
- }
-
- /**
- * Returns the acknowledgement mode of this session.
- * receive call or message
- * listener in progress has completed. A blocked message consumer
- * receive call returns null when this session is closed.
- * Session method that can be called concurrently.
- * Session method on a closed session
- * must throw a javax.njms.IllegalStateException.
- *
- *
- *
- * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
- * Not that this does not necessarily mean that the recovery has failed, but simply that it is
- * not possible to tell if it has or not.
- */
- public void recover() throws JMSException
- {
- // Ensure that the session is open.
- checkNotClosed();
- // we are recovering
- _inRecovery = true;
- // Ensure that the session is not transacted.
- if (getTransacted())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Trying to recover a transacted Session, throwing IllegalStateException");
- }
- throw new IllegalStateException("Session is transacted");
- }
- // release all unack messages
- RangeSet ranges = new RangeSet();
- for (QpidMessage message : _unacknowledgedMessages)
- {
- // release this message
- ranges.add(message.getMessageTransferId());
- }
- getQpidSession().messageRelease(ranges);
- }
-
- /**
- * Returns the session's distinguished message listener (optional).
- * Destination to access
- * @return A new MessageConsumer for the specified destination.
- * @throws JMSException If the session fails to create a MessageConsumer due to some internal error.
- * @throws InvalidDestinationException If an invalid destination is specified.
- */
- public MessageConsumer createConsumer(Destination destination) throws JMSException
- {
- return createConsumer(destination, null);
- }
-
- /**
- * Creates a MessageConsumer for the specified destination, using a message selector.
- *
- * @param destination The Destination to access
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @return A new MessageConsumer for the specified destination.
- * @throws JMSException If the session fails to create a MessageConsumer due to some internal error.
- * @throws InvalidDestinationException If an invalid destination is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
- {
- return createConsumer(destination, messageSelector, false);
- }
-
- /**
- * Creates MessageConsumer for the specified destination, using a message selector.
- * Destination to access
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param noLocal If true, and the destination is a topic, inhibits the delivery of messages published
- * by its own connection.
- * @return A new MessageConsumer for the specified destination.
- * @throws JMSException If the session fails to create a MessageConsumer due to some internal error.
- * @throws InvalidDestinationException If an invalid destination is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkNotClosed();
- checkDestination(destination);
- MessageConsumerImpl consumer;
- try
- {
- consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null,
- String.valueOf(_consumerTag.incrementAndGet()));
- }
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating consumer.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // register this actor with the session
- _messageActors.put(consumer.getMessageActorID(), consumer);
- return consumer;
- }
-
- /**
- * Creates a queue identity by a given name.
- * createTemporaryQueue method.
- *
- * @param queueName the name of this Queue
- * @return a Queue with the given name
- * @throws JMSException If the session fails to create a queue due to some internal error.
- */
- public Queue createQueue(String queueName) throws JMSException
- {
- checkNotClosed();
- Queue result;
- try
- {
- result = new QueueImpl(this, queueName);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Queue.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return result;
- }
-
- /**
- * Creates a topic identity given a Topicname.
- * createTemporaryTopic method.
- *
- * @param topicName The name of this Topic
- * @return a Topic with the given name
- * @throws JMSException If the session fails to create a topic due to some internal error.
- */
- public Topic createTopic(String topicName) throws JMSException
- {
- checkNotClosed();
- Topic result;
- try
- {
- result = new TopicImpl(this, topicName);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Topic.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return result;
- }
-
- /**
- * Creates a durable subscriber to the specified topic,
- *
- * @param topic The non-temporary Topic to subscribe to.
- * @param name The name used to identify this subscription.
- * @return A durable subscriber to the specified topic,
- * @throws JMSException If creating a subscriber fails due to some internal error.
- * @throws InvalidDestinationException If an invalid topic is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
- // by default, use a null messageselector and set noLocal to falsen
- return createDurableSubscriber(topic, name, null, false);
- }
-
- /**
- * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
- * published by its
- * own connection should be delivered to it.
- * TopicSubscriber with
- * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
- * unsubscribing (deleting) the old one and creating a new one.
- *
- * @param topic The non-temporary Topic to subscribe to.
- * @param name The name used to identify this subscription.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @param noLocal If set, inhibits the delivery of messages published by its own connection
- * @return A durable subscriber to the specified topic,
- * @throws JMSException If creating a subscriber fails due to some internal error.
- * @throws InvalidDestinationException If an invalid topic is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkNotClosed();
- checkDestination(topic);
- TopicSubscriberImpl subscriber;
- try
- {
- subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
- _connection.getClientID() + ":" + name,
- String.valueOf(_consumerTag.incrementAndGet()));
- }
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Durable Subscriber.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- _messageActors.put(subscriber.getMessageActorID(), subscriber);
- return subscriber;
- }
-
- /**
- * Create a QueueBrowser to peek at the messages on the specified queue.
- *
- * @param queue The Queue to browse.
- * @return A QueueBrowser.
- * @throws JMSException If creating a browser fails due to some internal error.
- * @throws InvalidDestinationException If an invalid queue is specified.
- */
- public QueueBrowser createBrowser(Queue queue) throws JMSException
- {
- return createBrowser(queue, null);
- }
-
- /**
- * Create a QueueBrowser to peek at the messages on the specified queue using a message selector.
- *
- * @param queue The Queue to browse.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @return A QueueBrowser.
- * @throws JMSException If creating a browser fails due to some internal error.
- * @throws InvalidDestinationException If an invalid queue is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
- {
- checkNotClosed();
- checkDestination(queue);
- QueueBrowserImpl browser;
- try
- {
- browser =
- new QueueBrowserImpl(this, queue, messageSelector, String.valueOf(_consumerTag.incrementAndGet()));
- }
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Durable Browser.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // register this actor with the session
- _messageActors.put(browser.getMessageActorID(), browser);
- return browser;
- }
-
- /**
- * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier.
- *
- * @return A temporary queue.
- * @throws JMSException If creating the temporary queue fails due to some internal error.
- */
- public TemporaryQueue createTemporaryQueue() throws JMSException
- {
- TemporaryQueue result;
- try
- {
- result = new TemporaryQueueImpl(this);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Durable Temporary Queue.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return result;
- }
-
- /**
- * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier.
- *
- * @return A temporary topic.
- * @throws JMSException If creating the temporary topic fails due to some internal error.
- */
- public TemporaryTopic createTemporaryTopic() throws JMSException
- {
- TemporaryTopic result;
- try
- {
- result = new TemporaryTopicImpl(this);
- }
- catch (QpidException e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Problem when creating Durable Temporary Topic.", e);
- }
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return result;
- }
-
- /**
- * Unsubscribes a durable subscription that has been created by a client.
- * TopicSubscriber for the
- * subscription, or while a consumed message is part of a pending
- * transaction or has not been acknowledged in the session.
- *
- * @param name the name used to identify this subscription
- * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
- * @throws InvalidDestinationException if an invalid subscription name
- * is specified.
- */
- public void unsubscribe(String name) throws JMSException
- {
- checkNotClosed();
- }
-
- /**
- * Get the latest thrown exception.
- *
- * @return The latest thrown exception.
- */
- public synchronized QpidException getCurrentException()
- {
- QpidException result = _currentException;
- _currentException = null;
- return result;
- }
- //----- Protected methods
-
- /**
- * Remove a message actor form this session
- * transacted parameter
- * is true.
- * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
- * @throws javax.jms.JMSException In case of internal error.
- */
- protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws QpidException, JMSException
- {
- super(connection, transacted, acknowledgeMode,false);
- }
-
- //-- Overwritten methods
- /**
- * Create a QueueBrowser.
- *
- * @param queue The Queue to browse.
- * @param messageSelector Only messages with properties matching the message selector expression are delivered.
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
- }
-
- /**
- * Create a QueueBrowser.
- *
- * @param queue The Queue to browse.
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public QueueBrowser createBrowser(Queue queue) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
- }
-
- /**
- * Creates a temporary queue.
- *
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public TemporaryQueue createTemporaryQueue() throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createTemporaryQueue from TopicSession");
- }
-
- /**
- * Creates a queue identity by a given name.
- *
- * @param queueName the name of this Queue
- * @return Always throws an exception
- * @throws IllegalStateException Always
- */
- @Override
- public Queue createQueue(String queueName) throws JMSException
- {
- throw new IllegalStateException("Cannot invoke createQueue from TopicSession");
- }
-
- //--- Interface TopicSession
- /**
- * Create a publisher for the specified topic.
- *
- * @param topic the Topic to publish to, or null if this is an unidentified publisher.
- * @throws JMSException If the creating a publisher fails due to some internal error.
- * @throws InvalidDestinationException If an invalid topic is specified.
- */
- public TopicPublisher createPublisher(Topic topic) throws JMSException
- {
-
- checkNotClosed();
- // we do not check the destination topic here, since unidentified publishers are allowed.
- return new TopicPublisherImpl(this, topic);
- }
-
- /**
- * Creates a nondurable subscriber to the specified topic.
- *
- * @param topic The Topic to subscribe to
- * @throws JMSException If creating a subscriber fails due to some internal error.
- * @throws InvalidDestinationException If an invalid topic is specified.
- */
- public TopicSubscriber createSubscriber(Topic topic) throws JMSException
- {
- return createSubscriber(topic, null, false);
- }
-
- /**
- * Creates a nondurable subscriber to the specified topic, using a
- * message selector or specifying whether messages published by its
- * own connection should be delivered to it.
- *
- * @param topic The Topic to subscribe to
- * @param messageSelector A value of null or an empty string indicates that there is no message selector.
- * @param noLocal If true then inhibits the delivery of messages published by this subscriber's connection.
- * @throws JMSException If creating a subscriber fails due to some internal error.
- * @throws InvalidDestinationException If an invalid topic is specified.
- * @throws InvalidSelectorException If the message selector is invalid.
- */
- public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
- {
- checkNotClosed();
- checkDestination(topic);
- TopicSubscriber topicSubscriber;
- try
- {
- topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null,String.valueOf(_consumerTag.incrementAndGet()));
- }
- catch (Exception e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- return topicSubscriber;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
deleted file mode 100644
index 73d03db055..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.jms.TopicSubscriber;
-import javax.jms.Topic;
-import javax.jms.JMSException;
-
-/**
- * Implementation of the JMS TopicSubscriber interface.
- */
-public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
-{
- //--- Constructor
- /**
- * Create a new TopicSubscriberImpl.
- *
- * @param session The session of this topic subscriber.
- * @param topic The default topic for this TopicSubscriberImpl
- * @param messageSelector The MessageSelector
- * @param noLocal If true inhibits the delivery of messages published by its own connection.
- * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
- * If this value is null, a non-durable subscription is created.
- * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
- */
- protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
- String subscriptionName,String consumerTag) throws Exception
- {
- super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName,consumerTag);
- }
-
- //--- javax.njms.TopicSubscriber interface
- /**
- * Get the Topic associated with this subscriber.
- *
- * @return This subscriber's Topic
- * @throws JMSException if getting the topic for this topicSubscriber fails due to some internal error.
- */
- public Topic getTopic() throws JMSException
- {
- checkNotClosed();
- return (TopicImpl) _destination;
- }
-
-
- /**
- * Get NoLocal for this subscriber.
- *
- * @return True if locally published messages are being inhibited, false otherwise
- * @throws JMSException If getting NoLocal for this topic subscriber fails due to some internal error.
- */
- public boolean getNoLocal() throws JMSException
- {
- checkNotClosed();
- return _noLocal;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
deleted file mode 100644
index 3d181db1f6..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.XAConnection;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-
-/**
- * This class implements the javax.njms.XAConnection interface
- */
-public class XAConnectionImpl extends ConnectionImpl implements XAConnection
-{
- //-- constructor
- /**
- * Create a XAConnection.
- *
- * @param host The broker host name.
- * @param port The port on which the broker is listening for connection.
- * @param virtualHost The virtual host on which the broker is deployed.
- * @param username The user name used of user identification.
- * @param password The password name used of user identification.
- * @throws QpidException If creating a connection fails due to some internal error.
- */
- protected XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException
- {
- super(host, port, virtualHost, username, password);
- }
-
- //-- interface XAConnection
- /**
- * Creates an XASession.
- *
- * @return A newly created XASession.
- * @throws JMSException If the XAConnectiono fails to create an XASession due to
- * some internal error.
- */
- public synchronized XASession createXASession() throws JMSException
- {
- checkNotClosed();
- XASessionImpl xasession;
- try
- {
- xasession = new XASessionImpl(this);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // add this session with the list of session that are handled by this connection
- _sessions.add(xasession);
- return xasession;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java
deleted file mode 100644
index b500208036..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.XAQueueConnection;
-import javax.jms.JMSException;
-import javax.jms.XAQueueSession;
-
-/**
- * Implements XAQueueConnection
- */
-public class XAQueueConnectionImpl extends XAConnectionImpl implements XAQueueConnection
-{
- //-- constructor
- /**
- * Create a XAQueueConnection.
- *
- * @param host The broker host name.
- * @param port The port on which the broker is listening for connection.
- * @param virtualHost The virtual host on which the broker is deployed.
- * @param username The user name used of user identification.
- * @param password The password name used of user identification.
- * @throws QpidException If creating a XAQueueConnection fails due to some internal error.
- */
- public XAQueueConnectionImpl(String host, int port, String virtualHost, String username, String password)
- throws QpidException
- {
- super(host, port, virtualHost, username, password);
- }
-
- //-- Interface XAQueueConnection
- /**
- * Creates an XAQueueSession.
- *
- * @return A newly created XASession.
- * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to
- * some internal error.
- */
- public synchronized XAQueueSession createXAQueueSession() throws JMSException
- {
- checkNotClosed();
- XAQueueSessionImpl xaQueueSession;
- try
- {
- xaQueueSession = new XAQueueSessionImpl(this);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- // add this session with the list of session that are handled by this connection
- _sessions.add(xaQueueSession);
- return xaQueueSession;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java
deleted file mode 100644
index 3df8a0928e..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import org.apache.qpidity.QpidException;
-
-import javax.jms.QueueSession;
-import javax.jms.JMSException;
-import javax.jms.XAQueueSession;
-
-/**
- * Imeplements javax.njms.XAQueueSessionImpl
- */
-public class XAQueueSessionImpl extends XASessionImpl implements XAQueueSession
-{
- /**
- * The standard session
- */
- private QueueSession _jmsQueueSession;
-
- //-- Constructors
- /**
- * Create a JMS XAQueueSessionImpl
- *
- * @param connection The ConnectionImpl object from which the Session is created.
- * @throws org.apache.qpidity.QpidException
- * In case of internal error.
- */
- protected XAQueueSessionImpl(ConnectionImpl connection) throws QpidException
- {
- super(connection);
- }
-
- //--- interface XAQueueSession
- /**
- * Gets the topic session associated with this XATopicSession.
- *
- * @return the topic session object
- * @throws JMSException If an internal error occurs.
- */
- public QueueSession getQueueSession() throws JMSException
- {
- if (_jmsQueueSession == null)
- {
- _jmsQueueSession = getConnection().createQueueSession(true, getAcknowledgeMode());
- }
- return _jmsQueueSession;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
deleted file mode 100644
index 37921ece4d..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.dtx.XidImpl;
-import org.apache.qpidity.transport.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is an implementation of javax.njms.XAResource.
- */
-public class XAResourceImpl implements XAResource
-{
- /**
- * this XAResourceImpl's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
-
- /**
- * Reference to the associated XASession
- */
- private XASessionImpl _xaSession = null;
-
- /**
- * The XID of this resource
- */
- private Xid _xid;
-
- //--- constructor
-
- /**
- * Create an XAResource associated with a XASession
- *
- * @param xaSession The session XAresource
- */
- protected XAResourceImpl(XASessionImpl xaSession)
- {
- _xaSession = xaSession;
- }
-
- //--- The XAResource
- /**
- * Commits the global transaction specified by xid.
- *
- * @param xid A global transaction identifier
- * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
- * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
- * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
- */
- public void commit(Xid xid, boolean b) throws XAException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("commit ", xid);
- }
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- Future
- *
true if it's the same RM instance; otherwise false.
- * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
- */
- public boolean isSameRM(XAResource xaResource) throws XAException
- {
- // TODO : get the server identity of xaResource and compare it with our own one
- return false;
- }
-
- /**
- * Prepare for a transaction commit of the transaction specified in Xid.
- *
- * @param xid A global transaction identifier.
- * @return A value indicating the resource manager's vote on the outcome of the transaction.
- * The possible values are: XA_RDONLY or XA_OK.
- * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
- */
- public int prepare(Xid xid) throws XAException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("prepare ", xid);
- }
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- Future
- *
- *
- * @param xid A global transaction identifier to be associated with the resource
- * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
- * @throws XAException An error has occurred. Possible exceptions
- * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
- */
- public void start(Xid xid, int flag) throws XAException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("start ", xid);
- }
- if (xid == null)
- {
- throw new XAException(XAException.XAER_PROTO);
- }
- _xid = xid;
- FutureXATopicSession.
- *
- * @return the topic session object
- * @throws JMSException If an internal error occurs.
- */
- public TopicSession getTopicSession() throws JMSException
- {
- if (_jmsTopicSession == null)
- {
- _jmsTopicSession = getConnection().createTopicSession(true, getAcknowledgeMode());
- }
- return _jmsTopicSession;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
deleted file mode 100644
index 8a5f910d03..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
+++ /dev/null
@@ -1,863 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.QpidException;
-
-import javax.jms.*;
-import java.io.*;
-import java.nio.ByteBuffer;
-
-/**
- * Implements javax.njms.BytesMessage
- */
-public class BytesMessageImpl extends MessageImpl implements BytesMessage
-{
- /**
- * this BytesMessageImpl's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(BytesMessageImpl.class);
-
- /**
- * An input stream for reading this message data
- * This stream wrappes the received byteBuffer.
- */
- protected DataInputStream _dataIn = null;
-
- /**
- * Used to store written data.
- */
- protected ByteArrayOutputStream _storedData = new ByteArrayOutputStream();
-
- /**
- * DataOutputStream used to write the data
- */
- protected DataOutputStream _dataOut = new DataOutputStream(_storedData);
-
- //--- Constructor
- /**
- * Constructor used by SessionImpl.
- */
- public BytesMessageImpl()
- {
- super();
- setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_BYTESMESSAGE));
- }
-
- /**
- * Constructor used by MessageFactory
- *
- * @param message The new qpid message.
- * @throws QpidException In case of problem when receiving the message body.
- */
- protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
- {
- super(message);
- }
-
- //--- BytesMessage API
- /**
- * Gets the number of bytes of the message body when the message
- * is in read-only mode.
- * bytes is less than the number of
- * bytes remaining to be read from the stream, the array should
- * be filled. A subsequent call reads the next increment, and so on.
- * bytes, the bytes should be read into the array.
- * The return value of the total number of bytes read will be less than
- * the length of the array, indicating that there are no more bytes left
- * to be read from the stream. The next read of the stream returns -1.
- *
- * @param b The array into which the data is read.
- * @return The total number of bytes read into the buffer, or -1 if
- * there is no more data because the end of the stream has been reached
- * @throws JMSException If reading a byte array fails due to some error.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public int readBytes(byte[] b) throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.read(b);
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Reads a portion of the bytes message data.
- * b is less than the number of
- * bytes remaining to be read from the stream, the array should
- * be filled. A subsequent call reads the next increment, and so on.
- * b, the bytes should be read into the array.
- * The return value of the total number of bytes read will be less than
- * the length of the array, indicating that there are no more bytes left
- * to be read from the stream. The next read of the stream returns -1.
- * length is negative, or
- * length is greater than the length of the array
- * b, then an IndexOutOfBoundsException is
- * thrown. No bytes will be read from the stream for this exception case.
- *
- * @param b The buffer into which the data is read
- * @param length The number of bytes to read; must be less than or equal to length.
- * @return The total number of bytes read into the buffer, or -1 if
- * there is no more data because the end of the data has been reached
- * @throws JMSException If reading a byte array fails due to some error.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- */
- public int readBytes(byte[] b, int length) throws JMSException
- {
- isReadable();
- try
- {
- return _dataIn.read(b, 0, length);
- }
- catch (IOException ioe)
- {
- throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a boolean to the bytes message.
- *
- * @param val The boolean value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBoolean(boolean val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeBoolean(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a byte to the bytes message.
- *
- * @param val The byte value to be written
- * @throws JMSException If writting a byte fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeByte(byte val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeByte(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a short to the bytes message.
- *
- * @param val The short value to be written
- * @throws JMSException If writting a short fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeShort(short val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
-
- }
-
- /**
- * Writes a char to the bytes message.
- *
- * @param c The char value to be written
- * @throws JMSException If writting a char fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeChar(char c) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeChar(c);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes an int to the bytes message.
- *
- * @param val The int value to be written
- * @throws JMSException If writting an int fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeInt(int val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeInt(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
-
- }
-
- /**
- * Writes a long to the bytes message.
- *
- * @param val The long value to be written
- * @throws JMSException If writting a long fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeLong(long val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeLong(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a float to the bytes message.
- *
- * @param val The float value to be written
- * @throws JMSException If writting a float fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeFloat(float val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeFloat(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a double to the bytes message.
- *
- * @param val The double value to be written
- * @throws JMSException If writting a double fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeDouble(double val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeDouble(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a string to the bytes message.
- *
- * @param val The string value to be written
- * @throws JMSException If writting a string fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeUTF(String val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeUTF(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
-
- }
-
- /**
- * Writes a byte array to the bytes message.
- *
- * @param bytes The byte array value to be written
- * @throws JMSException If writting a byte array fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBytes(byte[] bytes) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.write(bytes);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a portion of byte array to the bytes message.
- *
- * @param val The byte array value to be written
- * @throws JMSException If writting a byte array fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBytes(byte[] val, int offset, int length) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.write(val, offset, length);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes an Object to the bytes message.
- * JMS spec says:
- * short value with the specified name.
- *
- * @param key The key name.
- * @return The short value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public short getShort(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- return MessageHelper.convertToShort(objValue);
- }
-
- /**
- * Returns the Unicode character value with the specified name.
- *
- * @param key The key name.
- * @return The Unicode charactervalue with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public char getChar(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new java.lang.NullPointerException();
- }
- return MessageHelper.convertToChar(objValue);
- }
-
- /**
- * Returns the intvalue with the specified name.
- *
- * @param key The key name.
- * @return The int value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public int getInt(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- return MessageHelper.convertToInt(objValue);
- }
-
- /**
- * Returns the longvalue with the specified name.
- *
- * @param key The key name.
- * @return The long value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public long getLong(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- return MessageHelper.convertToLong(objValue);
- }
-
- /**
- * Returns the float value with the specified name.
- *
- * @param key The key name.
- * @return The float value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public float getFloat(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- return MessageHelper.convertToFloat(objValue);
- }
-
- /**
- * Returns the double value with the specified name.
- *
- * @param key The key name.
- * @return The double value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public double getDouble(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- return MessageHelper.convertToDouble(objValue);
- }
-
- /**
- * Returns the String value with the specified name.
- *
- * @param key The key name.
- * @return The String value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public String getString(String key) throws JMSException
- {
- String result = null;
- Object objValue = _map.get(key);
- if (objValue != null)
- {
- if (objValue instanceof byte[])
- {
- throw new NumberFormatException("Wrong type for key: " + key);
- }
- else
- {
- result = objValue.toString();
- }
- }
- return result;
- }
-
- /**
- * Returns the byte array value with the specified name.
- *
- * @param key The key name.
- * @return The byte value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- * @throws javax.jms.MessageFormatException
- * If this type conversion is invalid.
- */
- public byte[] getBytes(String key) throws JMSException
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- return null;
- }
- if (objValue instanceof byte[])
- {
- byte[] value = (byte[]) objValue;
- byte[] toReturn = new byte[value.length];
- System.arraycopy(value, 0, toReturn, 0, value.length);
- return toReturn;
- }
- throw new MessageFormatException("Wrong type for key: " + key);
- }
-
- /**
- * Returns the value of the object with the specified name.
- *
- * @param key The key name.
- * @return The byte value with the specified key.
- * @throws JMSException If reading the message fails due to some internal error.
- */
- public Object getObject(String key) throws JMSException
- {
- try
- {
- Object objValue = _map.get(key);
- if (objValue == null)
- {
- return null;
- }
- else if (objValue instanceof byte[])
- {
- byte[] value = (byte[]) objValue;
- byte[] toReturn = new byte[value.length];
- System.arraycopy(value, 0, toReturn, 0, value.length);
- return toReturn;
- }
- else
- {
- return objValue;
- }
- }
- catch (java.lang.ClassCastException cce)
- {
- throw new MessageFormatException("Wrong type for key: " + key);
- }
- }
-
- /**
- * Returns an Enumeration of all the keys
- *
- * @return an enumeration of all the keys in this MapMessage
- * @throws JMSException If reading the message fails due to some internal error.
- */
- public Enumeration getMapNames() throws JMSException
- {
- Vectorname will
- * not be altered by future modifications
- * @throws JMSException If writting the message fails due to some internal error.
- * @throws IllegalArgumentException If the key is nul or an empty string.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void setBytes(String key, byte[] value) throws JMSException, NullPointerException
- {
- isWriteable();
- checkNotNullKey(key);
- byte[] newBytes = new byte[value.length];
- System.arraycopy(value, 0, newBytes, 0, value.length);
- _map.put(key, value);
- }
-
- /**
- * Sets a portion of the byte array value with the specified name into the
- * Map.
- *
- * @param key the name of the byte array
- * @param value the byte array value to set in the Map; the array
- * is copied so that the value for name will
- * not be altered by future modifications
- * @throws JMSException If writting the message fails due to some internal error.
- * @throws IllegalArgumentException If the key is nul or an empty string.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void setBytes(String key, byte[] value, int offset, int length) throws JMSException, IllegalArgumentException
- {
- isWriteable();
- checkNotNullKey(key);
- byte[] newBytes = new byte[length];
- System.arraycopy(value, offset, newBytes, 0, length);
- _map.put(key, newBytes);
- }
-
- /**
- * Sets an object value with the specified name into the Map.
- *
- * @param key the name of the byte array
- * @param value the byte array value to set in the Map; the array
- * is copied so that the value for name will
- * not be altered by future modifications
- * @throws JMSException If writting the message fails due to some internal error.
- * @throws IllegalArgumentException If the key is nul or an empty string.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void setObject(String key, Object value) throws JMSException, IllegalArgumentException
- {
- isWriteable();
- checkNotNullKey(key);
- if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
- {
- _map.put(key, value);
- }
- else
- {
- throw new MessageFormatException("Cannot set property " + key + " to value " + value + "of type " + value
- .getClass().getName() + ".");
- }
- }
-
- //-- Overwritten methods
- /**
- * This method is invoked before this message is dispatched.
- */
- @Override
- public void beforeMessageDispatch() throws QpidException
- {
- try
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(_map);
- byte[] bytes = baos.toByteArray();
- setMessageData(ByteBuffer.wrap(bytes));
- }
- catch (java.io.IOException ioe)
- {
- throw new QpidException("problem when dispatching message", null, ioe);
- }
- super.beforeMessageDispatch();
- }
-
-
- /**
- * This method is invoked after this message has been received.
- */
- @Override
- public void afterMessageReceive() throws QpidException
- {
- super.afterMessageReceive();
- ByteBuffer messageData = getMessageData();
- if (messageData != null)
- {
- try
- {
- ObjectInputStream ois = new ObjectInputStream(asInputStream());
- _map = (MapIllegalArgumentException if the supplied parameter is null.
- *
- * @param key The key to check.
- * @throws IllegalArgumentException If the key is null.
- */
- private void checkNotNullKey(String key) throws IllegalArgumentException
- {
- if (key == null || key.equals(""))
- {
- throw new IllegalArgumentException("Key cannot be null");
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java
deleted file mode 100644
index fa7ebe7d1b..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * A factory for JMS messages
- */
-public class MessageFactory
-{
- /**
- * JMS Message hierarchy.
- */
- public static final byte JAVAX_JMS_MESSAGE = 1;
- public static final byte JAVAX_JMS_TEXTMESSAGE = 2;
- public static final byte JAVAX_JMS_STREAMMESSAGE = 3;
- public static final byte JAVAX_JMS_BYTESMESSAGE = 4;
- public static final byte JAVAX_JMS_OBJECTMESSAGE = 5;
- public static final byte JAVAX_JMS_MAPMESSAGE = 6;
-
- /**
- * Create a QpidMessage subclass according to the JMS message type.
- *
- * @param message The received qpidity messsage
- * @return The newly craeted JMS message
- * @throws QpidException If an appropriate Message class cannot be created.
- */
- public static QpidMessage getQpidMessage(org.apache.qpidity.api.Message message) throws QpidException
- {
- QpidMessage result = null;
- byte type = Byte.valueOf(message.getMessageProperties().getType());
- switch (type)
- {
- case JAVAX_JMS_MESSAGE:
- result = new MessageImpl(message);
- break;
- case JAVAX_JMS_TEXTMESSAGE:
- result = new TextMessageImpl(message);
- break;
- case JAVAX_JMS_STREAMMESSAGE:
- result = new StreamMessageImpl(message);
- break;
- case JAVAX_JMS_BYTESMESSAGE:
- result = new BytesMessageImpl(message);
- break;
- case JAVAX_JMS_OBJECTMESSAGE:
- result = new ObjectMessageImpl(message);
- break;
- case JAVAX_JMS_MAPMESSAGE:
- result = new MapMessageImpl(message);
- break;
- default:
- throw new QpidException(
- "Message type identifier is not mapped " + "to a Message class in the current factory: " + type,
- null, null);
- }
- return result;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
deleted file mode 100644
index 0cbe188f6e..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageHelper.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import javax.jms.*;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Enumeration;
-
-/**
- * This is an helper class for performing data convertion
- */
-public class MessageHelper
-{
- /**
- * Convert an object into a boolean value
- *
- * @param obj object that may contain boolean value
- * @return A boolean value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static boolean convertToBoolean(Object obj) throws JMSException
- {
- boolean result;
- if (obj instanceof Boolean)
- {
- result = (Boolean) obj;
- }
- else if (obj instanceof String)
- {
- result = ((String) obj).equalsIgnoreCase("true");
- }
- else
- {
- throw new MessageFormatException("boolean property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a byte value
- *
- * @param obj The object that may contain byte value
- * @return The convertToed byte value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static byte convertToByte(Object obj) throws JMSException
- {
- byte result;
- if (obj instanceof Byte)
- {
- result = ((Number) obj).byteValue();
- }
- else if (obj instanceof String)
- {
- result = Byte.parseByte((String) obj);
- }
- else
- {
- throw new MessageFormatException("byte property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a short value
- *
- * @param obj The object that may contain short value
- * @return The convertToed short value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static short convertToShort(Object obj) throws JMSException
- {
- short result;
- if ((obj instanceof Short) || (obj instanceof Byte))
- {
- result = ((Number) obj).shortValue();
- }
- else if (obj instanceof String)
- {
- result = Short.parseShort((String) obj);
- }
- else
- {
- throw new MessageFormatException("short property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a int value
- *
- * @param obj The object that may contain int value
- * @return The convertToed int value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static int convertToInt(Object obj) throws JMSException
- {
- int result;
- if ((obj instanceof Integer) || (obj instanceof Byte) || (obj instanceof Short))
- {
- result = ((Number) obj).intValue();
- }
- else if (obj instanceof String)
- {
- result = Integer.parseInt((String) obj);
- }
- else
- {
- throw new MessageFormatException("int property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a long value
- *
- * @param obj The object that may contain long value
- * @return The convertToed long value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static long convertToLong(Object obj) throws JMSException
- {
- long result;
- if ((obj instanceof Number) && !((obj instanceof Float) || (obj instanceof Double)))
- {
- result = ((Number) obj).longValue();
- }
- else if (obj instanceof String)
- {
-
- result = Long.parseLong((String) obj);
- }
- else
- {
- throw new MessageFormatException("long property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a float value
- *
- * @param obj The object that may contain float value
- * @return The convertToed float value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static float convertToFloat(Object obj) throws JMSException
- {
- float result;
- if (obj instanceof Float)
- {
- result = ((Number) obj).floatValue();
- }
- else if (obj instanceof String)
- {
- result = Float.parseFloat((String) obj);
- }
- else
- {
- throw new MessageFormatException("float property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a double value
- *
- * @param obj The object that may contain double value
- * @return The convertToed double value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static double convertToDouble(Object obj) throws JMSException
- {
- double result;
- if ((obj instanceof Double) || (obj instanceof Float))
- {
- result = ((Number) obj).doubleValue();
- }
- else if (obj instanceof String)
- {
- result = Double.parseDouble((String) obj);
- }
- else
- {
- throw new MessageFormatException("double property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a char value
- *
- * @param obj The object that may contain char value
- * @return The convertToed char value.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public static char convertToChar(Object obj) throws JMSException
- {
- char result;
- if (obj instanceof Character)
- {
- result = (Character) obj;
- }
- else
- {
- throw new MessageFormatException("char property type convertion error",
- "Messasge property type convertion error");
- }
- return result;
- }
-
- /**
- * Convert an object into a String value
- *
- * @param obj The object that may contain String value
- * @return The convertToed String value.
- */
- public static String convertToString(Object obj)
- {
- String stringValue;
- if (obj instanceof String)
- {
- stringValue = (String) obj;
- }
- else
- {
- stringValue = obj.toString();
- }
- return stringValue;
- }
-
- /**
- * Check if the passed object represents Java primitive type
- *
- * @param value object for inspection
- * @return true if object represent Java primitive type; false otherwise
- */
- public static boolean isPrimitive(Object value)
- {
- // Innocent till proven guilty
- boolean isPrimitive = true;
- if (!((value instanceof String) || (value instanceof Boolean) || (value instanceof Character) || ((value instanceof Number) && !((value instanceof BigDecimal) || (value instanceof BigInteger)))))
- {
- isPrimitive = false;
- }
- return isPrimitive;
- }
-
- /**
- * Transform a foreign message into an equivalent QPID representation.
- *
- * @param message The foreign message to be converted.
- * @return A native message.
- * @throws JMSException In case of problem when converting the message.
- */
- public static MessageImpl transformMessage(Message message) throws JMSException
- {
- MessageImpl messageImpl;
-
- if (message instanceof BytesMessage)
- {
- messageImpl = transformBytesMessage((BytesMessage) message);
- }
- else if (message instanceof MapMessage)
- {
- messageImpl = transformMapMessage((MapMessage) message);
- }
- else if (message instanceof ObjectMessage)
- {
- messageImpl = transformObjectMessage((ObjectMessage) message);
- }
- else if (message instanceof StreamMessage)
- {
- messageImpl = transformStreamMessage((StreamMessage) message);
- }
- else if (message instanceof TextMessage)
- {
- messageImpl = transformTextMessage((TextMessage) message);
- }
- else
- {
- messageImpl = new MessageImpl();
- }
- transformHeaderAndProperties(message, messageImpl);
- return messageImpl;
- }
-
- //---- Private methods
- /**
- * Exposed JMS defined properties on converted message:
- * JMSDestination - we don't set here
- * JMSDeliveryMode - we don't set here
- * JMSExpiration - we don't set here
- * JMSPriority - we don't set here
- * JMSMessageID - we don't set here
- * JMSTimestamp - we don't set here
- * JMSCorrelationID - set
- * JMSReplyTo - set
- * JMSType - set
- * JMSRedlivered - we don't set here
- *
- * @param message The foreign message to be converted.
- * @param nativeMsg A native Qpid message.
- * @throws JMSException In case of problem when converting the message.
- */
- private static void transformHeaderAndProperties(Message message, MessageImpl nativeMsg) throws JMSException
- {
- //Set the correlation ID
- String correlationID = message.getJMSCorrelationID();
- if (correlationID != null)
- {
- nativeMsg.setJMSCorrelationID(correlationID);
- }
- //Set JMS ReplyTo
- if (message.getJMSReplyTo() != null)
- {
- nativeMsg.setJMSReplyTo(message.getJMSReplyTo());
- }
- //Set JMS type
- String jmsType = message.getJMSType();
- if (jmsType != null)
- {
- nativeMsg.setJMSType(jmsType);
- }
- // Sets all non-JMS defined properties on converted message
- Enumeration propertyNames = message.getPropertyNames();
- while (propertyNames.hasMoreElements())
- {
- String propertyName = String.valueOf(propertyNames.nextElement());
- if (!propertyName.startsWith("JMSX_"))
- {
- Object value = message.getObjectProperty(propertyName);
- nativeMsg.setObjectProperty(propertyName, value);
- }
- }
- }
-
- /**
- * Transform a BytesMessage.
- *
- * @param bytesMessage a BytesMessage to be converted.
- * @return a native BytesMessage.
- * @throws JMSException In case of problem when converting the message.
- */
- private static BytesMessageImpl transformBytesMessage(BytesMessage bytesMessage) throws JMSException
- {
- //reset the BytesMessage (makes the body read-only and repositions
- // the stream of bytes to the beginning
- bytesMessage.reset();
- BytesMessageImpl nativeMsg = new BytesMessageImpl();
- byte[] buf = new byte[1024];
- int len;
- while ((len = bytesMessage.readBytes(buf)) != -1)
- {
- nativeMsg.writeBytes(buf, 0, len);
- }
- return nativeMsg;
- }
-
- /**
- * Transform a MapMessage.
- *
- * @param mapMessage a MapMessage to be converted.
- * @return a native MapMessage.
- * @throws JMSException In case of problem when converting the message.
- */
- private static MapMessageImpl transformMapMessage(MapMessage mapMessage) throws JMSException
- {
- MapMessageImpl nativeMsg = new MapMessageImpl();
- Enumeration mapNames = mapMessage.getMapNames();
- while (mapNames.hasMoreElements())
- {
- String name = (String) mapNames.nextElement();
- nativeMsg.setObject(name, mapMessage.getObject(name));
- }
- return nativeMsg;
- }
-
- /**
- * Transform an ObjectMessage.
- *
- * @param objectMessage a ObjectMessage to be converted.
- * @return a native ObjectMessage.
- * @throws JMSException In case of problem when converting the message.
- */
- private static ObjectMessageImpl transformObjectMessage(ObjectMessage objectMessage) throws JMSException
- {
- ObjectMessageImpl nativeMsg = new ObjectMessageImpl();
- nativeMsg.setObject(objectMessage.getObject());
- return nativeMsg;
- }
-
- /**
- * Transform a StreamMessage.
- *
- * @param streamMessage a StreamMessage to be converted.
- * @return a native StreamMessage.
- * @throws JMSException In case of problem when converting the message.
- */
- private static StreamMessageImpl transformStreamMessage(StreamMessage streamMessage) throws JMSException
- {
- StreamMessageImpl nativeMsg = new StreamMessageImpl();
- try
- {
- //reset the stream message
- streamMessage.reset();
- while (true)
- {
- nativeMsg.writeObject(streamMessage.readObject());
- }
- }
- catch (MessageEOFException e)
- {
- // we're at the end so don't mind the exception
- }
- return nativeMsg;
- }
-
- /**
- * Transform a TextMessage.
- *
- * @param textMessage a TextMessage to be converted.
- * @return a native TextMessage.
- * @throws JMSException In case of problem when converting the message.
- */
- private static TextMessageImpl transformTextMessage(TextMessage textMessage) throws JMSException
- {
- TextMessageImpl nativeMsg = new TextMessageImpl();
- nativeMsg.setText(textMessage.getText());
- return nativeMsg;
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
deleted file mode 100644
index a2d974f9ad..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
+++ /dev/null
@@ -1,1011 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.njms.message;
-
-import org.apache.qpidity.njms.ExceptionHelper;
-import org.apache.qpidity.njms.MessageConsumerImpl;
-import org.apache.qpidity.QpidException;
-
-import javax.jms.*;
-import java.util.Enumeration;
-import java.io.InputStream;
-
-/**
- * Implementation of javax.njms.Message
- */
-public class MessageImpl extends QpidMessage implements Message
-{
- /**
- * name used to store JMSType.
- */
- private static final String JMS_MESSAGE_TYPE = "JMSType";
-
- /**
- * The ReplyTo destination for this message
- */
- private Destination _replyTo;
-
- /**
- * The destination to which the message has been sent.
- * ObjectMessage
- * contains a snapshot of the object at the time setObject()
- * is called; subsequent modifications of the object will have no
- * effect on the ObjectMessage body.
- *
- * @param object The message's data
- * @throws JMSException If setting the object fails due to some error.
- * @throws javax.jms.MessageFormatException
- * If object serialization fails.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void setObject(Serializable object) throws JMSException
- {
- isWriteable();
- try
- {
- // Serialize the passed in object, then de-serialize it
- // so that changes to it do not affect m_data (JAVA's way to perform a deep clone)
- ByteArrayOutputStream bOut = new ByteArrayOutputStream();
- ObjectOutputStream objOut = new ObjectOutputStream(bOut);
- objOut.writeObject(object);
- byte[] bArray = bOut.toByteArray();
- ByteArrayInputStream bIn = new ByteArrayInputStream(bArray);
- ObjectInputStream objIn = new ObjectInputStream(bIn);
- _object = (Serializable) objIn.readObject();
- objOut.close();
- objIn.close();
- }
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Unexpected exeption when performing object deep clone", e);
- }
- throw new MessageNotWriteableException("Unexpected exeption when performing object deep clone",
- e.getMessage());
- }
- }
-
- /**
- * Gets the serializable object containing this message's data. The
- * default value is null.
- *
- * @return The serializable object containing this message's data
- * @throws JMSException If getting the object fails due to some internal error.
- */
- public Serializable getObject() throws JMSException
- {
- return _object;
- }
-
- //--- Overwritten methods
- /**
- * This method is invoked before a message dispatch operation.
- *
- * @throws org.apache.qpidity.QpidException
- * If the destination is not set
- */
- public void beforeMessageDispatch() throws QpidException
- {
- try
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(_object);
- byte[] bytes = baos.toByteArray();
- setMessageData(ByteBuffer.wrap(bytes));
- }
- catch (IOException e)
- {
- throw new QpidException("Problem when setting object of object message", null, e);
- }
- super.beforeMessageDispatch();
- }
-
- /**
- * This method is invoked after this message is received.
- *
- * @throws QpidException
- */
- @Override
- public void afterMessageReceive() throws QpidException
- {
- super.afterMessageReceive();
- ByteBuffer messageData = getMessageData();
- if (messageData != null)
- {
- try
- {
- ObjectInputStream ois = new ObjectInputStream(asInputStream());
- _object = (Serializable) ois.readObject();
- }
- catch (IOException ioe)
- {
- throw new QpidException(
- "Unexpected error during rebuild of message in afterReceive() - " + "The Object stored in the message was not a Serializable object.",
- null, ioe);
- }
- catch (ClassNotFoundException clnfe)
- {
- throw new QpidException(
- "Unexpected error during rebuild of message in afterReceive() - " + "Could not find the required class in classpath.",
- null, clnfe);
- }
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
deleted file mode 100644
index 2426231c15..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.njms.message;
-
-import java.nio.ByteBuffer;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Vector;
-import java.io.IOException;
-
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class QpidMessage
-{
- /**
- * this QpidMessage's logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(QpidMessage.class);
-
- /**
- * The underlying qpidity message
- */
- private org.apache.qpidity.api.Message _qpidityMessage;
-
- /**
- * This message specific properties.
- */
- private MapwriteObject method call, or its equivalent primitive
- * writetype method.
- * readObject to read a byte field
- * value into a new byte[] object before the full value of the
- * byte field has been read will throw a
- * MessageFormatException.
- *
- * @return A Java object from the stream message, in objectified
- * format
- * @throws JMSException If reading fails due to some error.
- * @throws javax.jms.MessageEOFException If unexpected end of message data has been reached.
- * @throws javax.jms.MessageNotReadableException
- * If the message is in write-only mode.
- * @throws MessageFormatException If this type conversion is invalid.
- */
- public Object readObject() throws JMSException
- {
- isReadableAndNotReadingByteArray();
- Object result;
- try
- {
- _dataIn.mark(10);
- byte type = _dataIn.readByte();
- switch (type)
- {
- case BOOLEAN:
- result = super.readBoolean();
- break;
- case BYTE:
- result = super.readByte();
- break;
- case SHORT:
- result = super.readShort();
- break;
- case CHAR:
- result = super.readChar();
- break;
- case INT:
- result = super.readInt();
- break;
- case LONG:
- result = super.readLong();
- break;
- case FLOAT:
- result = super.readFloat();
- break;
- case DOUBLE:
- result = super.readDouble();
- break;
- case STRING:
- int len = _dataIn.readInt();
- if (len == 0)
- {
- result = null;
- }
- else
- {
- byte[] bArray = new byte[len];
- _dataIn.readFully(bArray);
- result = new String(bArray);
- }
- break;
- case BYTEARRAY:
- int totalBytes = _dataIn.readInt();
- byte[] bArray = new byte[totalBytes];
- _dataIn.read(bArray, 0, totalBytes);
- result = bArray;
- break;
- case NULL:
- result = null;
- break;
- default:
- _dataIn.reset();
- throw new MessageFormatException("Invalid Object Type");
- }
- }
- catch (EOFException eof)
- {
- throw new MessageEOFException("End of file Reached when reading message");
- }
- catch (IOException io)
- {
- throw new JMSException("IO exception when reading message");
- }
- return result;
- }
-
- /**
- * Writes a boolean to the stream message.
- *
- * @param val The boolean value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBoolean(boolean val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(BOOLEAN);
- super.writeBoolean(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a byte to the stream message.
- *
- * @param val The byte value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeByte(byte val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(BYTE);
- super.writeByte(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a short to the stream message.
- *
- * @param val The short value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeShort(short val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(SHORT);
- super.writeShort(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a char to the stream message.
- *
- * @param val The char value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeChar(char val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(CHAR);
- super.writeChar(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a int to the stream message.
- *
- * @param val The int value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeInt(int val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(INT);
- super.writeInt(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a long to the stream message.
- *
- * @param val The long value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeLong(long val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(LONG);
- super.writeLong(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a float to the stream message.
- *
- * @param val The float value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeFloat(float val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(FLOAT);
- super.writeFloat(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a double to the stream message.
- *
- * @param val The double value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeDouble(double val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(DOUBLE);
- super.writeDouble(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a string to the stream message.
- *
- * @param val The string value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeString(String val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(STRING);
- if (val == null)
- {
- _dataOut.writeInt(0);
- }
- else
- {
- byte[] bArray = val.getBytes();
- int len = bArray.length;
- _dataOut.writeInt(len);
- _dataOut.write(bArray);
- }
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a byte array to the stream message.
- *
- * @param val The byte array value to be written
- * @throws JMSException If writting a boolean fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBytes(byte[] val) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(BYTEARRAY);
- _dataOut.writeInt(val.length);
- super.writeBytes(val);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes a portion of byte array to the bytes message.
- *
- * @param val The byte array value to be written
- * @throws JMSException If writting a byte array fails due to some error.
- * @throws javax.jms.MessageNotWriteableException
- * If the message is in read-only mode.
- */
- public void writeBytes(byte[] val, int offset, int length) throws JMSException
- {
- isWriteable();
- try
- {
- _dataOut.writeShort(BYTEARRAY);
- _dataOut.writeInt(length);
- super.writeBytes(val, offset, length);
- }
- catch (IOException e)
- {
- throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
- }
- }
-
- /**
- * Writes an Object to the bytes message.
- * JMS spec says:
- * NUL-terminated string from this buffer using the
- * specified decoder and returns it. This method reads
- * until the limit of this buffer if no NUL is found.
- *
- * @return
- * @throws java.nio.charset.CharacterCodingException
- *
- */
- public String getString() throws CharacterCodingException
- {
- if (!getMessageData().hasRemaining())
- {
- return "";
- }
- Charset charset = Charset.forName(CHARACTER_ENCODING);
- CharsetDecoder decoder = charset.newDecoder();
-
- boolean utf16 = decoder.charset().name().startsWith("UTF-16");
-
- int oldPos = getMessageData().position();
- int oldLimit = getMessageData().limit();
- int end = -1;
- int newPos;
-
- if (!utf16)
- {
- end = indexOf((byte) 0x00);
- if (end < 0)
- {
- newPos = end = oldLimit;
- }
- else
- {
- newPos = end + 1;
- }
- }
- else
- {
- int i = oldPos;
- for (; ;)
- {
- boolean wasZero = getMessageData().get(i) == 0;
- i++;
-
- if (i >= oldLimit)
- {
- break;
- }
-
- if (getMessageData().get(i) != 0)
- {
- i++;
- if (i >= oldLimit)
- {
- break;
- }
- else
- {
- continue;
- }
- }
-
- if (wasZero)
- {
- end = i - 1;
- break;
- }
- }
-
- if (end < 0)
- {
- newPos = end = oldPos + ((oldLimit - oldPos) & 0xFFFFFFFE);
- }
- else
- {
- if (end + 2 <= oldLimit)
- {
- newPos = end + 2;
- }
- else
- {
- newPos = end;
- }
- }
- }
-
- if (oldPos == end)
- {
- getMessageData().position(newPos);
- return "";
- }
-
- getMessageData().limit(end);
- decoder.reset();
-
- int expectedLength = (int) (getMessageData().remaining() * decoder.averageCharsPerByte()) + 1;
- CharBuffer out = CharBuffer.allocate(expectedLength);
- for (; ;)
- {
- CoderResult cr;
- if (getMessageData().hasRemaining())
- {
- cr = decoder.decode(getMessageData(), out, true);
- }
- else
- {
- cr = decoder.flush(out);
- }
-
- if (cr.isUnderflow())
- {
- break;
- }
-
- if (cr.isOverflow())
- {
- CharBuffer o = CharBuffer.allocate(out.capacity() + expectedLength);
- out.flip();
- o.put(out);
- out = o;
- continue;
- }
-
- if (cr.isError())
- {
- // Revert the buffer back to the previous state.
- getMessageData().limit(oldLimit);
- getMessageData().position(oldPos);
- cr.throwException();
- }
- }
-
- getMessageData().limit(oldLimit);
- getMessageData().position(newPos);
- return out.flip().toString();
- }
-
- /**
- * Returns the first occurence position of the specified byte from the current position to
- * the current limit.
- *
- * @return -1 if the specified byte is not found
- * @param b
- */
- public int indexOf(byte b)
- {
- if (getMessageData().hasArray())
- {
- int arrayOffset = getMessageData().arrayOffset();
- int beginPos = arrayOffset + getMessageData().position();
- int limit = arrayOffset + getMessageData().limit();
- byte[] array = getMessageData().array();
-
- for (int i = beginPos; i < limit; i++)
- {
- if (array[i] == b)
- {
- return i - arrayOffset;
- }
- }
- }
- else
- {
- int beginPos = getMessageData().position();
- int limit = getMessageData().limit();
-
- for (int i = beginPos; i < limit; i++)
- {
- if (getMessageData().get(i) == b)
- {
- return i;
- }
- }
- }
- return -1;
- }
-}
-
--
cgit v1.2.1