summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/generate.py6
-rwxr-xr-xqpid/cpp/rubygen/amqpgen.rb9
-rw-r--r--qpid/cpp/src/qpid/client/QueueOptions.cpp7
-rw-r--r--qpid/cpp/src/qpid/client/QueueOptions.h22
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h2
-rw-r--r--qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java47
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java24
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java53
-rw-r--r--qpid/python/qpid/messaging/driver.py3
-rw-r--r--qpid/python/qpid/messaging/endpoints.py18
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py25
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java18
-rwxr-xr-xqpid/tools/src/py/qpid-config6
20 files changed, 199 insertions, 130 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/generate.py b/qpid/cpp/managementgen/qmfgen/generate.py
index a7ad43cc30..22c53aa2e0 100755
--- a/qpid/cpp/managementgen/qmfgen/generate.py
+++ b/qpid/cpp/managementgen/qmfgen/generate.py
@@ -257,6 +257,8 @@ class CMakeLists(Makefile):
class Generator:
+ verbose = False
+
"""
This class manages code generation using template files. It is instantiated
once for an entire code generation session.
@@ -350,7 +352,9 @@ class Generator:
pass
os.rename (tempFile, target)
- print "Generated:", target
+
+ if self.verbose:
+ print "Generated:", target
def targetPackageFile (self, schema, templateFile):
dot = templateFile.find(".")
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb
index f42e177225..7eee953c04 100755
--- a/qpid/cpp/rubygen/amqpgen.rb
+++ b/qpid/cpp/rubygen/amqpgen.rb
@@ -489,6 +489,7 @@ class Generator
@prefix=[''] # For indentation or comments.
@indentstr=' ' # One indent level.
@outdent=2
+ @verbose=false
end
# Declare next file to be public API
@@ -504,10 +505,14 @@ class Generator
@out=String.new # Generate in memory first
yield if block
if @path.exist? and @path.read == @out
- puts "Skipped #{@path} - unchanged" # Dont generate if unchanged
+ if @verbose
+ puts "Skipped #{@path} - unchanged" # Dont generate if unchanged
+ end
else
@path.open('w') { |f| f << @out }
- puts "Generated #{@path}"
+ if @verbose
+ puts "Generated #{@path}"
+ end
end
end
end
diff --git a/qpid/cpp/src/qpid/client/QueueOptions.cpp b/qpid/cpp/src/qpid/client/QueueOptions.cpp
index e589bd76cd..f7705afeb0 100644
--- a/qpid/cpp/src/qpid/client/QueueOptions.cpp
+++ b/qpid/cpp/src/qpid/client/QueueOptions.cpp
@@ -40,8 +40,6 @@ const std::string QueueOptions::strRING_STRICT("ring_strict");
const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
-const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation");
-
QueueOptions::~QueueOptions()
{}
@@ -101,11 +99,6 @@ void QueueOptions::clearOrdering()
erase(strLastValueQueue);
}
-void QueueOptions::enableQueueEvents(bool enqueueOnly)
-{
- setInt(strQueueEventMode, enqueueOnly ? ENQUEUE_ONLY : ENQUEUE_AND_DEQUEUE);
-}
-
}
}
diff --git a/qpid/cpp/src/qpid/client/QueueOptions.h b/qpid/cpp/src/qpid/client/QueueOptions.h
index a2f30a50b5..f7ace15ff9 100644
--- a/qpid/cpp/src/qpid/client/QueueOptions.h
+++ b/qpid/cpp/src/qpid/client/QueueOptions.h
@@ -75,28 +75,6 @@ class QPID_CLIENT_CLASS_EXTERN QueueOptions: public framing::FieldTable
*/
QPID_CLIENT_EXTERN void clearOrdering();
- /**
- * Turns on event generation for this queue (either enqueue only
- * or for enqueue and dequeue events); the events can then be
- * processed by a regsitered broker plugin.
- *
- * DEPRECATED
- *
- * This is confusing to anyone who sees only the function call
- * and not the variable name / doxygen. Consider the following call:
- *
- * options.enableQueueEvents(false);
- *
- * It looks like it disables queue events, but what it really does is
- * enable both enqueue and dequeue events.
- *
- * Use setInt() instead:
- *
- * options.setInt("qpid.queue_event_generation", 2);
- */
-
- QPID_CLIENT_EXTERN void enableQueueEvents(bool enqueueOnly);
-
static QPID_CLIENT_EXTERN const std::string strMaxCountKey;
static QPID_CLIENT_EXTERN const std::string strMaxSizeKey;
static QPID_CLIENT_EXTERN const std::string strTypeKey;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 3ed3b457ba..35ce82cf5d 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -36,7 +36,7 @@ namespace amqp0_10 {
class AddressResolution;
class MessageSink;
-struct OutgoingMessage;
+class OutgoingMessage;
/**
*
diff --git a/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml b/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml
index e693ee463b..125372e463 100644
--- a/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml
+++ b/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml
@@ -179,34 +179,7 @@
<section role="h3" id="CheatSheetforconfiguringQueueOptions-Settingadditionalbehaviors"><title>
Setting additional behaviors
</title>
- <section role="h4" id="CheatSheetforconfiguringQueueOptions-Queueeventgeneration"><title>
- Queue
- event generation
- </title>
- <para>
- This option is used to determine whether enqueue/dequeue events
- representing changes made to queue state are generated. These
- events can then be processed by plugins such as that used for
- <xref linkend="queue-state-replication"/>.
- </para><para>
- Example:
- </para>
- <programlisting>
-#include "qpid/client/QueueOptions.h"
-
- QueueOptions options;
- options.enableQueueEvents(1);
- session.queueDeclare(arg::queue="my-queue", arg::arguments=options);
-</programlisting>
- <para>
- The boolean option indicates whether only enqueue events should
- be generated. The key set by this is
- 'qpid.queue_event_generation' and the value is and integer value
- of 1 (to replicate only enqueue events) or 2 (to replicate both
- enqueue and dequeue events).
- </para>
<!--h3--></section>
- <!--h4--></section>
<section role="h3" id="CheatSheetforconfiguringQueueOptions-OtherClients"><title>
Other
Clients
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4c596b88a0..8e7b5b90d8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _failoverMutex;
}
- public void failoverPrep()
- {
- _delegate.failoverPrep();
- }
-
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
_delegate.resubscribeSessions();
@@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _messageCompressionThresholdSize;
}
+
+ void doWithAllLocks(Runnable r)
+ {
+ doWithAllLocks(r, _sessions.values());
+
+ }
+
+ private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
+ {
+ if (!sessions.isEmpty())
+ {
+ AMQSession session = sessions.remove(0);
+
+ final Object dispatcherLock = session.getDispatcherLock();
+ if (dispatcherLock != null)
+ {
+ synchronized (dispatcherLock)
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ r.run();
+ }
+ }
+ }
+
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 74ca1ed74f..c359fbcc84 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -52,8 +52,6 @@ public interface AMQConnectionDelegate
XASession createXASession(int ackMode) throws JMSException;
- void failoverPrep();
-
void resubscribeSessions() throws JMSException, AMQException, FailoverException;
void closeConnection(long timeout) throws JMSException, AMQException;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index fdeab7ae70..e22a341205 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
for (AMQSession s : sessions)
{
- s.failoverPrep();
+ ((AMQSession_0_10)s).failoverPrep();
}
}
@@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_qpidConnection.notifyFailoverRequired();
- synchronized (_conn.getFailoverMutex())
+ final AtomicBoolean failoverDone = new AtomicBoolean();
+
+ _conn.doWithAllLocks(new Runnable()
{
+ @Override
+ public void run()
+ {
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
- _conn.failoverPrep();
+ failoverPrep();
_conn.resubscribeSessions();
_conn.fireFailoverComplete();
- return;
+ failoverDone.set(true);
}
}
catch (Exception e)
@@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn.getProtocolHandler().getFailoverLatch().countDown();
_conn.getProtocolHandler().setFailoverLatch(null);
}
+
+ }
+ });
+
+
+ if (failoverDone.get())
+ {
+ return;
}
+
}
+
_conn.setClosed();
final ExceptionListener listener = _conn.getExceptionListenerNoCheck();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index d0d9d28398..8176358733 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
- public void failoverPrep()
- {
- // do nothing
- }
-
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f5e9524b6..3966e75423 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -169,7 +169,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Lock _subscriberDetails = new ReentrantLock(true);
private final Lock _subscriberAccess = new ReentrantLock(true);
- private final FlowControllingBlockingQueue _queue;
+ private final FlowControllingBlockingQueue<Dispatchable> _queue;
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -358,7 +358,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
+ new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -423,7 +423,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null);
}
// Add creation logging to tie in with the existing close logging
@@ -1789,7 +1789,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
//in the pre-dispatch queue.
_usingDispatcherForCleanup = true;
- syncDispatchQueue();
+ syncDispatchQueue(false);
// Set to false before sending the recover as 0-8/9/9-1 will
//send messages back before the recover completes, and we
@@ -1881,7 +1881,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
setRollbackMark();
- syncDispatchQueue();
+ syncDispatchQueue(false);
_dispatcher.rollback();
@@ -2201,21 +2201,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
- void failoverPrep()
- {
- syncDispatchQueue();
- }
- void syncDispatchQueue()
+ void syncDispatchQueue(final boolean holdDispatchLock)
{
- if (Thread.currentThread() == _dispatcherThread)
+ if (Thread.currentThread() == _dispatcherThread || holdDispatchLock)
{
while (!super.isClosed() && !_queue.isEmpty())
{
Dispatchable disp;
try
{
- disp = (Dispatchable) _queue.take();
+ disp = _queue.take();
}
catch (InterruptedException e)
{
@@ -2267,7 +2263,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
Dispatchable disp;
try
{
- disp = (Dispatchable) _queue.take();
+ disp = _queue.take();
}
catch (InterruptedException e)
{
@@ -3086,7 +3082,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
- Iterator messages = _queue.iterator();
+ Iterator<Dispatchable> messages = _queue.iterator();
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
@@ -3237,6 +3233,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void setFlowControl(final boolean active);
+ Object getDispatcherLock()
+ {
+ Dispatcher dispatcher = _dispatcher;
+ return dispatcher == null ? null : dispatcher._lock;
+ }
+
public interface Dispatchable
{
void dispatch(AMQSession ssn);
@@ -3389,10 +3391,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- Dispatchable disp;
- while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get())
+
+ while (((_queue.blockingPeek()) != null) && !_closed.get())
{
- disp.dispatch(AMQSession.this);
+ synchronized (_lock)
+ {
+ Dispatchable disp = _queue.nonBlockingTake();
+
+ if(disp != null)
+ {
+ disp.dispatch(AMQSession.this);
+ }
+ }
}
}
catch (InterruptedException e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 206ca15c82..08d7ea3f67 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -235,7 +235,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void failoverPrep()
{
- super.failoverPrep();
+ syncDispatchQueue(true);
clearUnacked();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index e0d8ac3702..0cb103f0cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -412,7 +412,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_capacity,
Option.UNRELIABLE);
}
- _0_10session.syncDispatchQueue();
+ _0_10session.syncDispatchQueue(false);
o = super.getMessageFromQueue(-1);
}
if (_capacity == 0)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
index f46c61daa7..0ba5cfdacb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
@@ -22,13 +22,16 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public final class ChannelToSessionMap
{
- private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>();
+ private final Map<Integer, AMQSession> _sessionMap = Collections.synchronizedMap(new LinkedHashMap<Integer, AMQSession>());
private AtomicInteger _idFactory = new AtomicInteger(0);
private int _maxChannelID;
private int _minChannelID;
@@ -48,7 +51,7 @@ public final class ChannelToSessionMap
_sessionMap.remove(channelId);
}
- public Collection<AMQSession> values()
+ public List<AMQSession> values()
{
return new ArrayList<>(_sessionMap.values());
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index b194ac88de..df54b7066b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.client.util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* <p>
* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
*/
-public class FlowControllingBlockingQueue
+public class FlowControllingBlockingQueue<T>
{
private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final Queue _queue = new ConcurrentLinkedQueue();
+ private final Queue<T> _queue = new ConcurrentLinkedQueue<T>();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
@@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue
}
}
- public Object take() throws InterruptedException
+ public T blockingPeek() throws InterruptedException
+ {
+ T o = _queue.peek();
+ if (o == null)
+ {
+ synchronized (this)
+ {
+ while ((o = _queue.peek()) == null)
+ {
+ wait();
+ }
+ }
+ }
+ return o;
+ }
+
+ public T nonBlockingTake() throws InterruptedException
+ {
+ T o = _queue.poll();
+
+ if (o != null && !disableFlowControl && _listener != null)
+ {
+ synchronized (_listener)
+ {
+ if (_count-- == _flowControlLowThreshold)
+ {
+ _listener.underThreshold(_count);
+ }
+ }
+
+ }
+
+ return o;
+ }
+
+ public T take() throws InterruptedException
{
- Object o = _queue.poll();
+ T o = _queue.poll();
if(o == null)
{
synchronized(this)
@@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue
return o;
}
- public void add(Object o)
+ public void add(T o)
{
synchronized(this)
{
@@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue
}
}
- public Iterator iterator()
+ public Iterator<T> iterator()
{
return _queue.iterator();
}
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index e7d564f555..7c30e5d4ba 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -1368,7 +1368,8 @@ class Engine:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
- ssn.incoming.append(msg)
+ ssn.message_received(msg)
+
def _decode(self, xfr):
dp = EMPTY_DP
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 7d353e1cb4..6d58b4ac25 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -569,6 +569,7 @@ class Session(Endpoint):
self.closed = False
self._lock = connection._lock
+ self._msg_received = None
def __repr__(self):
return "<Session %s>" % self.name
@@ -600,6 +601,11 @@ class Session(Endpoint):
if self.closed:
raise SessionClosed()
+ def message_received(self, msg):
+ self.incoming.append(msg)
+ if self._msg_received:
+ self._msg_received()
+
@synchronized
def sender(self, target, **options):
"""
@@ -685,6 +691,18 @@ class Session(Endpoint):
return None
@synchronized
+ def set_message_received_handler(self, handler):
+ """Register a callback that will be invoked when a message arrives on the
+ session. Use with caution: since this callback is invoked in the context
+ of the driver thread, it is not safe to call any of the public messaging
+ APIs from within this callback. The intent of the handler is to provide
+ an efficient way to notify the application that a message has arrived.
+ This can be useful for those applications that need to schedule a task
+ to poll for received messages without blocking in the messaging API.
+ """
+ self._msg_received = handler
+
+ @synchronized
def next_receiver(self, timeout=None):
if self._ecwait(lambda: self.incoming, timeout):
return self.incoming[0]._receiver
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 247d6e9a29..56722374e5 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -660,6 +660,31 @@ class SessionTests(Base):
except Detached:
pass
+ def testRxCallback(self):
+ """Verify that the callback is invoked when a message is received.
+ """
+ ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}'
+ class CallbackHandler:
+ def __init__(self):
+ self.handler_called = False
+ def __call__(self):
+ self.handler_called = True
+ cb = CallbackHandler()
+ self.ssn.set_message_received_handler(cb)
+ rcv = self.ssn.receiver(ADDR)
+ rcv.capacity = UNLIMITED
+ snd = self.ssn.sender(ADDR)
+ assert not cb.handler_called
+ snd.send("Ping")
+ deadline = time.time() + self.timeout()
+ while time.time() < deadline:
+ if cb.handler_called:
+ break;
+ assert cb.handler_called
+ snd.close()
+ rcv.close()
+
+
RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
class ReceiverTests(Base):
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java
index 04a0642e2e..8cca763e4c 100644
--- a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java
@@ -344,7 +344,6 @@ public final class QpidConfig
private String _order = "fifo";
private boolean _msgSequence = false;
private boolean _ive = false;
- private long _eventGeneration = 0;
private String _file = null;
// New to Qpid 0.10 qpid-config
@@ -364,7 +363,6 @@ public final class QpidConfig
private static final String LVQNB = "qpid.last_value_queue_no_browse";
private static final String MSG_SEQUENCE = "qpid.msg_sequence";
private static final String IVE = "qpid.ive";
- private static final String QUEUE_EVENT_GENERATION = "qpid.queue_event_generation";
private static final String FLOW_STOP_COUNT = "qpid.flow_stop_count";
private static final String FLOW_RESUME_COUNT = "qpid.flow_resume_count";
private static final String FLOW_STOP_SIZE = "qpid.flow_stop_size";
@@ -387,7 +385,6 @@ public final class QpidConfig
SPECIAL_ARGS.add(LVQNB);
SPECIAL_ARGS.add(MSG_SEQUENCE);
SPECIAL_ARGS.add(IVE);
- SPECIAL_ARGS.add(QUEUE_EVENT_GENERATION);
SPECIAL_ARGS.add(FLOW_STOP_COUNT);
SPECIAL_ARGS.add(FLOW_RESUME_COUNT);
SPECIAL_ARGS.add(FLOW_STOP_SIZE);
@@ -714,11 +711,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) {
System.out.printf("--order lvq-no-browse ");
}
- if (args.containsKey(QUEUE_EVENT_GENERATION))
- {
- System.out.printf("--generate-queue-events=%d ", QmfData.getLong(args.get(QUEUE_EVENT_GENERATION)));
- }
-
if (queue.hasValue("altExchange"))
{
ObjectId altExchangeRef = queue.getRefValue("altExchange");
@@ -936,11 +928,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) {
properties.put(LVQNB, 1l);
}
- if (_eventGeneration > 0)
- {
- properties.put(QUEUE_EVENT_GENERATION, _eventGeneration);
- }
-
if (_altExchange != null)
{
properties.put("alternate-exchange", _altExchange);
@@ -1393,11 +1380,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) {
_ive = true;
}
- if (opt[0].equals("--generate-queue-events"))
- {
- _eventGeneration = Long.parseLong(opt[1]);
- }
-
if (opt[0].equals("--force"))
{
_ifEmpty = false;
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index 8d38b1a342..816e0f0a08 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -147,7 +147,6 @@ POLICY_TYPE = "qpid.policy_type"
LVQ_KEY = "qpid.last_value_queue_key"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
-QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
FLOW_STOP_COUNT = "qpid.flow_stop_count"
FLOW_RESUME_COUNT = "qpid.flow_resume_count"
FLOW_STOP_SIZE = "qpid.flow_stop_size"
@@ -164,7 +163,7 @@ REPLICATE = "qpid.replicate"
SPECIAL_ARGS=[
FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE,
MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,
- LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
+ LVQ_KEY,MSG_SEQUENCE,IVE,
FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
@@ -541,7 +540,6 @@ class BrokerManager:
if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
- if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
print "--alternate-exchange=%s" % q.altExchange,
if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
@@ -638,8 +636,6 @@ class BrokerManager:
if config._lvq_key:
declArgs[LVQ_KEY] = config._lvq_key
- if config._eventGeneration:
- declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
if config._flowStopSize is not None:
declArgs[FLOW_STOP_SIZE] = config._flowStopSize