From bb3fe508f98ec31109f951fb059ee49746a36d48 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 27 Jan 2012 20:15:31 +0000 Subject: NO-JIRA: Encapsulate fields, use private members and accesors (keep checkstyle happy) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1236867 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/example/OptionParser.java | 26 +- .../publisher/MonitorMessageDispatcher.java | 2 +- .../qpid/example/publisher/MonitorPublisher.java | 14 +- .../apache/qpid/example/publisher/Publisher.java | 87 ++++-- .../qpid/example/publisher/TopicPublisher.java | 5 +- .../org/apache/qpid/example/pubsub/Client.java | 32 ++- .../org/apache/qpid/example/pubsub/Publisher.java | 10 +- .../org/apache/qpid/example/pubsub/Subscriber.java | 12 +- .../apache/qpid/example/simple/reqresp/Client.java | 12 +- .../apache/qpid/example/simple/reqresp/Server.java | 12 +- .../java/org/apache/qpid/client/AMQConnection.java | 47 +++- .../qpid/client/AMQConnectionDelegate_0_10.java | 15 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 22 +- .../org/apache/qpid/client/AMQDestination.java | 82 ++++-- .../org/apache/qpid/client/AMQQueueBrowser.java | 2 +- .../apache/qpid/client/AMQQueueSessionAdaptor.java | 2 +- .../java/org/apache/qpid/client/AMQSession.java | 303 +++++++++++++++------ .../org/apache/qpid/client/AMQSession_0_10.java | 70 ++--- .../org/apache/qpid/client/AMQSession_0_8.java | 112 ++++---- .../main/java/org/apache/qpid/client/AMQTopic.java | 2 +- .../apache/qpid/client/AMQTopicSessionAdaptor.java | 2 +- .../apache/qpid/client/BasicMessageConsumer.java | 82 ++++-- .../qpid/client/BasicMessageConsumer_0_10.java | 44 +-- .../qpid/client/BasicMessageConsumer_0_8.java | 14 +- .../apache/qpid/client/BasicMessageProducer.java | 105 +++++-- .../qpid/client/BasicMessageProducer_0_10.java | 16 +- .../qpid/client/BasicMessageProducer_0_8.java | 34 +-- .../org/apache/qpid/client/DispatcherCallback.java | 2 +- .../apache/qpid/client/MessageConsumerPair.java | 4 +- .../org/apache/qpid/client/QueueSenderAdapter.java | 2 +- .../org/apache/qpid/client/XAConnectionImpl.java | 4 +- .../java/org/apache/qpid/client/XASessionImpl.java | 2 +- .../qpid/client/failover/FailoverNoopSupport.java | 4 +- .../qpid/client/failover/FailoverRetrySupport.java | 4 +- .../client/handler/ClientMethodDispatcherImpl.java | 2 +- .../client/message/AbstractAMQMessageDelegate.java | 12 +- .../client/message/AbstractBytesTypedMessage.java | 7 +- .../qpid/client/message/AbstractJMSMessage.java | 2 +- .../client/message/AbstractJMSMessageFactory.java | 10 +- .../qpid/client/message/JMSBytesMessage.java | 2 +- .../qpid/client/message/JMSStreamMessage.java | 2 +- .../client/message/UnprocessedMessage_0_8.java | 8 +- .../apache/qpid/client/messaging/address/Link.java | 20 +- .../apache/qpid/client/messaging/address/Node.java | 27 +- .../qpid/client/protocol/AMQProtocolHandler.java | 236 ++++++++-------- .../qpid/client/protocol/AMQProtocolSession.java | 71 ++++- .../protocol/BlockingMethodFrameListener.java | 2 +- .../apache/qpid/client/state/AMQStateManager.java | 2 +- .../AMQNoTransportForProtocolException.java | 2 +- .../client/transport/ClientConnectionDelegate.java | 8 +- .../apache/qpid/client/util/BlockingWaiter.java | 2 +- .../org/apache/qpid/collections/ReferenceMap.java | 20 +- .../collections/keyvalue/AbstractKeyValue.java | 19 +- .../collections/keyvalue/AbstractMapEntry.java | 15 +- .../apache/qpid/filter/ArithmeticExpression.java | 4 +- .../org/apache/qpid/filter/BinaryExpression.java | 19 +- .../apache/qpid/filter/ComparisonExpression.java | 10 +- .../org/apache/qpid/filter/LogicExpression.java | 8 +- .../nclient/util/MessagePartListenerAdapter.java | 4 +- .../java/org/apache/qpid/client/AMQQueueTest.java | 10 +- .../org/apache/qpid/client/MockAMQConnection.java | 6 +- .../client/protocol/AMQProtocolHandlerTest.java | 6 +- .../ClassLoadingAwareObjectInputStreamTest.java | 4 +- .../qpid/test/unit/jndi/JNDIPropertyFileTest.java | 2 +- 64 files changed, 1065 insertions(+), 665 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java index 9548eab4c5..6cc6db1974 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java @@ -147,8 +147,8 @@ public class OptionParser for (Option option: optDefs) { - if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) || - (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) ) + if ((op.startsWith("-") && option.getShortForm() != null && option.getShortForm().equals(key)) || + (op.startsWith("--") && option.getLongForm() != null && option.getLongForm().equals(key)) ) { match = true; break; @@ -219,18 +219,18 @@ public class OptionParser protected boolean containsOp(Option op) { - return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm); + return optMap.containsKey(op.getShortForm()) || optMap.containsKey(op.getLongForm()); } protected String getOp(Option op) { - if (optMap.containsKey(op.shortForm)) + if (optMap.containsKey(op.getShortForm())) { - return (String)optMap.get(op.shortForm); + return (String)optMap.get(op.getShortForm()); } - else if (optMap.containsKey(op.longForm)) + else if (optMap.containsKey(op.getLongForm())) { - return (String)optMap.get(op.longForm); + return (String)optMap.get(op.getLongForm()); } else { @@ -286,12 +286,12 @@ public class OptionParser static class Option { - private String shortForm; - private String longForm; - private String desc; - private String valueLabel; - private String defaultValue; - private Class type; + private final String shortForm; + private final String longForm; + private final String desc; + private final String valueLabel; + private final String defaultValue; + private final Class type; public Option(String shortForm, String longForm, String desc, String valueLabel, String defaultValue, Class type) diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 3d16e01af4..2b1e641689 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -103,7 +103,7 @@ public class MonitorMessageDispatcher // (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); getMonitorPublisher().sendMessage - (getMonitorPublisher()._session, + (getMonitorPublisher().getSession(), FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), DeliveryMode.PERSISTENT, false, true); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 750f57d9dc..b2bb0893d8 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -36,7 +36,7 @@ public class MonitorPublisher extends Publisher private static final Logger _log = LoggerFactory.getLogger(Publisher.class); - BasicMessageProducer _producer; + private BasicMessageProducer _producer; public MonitorPublisher() { @@ -51,14 +51,14 @@ public class MonitorPublisher extends Publisher { try { - _producer = (BasicMessageProducer) session.createProducer(_destination); + _producer = (BasicMessageProducer) session.createProducer(getDestination()); _producer.send(message, deliveryMode, immediate); if (commit) { //commit the message send and close the transaction - _session.commit(); + getSession().commit(); } } @@ -70,7 +70,7 @@ public class MonitorPublisher extends Publisher throw new UndeliveredMessageException("Cannot deliver immediate message", e); } - _log.info(_name + " finished sending message: " + message); + _log.info(getName() + " finished sending message: " + message); return true; } @@ -81,14 +81,14 @@ public class MonitorPublisher extends Publisher { try { - _producer = (BasicMessageProducer) _session.createProducer(_destination); + _producer = (BasicMessageProducer) getSession().createProducer(getDestination()); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer _producer.send(message, DeliveryMode.NON_PERSISTENT, true); //commit the message send and close the transaction - _session.commit(); + getSession().commit(); } catch (JMSException e) @@ -99,7 +99,7 @@ public class MonitorPublisher extends Publisher throw new UndeliveredMessageException("Cannot deliver immediate message", e); } - _log.info(_name + " finished sending message: " + message); + _log.info(getName() + " finished sending message: " + message); return true; } } diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java index b5f44557a4..76531523b9 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -34,19 +34,19 @@ public class Publisher protected InitialContextHelper _contextHelper; - protected Connection _connection; + private Connection _connection; - protected Session _session; + private Session _session; - protected MessageProducer _producer; + private MessageProducer _producer; - protected String _destinationDir; + private String _destinationDir; - protected String _name = "Publisher"; + private String _name = "Publisher"; - protected Destination _destination; + private Destination _destination; - protected static final String _defaultDestinationDir = "/tmp"; + private static final String _defaultDestinationDir = "/tmp"; /** * Creates a Publisher instance using properties from example.properties @@ -62,9 +62,9 @@ public class Publisher //then create a connection using the AMQConnectionFactory AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); - _connection = cf.createConnection(); + setConnection(cf.createConnection()); - _connection.setExceptionListener(new ExceptionListener() + getConnection().setExceptionListener(new ExceptionListener() { public void onException(JMSException jmse) { @@ -76,19 +76,19 @@ public class Publisher }); //create a transactional session - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + setSession(getConnection().createSession(true, Session.AUTO_ACKNOWLEDGE)); //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches - _destination = (Queue) ctx.lookup("MyQueue"); + setDestination((Queue) ctx.lookup("MyQueue")); //create a message producer - _producer = _session.createProducer(_destination); + setProducer(getSession().createProducer(getDestination())); //set destination dir for files that have been processed - _destinationDir = _defaultDestinationDir; + setDestinationDir(get_defaultDestinationDir()); - _connection.start(); + getConnection().start(); } catch (Exception e) { @@ -97,6 +97,11 @@ public class Publisher } } + public static String get_defaultDestinationDir() + { + return _defaultDestinationDir; + } + /** * Creates and sends the number of messages specified in the param */ @@ -104,7 +109,7 @@ public class Publisher { try { - TextMessage txtMessage = _session.createTextMessage("msg"); + TextMessage txtMessage = getSession().createTextMessage("msg"); for (int i=0;i args; + private String exchange; + private String bindingKey; + private String queue; + private Map args; public Binding(String exchange, String queue, @@ -902,4 +937,5 @@ public abstract class AMQDestination implements Destination, Referenceable return _rejectBehaviour; } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 3f9eadeef3..465d858091 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -108,7 +108,7 @@ public class AMQQueueBrowser implements QueueBrowser private class QueueBrowserEnumeration implements Enumeration { - Message _nextMessage; + private Message _nextMessage; private BasicMessageConsumer _consumer; public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java index c59eba60b8..d1c796c34a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java @@ -31,7 +31,7 @@ import java.io.Serializable; public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter { //holds a session for delegation - protected final AMQSession _session; + private final AMQSession _session; /** * Construct an adaptor with a session to wrap diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cca76ebac5..92579c31f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -96,6 +96,166 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + /** + * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client. + */ + protected ConcurrentHashMap> getSubscriptions() + { + return _subscriptions; + } + + /** + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. + */ + protected ConcurrentHashMap getReverseSubscriptionMap() + { + return _reverseSubscriptionMap; + } + + /** + * Locks to keep access to subscriber details atomic. + *

+ * Added for QPID2418 + */ + protected Lock getSubscriberDetails() + { + return _subscriberDetails; + } + + protected Lock getSubscriberAccess() + { + return _subscriberAccess; + } + + /** + * Used to hold incoming messages. + * + * @todo Weaken the type once {@link org.apache.qpid.client.util.FlowControllingBlockingQueue} implements Queue. + */ + protected FlowControllingBlockingQueue getQueue() + { + return _queue; + } + + /** Holds the highest received delivery tag. */ + protected AtomicLong getHighestDeliveryTag() + { + return _highestDeliveryTag; + } + + /** Pre-fetched message tags */ + protected ConcurrentLinkedQueue getPrefetchedMessageTags() + { + return _prefetchedMessageTags; + } + + protected void setPrefetchedMessageTags(ConcurrentLinkedQueue prefetchedMessageTags) + { + _prefetchedMessageTags = prefetchedMessageTags; + } + + /** All the not yet acknowledged message tags */ + protected ConcurrentLinkedQueue getUnacknowledgedMessageTags() + { + return _unacknowledgedMessageTags; + } + + protected void setUnacknowledgedMessageTags(ConcurrentLinkedQueue unacknowledgedMessageTags) + { + _unacknowledgedMessageTags = unacknowledgedMessageTags; + } + + /** All the delivered message tags */ + protected ConcurrentLinkedQueue getDeliveredMessageTags() + { + return _deliveredMessageTags; + } + + protected void setDeliveredMessageTags(ConcurrentLinkedQueue deliveredMessageTags) + { + _deliveredMessageTags = deliveredMessageTags; + } + + /** Holds the dispatcher thread for this session. */ + protected Dispatcher getDispatcher() + { + return _dispatcher; + } + + protected void setDispatcher(Dispatcher dispatcher) + { + _dispatcher = dispatcher; + } + + protected Thread getDispatcherThread() + { + return _dispatcherThread; + } + + protected void setDispatcherThread(Thread dispatcherThread) + { + _dispatcherThread = dispatcherThread; + } + + /** Holds the message factory factory for this session. */ + protected MessageFactoryRegistry getMessageFactoryRegistry() + { + return _messageFactoryRegistry; + } + + protected void setMessageFactoryRegistry(MessageFactoryRegistry messageFactoryRegistry) + { + _messageFactoryRegistry = messageFactoryRegistry; + } + + /** + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. + */ + protected IdToConsumerMap getConsumers() + { + return _consumers; + } + + /** + * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of + * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). + */ + protected boolean isUsingDispatcherForCleanup() + { + return _usingDispatcherForCleanup; + } + + protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) + { + _usingDispatcherForCleanup = usingDispatcherForCleanup; + } + + /** + * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. + * + * @todo This is accessed only within a synchronized method, so does not need to be atomic. + */ + protected AtomicBoolean getFirstDispatcher() + { + return _firstDispatcher; + } + + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + protected boolean isImmediatePrefetch() + { + return _immediatePrefetch; + } + + /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ + protected boolean isStrictAMQPFATAL() + { + return _strictAMQPFATAL; + } + public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -173,8 +333,6 @@ public abstract class AMQSession _thisSession = this; - /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -182,34 +340,34 @@ public abstract class AMQSession> _subscriptions = + private final ConcurrentHashMap> _subscriptions = new ConcurrentHashMap>(); - /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked - * up in the {@link #_subscriptions} map. - */ - protected final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap(); + private final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap(); - /** - * Locks to keep access to subscriber details atomic. - *

- * Added for QPID2418 - */ - protected final Lock _subscriberDetails = new ReentrantLock(true); - protected final Lock _subscriberAccess = new ReentrantLock(true); + private final Lock _subscriberDetails = new ReentrantLock(true); + private final Lock _subscriberAccess = new ReentrantLock(true); - /** - * Used to hold incoming messages. - * - * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. - */ - protected final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue _queue; - /** Holds the highest received delivery tag. */ - protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - /** Pre-fetched message tags */ - protected ConcurrentLinkedQueue _prefetchedMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _prefetchedMessageTags = new ConcurrentLinkedQueue(); - /** All the not yet acknowledged message tags */ - protected ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); - /** All the delivered message tags */ - protected ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); - /** Holds the dispatcher thread for this session. */ - protected Dispatcher _dispatcher; + private Dispatcher _dispatcher; - protected Thread _dispatcherThread; + private Thread _dispatcherThread; - /** Holds the message factory factory for this session. */ - protected MessageFactoryRegistry _messageFactoryRegistry; + private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map _producers = new ConcurrentHashMap(); @@ -314,11 +447,7 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); + private final IdToConsumerMap _consumers = new IdToConsumerMap(); /** * Contains a list of consumers which have been removed but which might still have @@ -344,11 +473,7 @@ public abstract class AMQSession= System.currentTimeMillis() ) { - _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); - _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + _flowControl.wait(_flowControlWaitPeriod); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); } if(!_flowControl.getFlowControl()) { _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); } } @@ -3198,6 +3316,11 @@ public abstract class AMQSession= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } @@ -276,7 +282,7 @@ public class AMQSession_0_10 extends AMQSession 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); + (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -444,8 +450,8 @@ public class AMQSession_0_10 extends AMQSession deliveredIter = delivered.iterator(); deliveredIter.hasNext();) { Range range = deliveredIter.next(); @@ -526,9 +532,9 @@ public class AMQSession_0_10 extends AMQSession 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch())) { // set the flow getQpidSession().messageFlow(consumerTag, @@ -653,7 +659,7 @@ public class AMQSession_0_10 extends AMQSession 0 && (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) + if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 || + getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1040,7 +1046,7 @@ public class AMQSession_0_10 extends AMQSession 0 ) { @@ -1345,13 +1351,13 @@ public class AMQSession_0_10 extends AMQSession // messages sent by the brokers following the first rollback // after failover - _highestDeliveryTag.set(-1); + getHighestDeliveryTag().set(-1); // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to //messages that came from the old broker. _txRangeSet.clear(); _txSize = 0; - _unacknowledgedMessageTags.clear(); - _prefetchedMessageTags.clear(); + getUnacknowledgedMessageTags().clear(); + getPrefetchedMessageTags().clear(); super.resubscribe(); getQpidSession().sync(); } @@ -1362,18 +1368,18 @@ public class AMQSession_0_10 extends AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); + getPrefetchedMessageTags().addAll(tags); } } - _usingDispatcherForCleanup = true; + setUsingDispatcherForCleanup(true); drainDispatchQueue(); - _usingDispatcherForCleanup = false; + setUsingDispatcherForCleanup(false); - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + prefetched.size()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 574775804b..96994e7963 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -103,7 +103,7 @@ public class AMQSession_0_8 extends AMQSession arguments) throws AMQException, @@ -190,22 +190,22 @@ public class AMQSession_0_8 extends AMQSession consumersToCheck = new ArrayList(_consumers.values()); + ArrayList consumersToCheck = new ArrayList(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; for(BasicMessageConsumer_0_8 consumer : consumersToCheck) @@ -259,7 +259,7 @@ public class AMQSession_0_8 extends AMQSession getQueueDestinationCache() @@ -579,6 +576,15 @@ public class AMQSession_0_8 extends AMQSession extends Closeable implements Messa { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); - /** The connection being used by this consumer */ - protected final AMQConnection _connection; + private final AMQConnection _connection; - protected final MessageFilter _messageSelectorFilter; + private final MessageFilter _messageSelectorFilter; private final boolean _noLocal; - protected AMQDestination _destination; + private AMQDestination _destination; /** * When true indicates that a blocking receive call is in progress @@ -78,23 +77,17 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final AtomicReference _messageListener = new AtomicReference(); - /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - protected int _consumerTag; + private int _consumerTag; - /** We need to know the channel id when constructing frames */ - protected final int _channelId; + private final int _channelId; - /** - * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors - *

Argument true indicates we want strict FIFO semantics - */ - protected final BlockingQueue _synchronousQueue; + private final BlockingQueue _synchronousQueue; - protected final MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; - protected final AMQSession _session; + private final AMQSession _session; - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required @@ -113,17 +106,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final int _prefetchLow; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ - protected boolean _exclusive; + private boolean _exclusive; - /** - * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per - * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our - * implementation. - */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. @@ -238,6 +223,11 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _messageListener.get(); } + /** + * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per + * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our + * implementation. + */ public int getAcknowledgeMode() { return _acknowledgeMode; @@ -377,6 +367,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _noLocal; } + /** + * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover + */ public boolean isExclusive() { return _exclusive; @@ -865,6 +858,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _session.deregisterConsumer(this); } + /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ public int getConsumerTag() { return _consumerTag; @@ -1014,4 +1008,40 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public void failedOverPost() {} + /** The connection being used by this consumer */ + protected AMQConnection getConnection() + { + return _connection; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + /** We need to know the channel id when constructing frames */ + protected int getChannelId() + { + return _channelId; + } + + /** + * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors + *

Argument true indicates we want strict FIFO semantics + */ + protected BlockingQueue getSynchronousQueue() + { + return _synchronousQueue; + } + + protected MessageFactoryRegistry getMessageFactory() + { + return _messageFactory; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 5dadcd5ca3..ccde720673 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -57,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + if (getSynchronousQueue().size() > 0) { RangeSet ranges = RangeSetFactory.createRangeSet(); - Iterator iterator = _synchronousQueue.iterator(); + Iterator iterator = getSynchronousQueue().iterator(); while (iterator.hasNext()) { @@ -497,7 +497,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer { - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQSession_0_8.DestinationCache _topicDestinationCache; private AMQSession_0_8.DestinationCache _queueDestinationCache; @@ -95,11 +95,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer 0 || !_disableTimestamps) + if (timeToLive > 0 || !isDisableTimestamps()) { currentTime = System.currentTimeMillis(); } @@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.setJMSExpiration(currentTime + timeToLive); } - if (!_disableTimestamps) + if (!isDisableTimestamps()) { deliveryProp.setTimestamp(currentTime); @@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer // if true, we need to sync the delivery of this message boolean sync = false; - sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || - (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) || + (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT && deliveryMode == DeliveryMode.PERSISTENT) ); @@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer @Override public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination); + return getSession().isQueueBound(destination); } @Override public void close() throws JMSException { super.close(); - AMQDestination dest = _destination; + AMQDestination dest = getAMQDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { if (dest.getDelete() == AddressOption.ALWAYS || @@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); + getAMQDestination().getQueueName()); } catch(TransportException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 94121db99f..3b5e361f97 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -54,7 +54,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer final MethodRegistry methodRegistry = getSession().getMethodRegistry(); ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(_session.getTicket(), + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), @@ -66,29 +66,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = body.generateFrame(_channelId); + AMQFrame declare = body.generateFrame(getChannelId()); - _protocolHandler.writeFrame(declare); + getProtocolHandler().writeFrame(declare); } void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); - AMQFrame publishFrame = body.generateFrame(_channelId); + AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); - contentHeaderProperties.setUserId(_userID); + contentHeaderProperties.setUserId(getUserID()); //Set the JMS_QPID_DESTTYPE for 0-8/9 messages int type; @@ -108,7 +108,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer //Set JMS_QPID_DESTTYPE delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - if (!_disableTimestamps) + if (!isDisableTimestamps()) { final long currentTime = System.currentTimeMillis(); contentHeaderProperties.setTimestamp(currentTime); @@ -132,12 +132,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (payload != null) { - createContentBodies(payload, frames, 2, _channelId); + createContentBodies(payload, frames, 2, getChannelId()); } - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled()) { - _logger.debug("Sending content body frames to " + destination); + getLogger().debug("Sending content body frames to " + destination); } @@ -145,11 +145,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, + ContentHeaderBody.createAMQFrame(getChannelId(), classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) + if (getLogger().isDebugEnabled()) { - _logger.debug("Sending content header frame to " + destination); + getLogger().debug("Sending content header frame to " + destination); } frames[0] = publishFrame; @@ -158,7 +158,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer try { - _session.checkFlowControl(); + getSession().checkFlowControl(); } catch (InterruptedException e) { @@ -168,7 +168,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - _protocolHandler.writeFrame(compositeFrame); + getProtocolHandler().writeFrame(compositeFrame); } /** @@ -192,7 +192,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { @@ -222,7 +222,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java index 81a55006ed..1cd8df6e4a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java @@ -24,7 +24,7 @@ import java.util.Queue; public abstract class DispatcherCallback { - BasicMessageConsumer _consumer; + private BasicMessageConsumer _consumer; public DispatcherCallback(BasicMessageConsumer mc) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java index 585d6db3fd..134159afe1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -22,8 +22,8 @@ package org.apache.qpid.client; public class MessageConsumerPair { - BasicMessageConsumer _consumer; - Object _item; + private BasicMessageConsumer _consumer; + private Object _item; public MessageConsumerPair(BasicMessageConsumer consumer, Object item) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 295c6a4091..0b797df9dd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -202,7 +202,7 @@ public class QueueSenderAdapter implements QueueSender { if (_delegate.getSession().isStrictAMQP()) { - _delegate._logger.warn("AMQP does not support destination validation before publish, "); + _delegate.getLogger().warn("AMQP does not support destination validation before publish, "); destination.setCheckedForQueueBinding(true); } else diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index a7494305c6..d9514338ce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -53,7 +53,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(); + return getDelegate().createXASession(); } //-- Interface XAQueueConnection @@ -86,6 +86,6 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public XASession createXASession(int ackMode) throws JMSException { checkNotClosed(); - return _delegate.createXASession(ackMode); + return getDelegate().createXASession(ackMode); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 1d991372df..85623df8c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -86,7 +86,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = _qpidConnection.createSession(0); + _qpidDtxSession = getQpidConnection().createSession(0); _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index 51cc94965a..a69e808880 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -38,10 +38,10 @@ import org.apache.qpid.client.AMQConnection; public class FailoverNoopSupport implements FailoverSupport { /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; + private FailoverProtectedOperation operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index 0146f1935d..d3d33d3c75 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -73,10 +73,10 @@ public class FailoverRetrySupport implements FailoverSup private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; + private FailoverProtectedOperation operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9af225aded..558d93538b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -104,7 +104,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return factory.createMethodDispatcher(session); } - AMQProtocolSession _session; + private AMQProtocolSession _session; public ClientMethodDispatcherImpl(AMQProtocolSession session) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 98ca8ed8cb..1395f39b99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -121,18 +121,18 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate exchangeInfo = new ExchangeInfo(exchange.asString(),"",AMQDestination.UNKNOWN_TYPE); } - if ("topic".equals(exchangeInfo.exchangeType)) + if ("topic".equals(exchangeInfo.getExchangeType())) { dest = new AMQTopic(exchange, routingKey, null); } - else if ("direct".equals(exchangeInfo.exchangeType)) + else if ("direct".equals(exchangeInfo.getExchangeType())) { dest = new AMQQueue(exchange, routingKey, routingKey); } else { dest = new AMQAnyDestination(exchange, - new AMQShortString(exchangeInfo.exchangeType), + new AMQShortString(exchangeInfo.getExchangeType()), routingKey, false, false, @@ -223,9 +223,9 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate class ExchangeInfo { - String exchangeName; - String exchangeType; - int destType = AMQDestination.QUEUE_TYPE; + private String exchangeName; + private String exchangeType; + private int destType = AMQDestination.QUEUE_TYPE; public ExchangeInfo(String exchangeName, String exchangeType, int destType) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index a4173be1d7..9c7bd0bdcf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -34,7 +34,7 @@ import java.nio.ByteBuffer; */ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { - protected boolean _readableMessage = false; + private boolean _readableMessage = false; AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { @@ -75,6 +75,11 @@ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage _readableMessage = false; } + protected void setReadable(boolean readable) + { + _readableMessage = readable; + } + public String toBodyString() throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 442fca6fe3..d1e43447cc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -36,7 +36,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - protected AMQMessageDelegate _delegate; + private AMQMessageDelegate _delegate; private boolean _redelivered; private boolean _receivedFromServer; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 6fbcea8aed..608567674a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -57,25 +57,25 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { if (debug) { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload); + data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); } else if (bodies != null) { if (debug) { _logger.debug("Fragmented message body (" + bodies - .size() + " frames, bodySize=" + contentHeader.bodySize + ")"); + .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? + data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? final Iterator it = bodies.iterator(); while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = ByteBuffer.wrap(cb._payload); + final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); if(payload.isDirect() || payload.isReadOnly()) { data.put(payload); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 98cc323ad3..b0320d0f4e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -52,7 +52,7 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 7fca76268f..b958d89515 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -54,7 +54,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index 847975a5e5..c78b6ced93 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -87,13 +87,13 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public void receiveBody(ContentBody body) { - if (body._payload != null) + if (body.getPayload() != null) { - final long payloadSize = body._payload.length; + final long payloadSize = body.getPayload().length; if (_bodies == null) { - if (payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().getBodySize()) { _bodies = Collections.singletonList(body); } @@ -124,7 +124,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public boolean isAllBodyDataReceived() { - return _bytesReceived == getContentHeader().bodySize; + return _bytesReceived == getContentHeader().getBodySize(); } public BasicDeliverBody getDeliverBody() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index c73d800b14..41f6725c8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -29,16 +29,16 @@ public class Link public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } - protected String name; - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected boolean _isDurable; - protected int _consumerCapacity = 0; - protected int _producerCapacity = 0; - protected Node node; - protected Subscription subscription; - protected Reliability reliability = Reliability.AT_LEAST_ONCE; + private String name; + private String _filter; + private FilterType _filterType = FilterType.SUBJECT; + private boolean _isNoLocal; + private boolean _isDurable; + private int _consumerCapacity = 0; + private int _producerCapacity = 0; + private Node node; + private Subscription subscription; + private Reliability reliability = Reliability.AT_LEAST_ONCE; public Reliability getReliability() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index fe469090d8..bb5bba9068 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -31,13 +31,18 @@ import java.util.Map; public abstract class Node { - protected int _nodeType = AMQDestination.UNKNOWN_TYPE; - protected boolean _isDurable; - protected boolean _isAutoDelete; - protected String _alternateExchange; - protected List _bindings = new ArrayList(); - protected Map _declareArgs = Collections.emptyMap(); - + private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private boolean _isDurable; + private boolean _isAutoDelete; + private String _alternateExchange; + private List _bindings = new ArrayList(); + private Map _declareArgs = Collections.emptyMap(); + + protected Node(int nodeType) + { + _nodeType = nodeType; + } + public int getType() { return _nodeType; @@ -104,7 +109,7 @@ public abstract class Node public QueueNode() { - _nodeType = AMQDestination.QUEUE_TYPE; + super(AMQDestination.QUEUE_TYPE); } public boolean isExclusive() @@ -125,7 +130,7 @@ public abstract class Node public ExchangeNode() { - _nodeType = AMQDestination.TOPIC_TYPE; + super(AMQDestination.TOPIC_TYPE); } public String getExchangeType() @@ -142,5 +147,9 @@ public abstract class Node public static class UnknownNodeType extends Node { + public UnknownNodeType() + { + super(AMQDestination.UNKNOWN_TYPE); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c23e2ba985..d4da0ede32 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -865,159 +865,159 @@ public class AMQProtocolHandler implements ProtocolEngine } private static class BytesDataOutput implements DataOutput + { + private int _pos = 0; + private byte[] _buf; + + public BytesDataOutput(byte[] buf) { - int _pos = 0; - byte[] _buf; + _buf = buf; + } - public BytesDataOutput(byte[] buf) - { - _buf = buf; - } + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } - public void setBuffer(byte[] buf) - { - _buf = buf; - _pos = 0; - } + public void reset() + { + _pos = 0; + } - public void reset() - { - _pos = 0; - } + public int length() + { + return _pos; + } - public int length() - { - return _pos; - } + public void write(int b) + { + _buf[_pos++] = (byte) b; + } - public void write(int b) - { - _buf[_pos++] = (byte) b; - } + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } - public void write(byte[] b) - { - System.arraycopy(b, 0, _buf, _pos, b.length); - _pos+=b.length; - } + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; - public void write(byte[] b, int off, int len) - { - System.arraycopy(b, off, _buf, _pos, len); - _pos+=len; + } - } + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } - public void writeBoolean(boolean v) - { - _buf[_pos++] = v ? (byte) 1 : (byte) 0; - } + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } - public void writeByte(int v) - { - _buf[_pos++] = (byte) v; - } + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } - public void writeShort(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } - public void writeChar(int v) + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; + _buf[_pos++] = ((byte)s.charAt(i)); } + } - public void writeInt(int v) + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) { - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); + int v = s.charAt(i); _buf[_pos++] = (byte) (v >>> 8); _buf[_pos++] = (byte) v; } + } - public void writeLong(long v) - { - _buf[_pos++] = (byte) (v >>> 56); - _buf[_pos++] = (byte) (v >>> 48); - _buf[_pos++] = (byte) (v >>> 40); - _buf[_pos++] = (byte) (v >>> 32); - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte)v; - } + public void writeUTF(String s) + { + int strlen = s.length(); - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } + int pos = _pos; + _pos+=2; - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - public void writeBytes(String s) + for (int i = 0; i < strlen; i++) { - int len = s.length(); - for (int i = 0 ; i < len ; i++) + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { - _buf[_pos++] = ((byte)s.charAt(i)); - } - } + c = s.charAt(i); + _buf[_pos++] = (byte) c; - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) + } + else if (c > 0x07FF) { - int v = s.charAt(i); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _pos; - _pos+=2; - - - for (int i = 0; i < strlen; i++) + else { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buf[_pos++] = (byte) c; - - } - else if (c > 0x07FF) - { - _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - else - { - _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); } - - int len = _pos - (pos + 2); - - _buf[pos++] = (byte) (len >>> 8); - _buf[pos] = (byte) len; } + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; } + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 93a51adb68..c9b2e9cdc4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - /** - * The handler from which this session was created and which is used to handle protocol events. We send failover - * events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives @@ -91,9 +86,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; - /** Counter to ensure unique queue names */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); + private int _queueId = 1; + private final Object _queueIdLock = new Object(); private ProtocolVersion _protocolVersion; // private VersionSpecificRegistry _registry = @@ -104,7 +98,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - protected final AMQConnection _connection; + private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; @@ -223,7 +217,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } msg.setContentHeader(contentHeader); - if (contentHeader.bodySize == 0) + if (contentHeader.getBodySize() == 0) { deliverMessageToAMQSession(channelId, msg); } @@ -470,4 +464,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return "AMQProtocolSession[" + _connection + ']'; } + + /** + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. + */ + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + /** Maps from the channel id to the AMQSession that it represents. */ + protected ConcurrentMap getChannelId2SessionMap() + { + return _channelId2SessionMap; + } + + protected void setChannelId2SessionMap(ConcurrentMap channelId2SessionMap) + { + _channelId2SessionMap = channelId2SessionMap; + } + + protected ConcurrentMap getClosingChannels() + { + return _closingChannels; + } + + protected void setClosingChannels(ConcurrentMap closingChannels) + { + _closingChannels = closingChannels; + } + + /** Counter to ensure unique queue names */ + protected int getQueueId() + { + return _queueId; + } + + protected void setQueueId(int queueId) + { + _queueId = queueId; + } + + protected Object getQueueIdLock() + { + return _queueIdLock; + } + + protected AMQConnection getConnection() + { + return _connection; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 4350b48a10..b865c51cb7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -63,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter _waiters = new CopyOnWriteArrayList(); + private final List _waiters = new CopyOnWriteArrayList(); private Exception _lastException; public AMQStateManager() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index 6e47e2ce28..9e21e1c4ab 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -36,7 +36,7 @@ import org.apache.qpid.jms.BrokerDetails; */ public class AMQNoTransportForProtocolException extends AMQTransportConnectionException { - BrokerDetails _details; + private BrokerDetails _details; public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index db7c16974a..3c9a6e1500 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -85,7 +85,7 @@ public class ClientConnectionDelegate extends ClientDelegate protected SaslClient createSaslClient(List brokerMechs) throws ConnectionException, SaslException { final String brokerMechanisms = Strings.join(" ", brokerMechs); - final String restrictionList = _conSettings.getSaslMechs(); + final String restrictionList = getConnectionSettings().getSaslMechs(); final String selectedMech = CallbackHandlerRegistry.getInstance().selectMechanism(brokerMechanisms, restrictionList); if (selectedMech == null) { @@ -96,14 +96,14 @@ public class ClientConnectionDelegate extends ClientDelegate } Map saslProps = new HashMap(); - if (_conSettings.isUseSASLEncryption()) + if (getConnectionSettings().isUseSASLEncryption()) { saslProps.put(Sasl.QOP, "auth-conf"); } final AMQCallbackHandler handler = CallbackHandlerRegistry.getInstance().createCallbackHandler(selectedMech); handler.initialise(_connectionURL); - final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, _conSettings.getSaslProtocol(), _conSettings.getSaslServerName(), saslProps, handler); + final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, getConnectionSettings().getSaslProtocol(), getConnectionSettings().getSaslServerName(), saslProps, handler); return sc; } @@ -137,7 +137,7 @@ public class ClientConnectionDelegate extends ClientDelegate private String getKerberosUser() { LOGGER.debug("Obtaining userID from kerberos"); - String service = _conSettings.getSaslProtocol() + "@" + _conSettings.getSaslServerName(); + String service = getConnectionSettings().getSaslProtocol() + "@" + getConnectionSettings().getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 5d36b2f19e..c371341265 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -85,7 +85,7 @@ public abstract class BlockingWaiter private volatile Exception _error; /** Holds the incomming Object. */ - protected Object _doneObject = null; + private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java index 7bc1322e02..84e4704867 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java @@ -795,10 +795,10 @@ public class ReferenceMap extends AbstractMap // the mapping is stale and should be removed. private class Entry implements Map.Entry, KeyValue { - Object key; - Object value; - int hash; - Entry next; + private Object key; + private Object value; + private int hash; + private Entry next; public Entry(Object key, int hash, Object value, Entry next) { @@ -887,17 +887,17 @@ public class ReferenceMap extends AbstractMap private class EntryIterator implements Iterator { // These fields keep track of where we are in the table. - int index; - Entry entry; - Entry previous; + private int index; + private Entry entry; + private Entry previous; // These Object fields provide hard references to the // current and next entry; this assures that if hasNext() // returns true, next() will actually return a valid element. - Object nextKey, nextValue; - Object currentKey, currentValue; + private Object nextKey, nextValue; + private Object currentKey, currentValue; - int expectedModCount; + private int expectedModCount; public EntryIterator() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java index a7ca67ad15..f1b6d11bee 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java @@ -33,9 +33,9 @@ import org.apache.qpid.collections.KeyValue; public abstract class AbstractKeyValue implements KeyValue { /** The key */ - protected Object key; + private Object key; /** The value */ - protected Object value; + private Object value; /** * Constructs a new pair with the specified key and given value. @@ -67,6 +67,21 @@ public abstract class AbstractKeyValue implements KeyValue { return value; } + /** + * Sets the value stored in this Map.Entry. + *

+ * This Map.Entry is not connected to a Map, so only the + * local data is changed. + * + * @param value the new value + * @return the previous value + */ + public Object setValue(Object value) { + Object answer = this.value; + this.value = value; + return answer; + } + /** * Gets a debugging String view of the pair. * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java index a5223d2361..7135c31fd7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java @@ -43,20 +43,7 @@ public abstract class AbstractMapEntry extends AbstractKeyValue implements Map.E // Map.Entry interface //------------------------------------------------------------------------- - /** - * Sets the value stored in this Map.Entry. - *

- * This Map.Entry is not connected to a Map, so only the - * local data is changed. - * - * @param value the new value - * @return the previous value - */ - public Object setValue(Object value) { - Object answer = this.value; - this.value = value; - return answer; - } + /** * Compares this Map.Entry with another Map.Entry. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java index a86613f10c..df5e2acd66 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java @@ -243,13 +243,13 @@ public abstract class ArithmeticExpression extends BinaryExpression public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Object lvalue = left.evaluate(message); + Object lvalue = getLeft().evaluate(message); if (lvalue == null) { return null; } - Object rvalue = right.evaluate(message); + Object rvalue = getRight().evaluate(message); if (rvalue == null) { return null; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java index f97f858fad..a08a6cc094 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java @@ -22,8 +22,8 @@ package org.apache.qpid.filter; */ public abstract class BinaryExpression implements Expression { - protected Expression left; - protected Expression right; + private final Expression left; + private final Expression right; public BinaryExpression(Expression left, Expression right) { @@ -84,20 +84,5 @@ public abstract class BinaryExpression implements Expression */ public abstract String getExpressionSymbol(); - /** - * @param expression - */ - public void setRight(Expression expression) - { - right = expression; - } - - /** - * @param expression - */ - public void setLeft(Expression expression) - { - left = expression; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java index eebfec0b2d..87d43ec343 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java @@ -69,7 +69,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B static class LikeExpression extends UnaryExpression implements BooleanExpression { - Pattern likePattern; + private Pattern likePattern; /** * @param right @@ -236,8 +236,8 @@ public abstract class ComparisonExpression extends BinaryExpression implements B public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Object lv = left.evaluate(message); - Object rv = right.evaluate(message); + Object lv = getLeft().evaluate(message); + Object rv = getRight().evaluate(message); // Iff one of the values is null if ((lv == null) ^ (rv == null)) @@ -419,13 +419,13 @@ public abstract class ComparisonExpression extends BinaryExpression implements B public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Comparable lv = (Comparable) left.evaluate(message); + Comparable lv = (Comparable) getLeft().evaluate(message); if (lv == null) { return null; } - Comparable rv = (Comparable) right.evaluate(message); + Comparable rv = (Comparable) getRight().evaluate(message); if (rv == null) { return null; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java index 7ef85cbacb..b08b93228f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java @@ -35,14 +35,14 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Boolean lv = (Boolean) left.evaluate(message); + Boolean lv = (Boolean) getLeft().evaluate(message); // Can we do an OR shortcut?? if ((lv != null) && lv.booleanValue()) { return Boolean.TRUE; } - Boolean rv = (Boolean) right.evaluate(message); + Boolean rv = (Boolean) getRight().evaluate(message); return (rv == null) ? null : rv; } @@ -62,7 +62,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Boolean lv = (Boolean) left.evaluate(message); + Boolean lv = (Boolean) getLeft().evaluate(message); // Can we do an AND shortcut?? if (lv == null) @@ -75,7 +75,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea return Boolean.FALSE; } - Boolean rv = (Boolean) right.evaluate(message); + Boolean rv = (Boolean) getRight().evaluate(message); return (rv == null) ? null : rv; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index ad0a625c6a..9a2e9de3d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -38,8 +38,8 @@ import java.nio.ByteBuffer; */ public class MessagePartListenerAdapter implements MessagePartListener { - MessageListener _adaptee; - ByteBufferMessage _currentMsg; + private MessageListener _adaptee; + private ByteBufferMessage _currentMsg; public MessagePartListenerAdapter(MessageListener listener) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java index d862acf28d..bc48ee8895 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java @@ -26,11 +26,11 @@ import org.apache.qpid.framing.AMQShortString; public class AMQQueueTest extends TestCase { - AMQShortString exchange = new AMQShortString("test.exchange"); - AMQShortString routingkey = new AMQShortString("test-route"); - AMQShortString qname = new AMQShortString("test-queue"); - AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")}; - AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"), + private AMQShortString exchange = new AMQShortString("test.exchange"); + private AMQShortString routingkey = new AMQShortString("test-route"); + private AMQShortString qname = new AMQShortString("test-queue"); + private AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")}; + private AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"), new AMQShortString("bindingC")}; public void testToURLNoBindings() diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 919809edc3..009598d8a4 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -51,13 +51,13 @@ public class MockAMQConnection extends AMQConnection @Override public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { - _connected = true; - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); + setConnected(true); + getProtocolHandler().getStateManager().changeState(AMQState.CONNECTION_OPEN); return null; } public AMQConnectionDelegate getDelegate() { - return _delegate; + return super.getDelegate(); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 6b5fc81be6..9a5ca33174 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -59,15 +59,15 @@ public class AMQProtocolHandlerTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class); // The handler to test - AMQProtocolHandler _handler; + private AMQProtocolHandler _handler; // A frame to block upon whilst waiting the exception - AMQFrame _blockFrame; + private AMQFrame _blockFrame; // Latch to know when the listener receives an exception private CountDownLatch _handleCountDown; // The listener that will receive an exception - BlockToAccessFrameListener _listener; + private BlockToAccessFrameListener _listener; @Override public void setUp() throws Exception diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java index 8cd320b06e..91460ab4e7 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java @@ -31,8 +31,8 @@ import java.util.List; public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase { - InputStream _in; - ClassLoadingAwareObjectInputStream _claOIS; + private InputStream _in; + private ClassLoadingAwareObjectInputStream _claOIS; protected void setUp() throws Exception { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java index 5690a05254..576ab4fa05 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java @@ -34,7 +34,7 @@ import java.util.Properties; public class JNDIPropertyFileTest extends TestCase { - Context ctx; + private Context ctx; public JNDIPropertyFileTest() throws Exception { -- cgit v1.2.1