diff options
20 files changed, 199 insertions, 130 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/generate.py b/qpid/cpp/managementgen/qmfgen/generate.py index a7ad43cc30..22c53aa2e0 100755 --- a/qpid/cpp/managementgen/qmfgen/generate.py +++ b/qpid/cpp/managementgen/qmfgen/generate.py @@ -257,6 +257,8 @@ class CMakeLists(Makefile): class Generator: + verbose = False + """ This class manages code generation using template files. It is instantiated once for an entire code generation session. @@ -350,7 +352,9 @@ class Generator: pass os.rename (tempFile, target) - print "Generated:", target + + if self.verbose: + print "Generated:", target def targetPackageFile (self, schema, templateFile): dot = templateFile.find(".") diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index f42e177225..7eee953c04 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -489,6 +489,7 @@ class Generator @prefix=[''] # For indentation or comments. @indentstr=' ' # One indent level. @outdent=2 + @verbose=false end # Declare next file to be public API @@ -504,10 +505,14 @@ class Generator @out=String.new # Generate in memory first yield if block if @path.exist? and @path.read == @out - puts "Skipped #{@path} - unchanged" # Dont generate if unchanged + if @verbose + puts "Skipped #{@path} - unchanged" # Dont generate if unchanged + end else @path.open('w') { |f| f << @out } - puts "Generated #{@path}" + if @verbose + puts "Generated #{@path}" + end end end end diff --git a/qpid/cpp/src/qpid/client/QueueOptions.cpp b/qpid/cpp/src/qpid/client/QueueOptions.cpp index e589bd76cd..f7705afeb0 100644 --- a/qpid/cpp/src/qpid/client/QueueOptions.cpp +++ b/qpid/cpp/src/qpid/client/QueueOptions.cpp @@ -40,8 +40,6 @@ const std::string QueueOptions::strRING_STRICT("ring_strict"); const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue"); const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key"); const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); -const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation"); - QueueOptions::~QueueOptions() {} @@ -101,11 +99,6 @@ void QueueOptions::clearOrdering() erase(strLastValueQueue); } -void QueueOptions::enableQueueEvents(bool enqueueOnly) -{ - setInt(strQueueEventMode, enqueueOnly ? ENQUEUE_ONLY : ENQUEUE_AND_DEQUEUE); -} - } } diff --git a/qpid/cpp/src/qpid/client/QueueOptions.h b/qpid/cpp/src/qpid/client/QueueOptions.h index a2f30a50b5..f7ace15ff9 100644 --- a/qpid/cpp/src/qpid/client/QueueOptions.h +++ b/qpid/cpp/src/qpid/client/QueueOptions.h @@ -75,28 +75,6 @@ class QPID_CLIENT_CLASS_EXTERN QueueOptions: public framing::FieldTable */ QPID_CLIENT_EXTERN void clearOrdering(); - /** - * Turns on event generation for this queue (either enqueue only - * or for enqueue and dequeue events); the events can then be - * processed by a regsitered broker plugin. - * - * DEPRECATED - * - * This is confusing to anyone who sees only the function call - * and not the variable name / doxygen. Consider the following call: - * - * options.enableQueueEvents(false); - * - * It looks like it disables queue events, but what it really does is - * enable both enqueue and dequeue events. - * - * Use setInt() instead: - * - * options.setInt("qpid.queue_event_generation", 2); - */ - - QPID_CLIENT_EXTERN void enableQueueEvents(bool enqueueOnly); - static QPID_CLIENT_EXTERN const std::string strMaxCountKey; static QPID_CLIENT_EXTERN const std::string strMaxSizeKey; static QPID_CLIENT_EXTERN const std::string strTypeKey; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 3ed3b457ba..35ce82cf5d 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -36,7 +36,7 @@ namespace amqp0_10 { class AddressResolution; class MessageSink; -struct OutgoingMessage; +class OutgoingMessage; /** * diff --git a/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml b/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml index e693ee463b..125372e463 100644 --- a/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml +++ b/qpid/doc/book/src/cpp-broker/Cheat-Sheet-for-configuring-Queue-Options.xml @@ -179,34 +179,7 @@ <section role="h3" id="CheatSheetforconfiguringQueueOptions-Settingadditionalbehaviors"><title> Setting additional behaviors </title> - <section role="h4" id="CheatSheetforconfiguringQueueOptions-Queueeventgeneration"><title> - Queue - event generation - </title> - <para> - This option is used to determine whether enqueue/dequeue events - representing changes made to queue state are generated. These - events can then be processed by plugins such as that used for - <xref linkend="queue-state-replication"/>. - </para><para> - Example: - </para> - <programlisting> -#include "qpid/client/QueueOptions.h" - - QueueOptions options; - options.enableQueueEvents(1); - session.queueDeclare(arg::queue="my-queue", arg::arguments=options); -</programlisting> - <para> - The boolean option indicates whether only enqueue events should - be generated. The key set by this is - 'qpid.queue_event_generation' and the value is and integer value - of 1 (to replicate only enqueue events) or 2 (to replicate both - enqueue and dequeue events). - </para> <!--h3--></section> - <!--h4--></section> <section role="h3" id="CheatSheetforconfiguringQueueOptions-OtherClients"><title> Other Clients diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4c596b88a0..8e7b5b90d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } - public void failoverPrep() - { - _delegate.failoverPrep(); - } - public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _messageCompressionThresholdSize; } + + void doWithAllLocks(Runnable r) + { + doWithAllLocks(r, _sessions.values()); + + } + + private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + + final Object dispatcherLock = session.getDispatcherLock(); + if (dispatcherLock != null) + { + synchronized (dispatcherLock) + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (getFailoverMutex()) + { + r.run(); + } + } + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 74ca1ed74f..c359fbcc84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -52,8 +52,6 @@ public interface AMQConnectionDelegate XASession createXASession(int ackMode) throws JMSException; - void failoverPrep(); - void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index fdeab7ae70..e22a341205 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); for (AMQSession s : sessions) { - s.failoverPrep(); + ((AMQSession_0_10)s).failoverPrep(); } } @@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.notifyFailoverRequired(); - synchronized (_conn.getFailoverMutex()) + final AtomicBoolean failoverDone = new AtomicBoolean(); + + _conn.doWithAllLocks(new Runnable() { + @Override + public void run() + { try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { - _conn.failoverPrep(); + failoverPrep(); _conn.resubscribeSessions(); _conn.fireFailoverComplete(); - return; + failoverDone.set(true); } } catch (Exception e) @@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().getFailoverLatch().countDown(); _conn.getProtocolHandler().setFailoverLatch(null); } + + } + }); + + + if (failoverDone.get()) + { + return; } + } + _conn.setClosed(); final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index d0d9d28398..8176358733 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void failoverPrep() - { - // do nothing - } - /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f5e9524b6..3966e75423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Lock _subscriberDetails = new ReentrantLock(true); private final Lock _subscriberAccess = new ReentrantLock(true); - private final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue<Dispatchable> _queue; private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -358,7 +358,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, + new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -423,7 +423,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); + _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null); } // Add creation logging to tie in with the existing close logging @@ -1789,7 +1789,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic //in the pre-dispatch queue. _usingDispatcherForCleanup = true; - syncDispatchQueue(); + syncDispatchQueue(false); // Set to false before sending the recover as 0-8/9/9-1 will //send messages back before the recover completes, and we @@ -1881,7 +1881,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic setRollbackMark(); - syncDispatchQueue(); + syncDispatchQueue(false); _dispatcher.rollback(); @@ -2201,21 +2201,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } - void failoverPrep() - { - syncDispatchQueue(); - } - void syncDispatchQueue() + void syncDispatchQueue(final boolean holdDispatchLock) { - if (Thread.currentThread() == _dispatcherThread) + if (Thread.currentThread() == _dispatcherThread || holdDispatchLock) { while (!super.isClosed() && !_queue.isEmpty()) { Dispatchable disp; try { - disp = (Dispatchable) _queue.take(); + disp = _queue.take(); } catch (InterruptedException e) { @@ -2267,7 +2263,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic Dispatchable disp; try { - disp = (Dispatchable) _queue.take(); + disp = _queue.take(); } catch (InterruptedException e) { @@ -3086,7 +3082,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { - Iterator messages = _queue.iterator(); + Iterator<Dispatchable> messages = _queue.iterator(); if (_logger.isDebugEnabled()) { _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" @@ -3237,6 +3233,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void setFlowControl(final boolean active); + Object getDispatcherLock() + { + Dispatcher dispatcher = _dispatcher; + return dispatcher == null ? null : dispatcher._lock; + } + public interface Dispatchable { void dispatch(AMQSession ssn); @@ -3389,10 +3391,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - Dispatchable disp; - while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get()) + + while (((_queue.blockingPeek()) != null) && !_closed.get()) { - disp.dispatch(AMQSession.this); + synchronized (_lock) + { + Dispatchable disp = _queue.nonBlockingTake(); + + if(disp != null) + { + disp.dispatch(AMQSession.this); + } + } } } catch (InterruptedException e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 206ca15c82..08d7ea3f67 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -235,7 +235,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void failoverPrep() { - super.failoverPrep(); + syncDispatchQueue(true); clearUnacked(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index e0d8ac3702..0cb103f0cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -412,7 +412,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity, Option.UNRELIABLE); } - _0_10session.syncDispatchQueue(); + _0_10session.syncDispatchQueue(false); o = super.getMessageFromQueue(-1); } if (_capacity == 0) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java index f46c61daa7..0ba5cfdacb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -22,13 +22,16 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public final class ChannelToSessionMap { - private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>(); + private final Map<Integer, AMQSession> _sessionMap = Collections.synchronizedMap(new LinkedHashMap<Integer, AMQSession>()); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; @@ -48,7 +51,7 @@ public final class ChannelToSessionMap _sessionMap.remove(channelId); } - public Collection<AMQSession> values() + public List<AMQSession> values() { return new ArrayList<>(_sessionMap.values()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index b194ac88de..df54b7066b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; * <p> * TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ -public class FlowControllingBlockingQueue +public class FlowControllingBlockingQueue<T> { private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final Queue _queue = new ConcurrentLinkedQueue(); + private final Queue<T> _queue = new ConcurrentLinkedQueue<T>(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue } } - public Object take() throws InterruptedException + public T blockingPeek() throws InterruptedException + { + T o = _queue.peek(); + if (o == null) + { + synchronized (this) + { + while ((o = _queue.peek()) == null) + { + wait(); + } + } + } + return o; + } + + public T nonBlockingTake() throws InterruptedException + { + T o = _queue.poll(); + + if (o != null && !disableFlowControl && _listener != null) + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + + } + + return o; + } + + public T take() throws InterruptedException { - Object o = _queue.poll(); + T o = _queue.poll(); if(o == null) { synchronized(this) @@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue return o; } - public void add(Object o) + public void add(T o) { synchronized(this) { @@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue } } - public Iterator iterator() + public Iterator<T> iterator() { return _queue.iterator(); } diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index e7d564f555..7c30e5d4ba 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -1368,7 +1368,8 @@ class Engine: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.incoming.append(msg) + ssn.message_received(msg) + def _decode(self, xfr): dp = EMPTY_DP diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 7d353e1cb4..6d58b4ac25 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -569,6 +569,7 @@ class Session(Endpoint): self.closed = False self._lock = connection._lock + self._msg_received = None def __repr__(self): return "<Session %s>" % self.name @@ -600,6 +601,11 @@ class Session(Endpoint): if self.closed: raise SessionClosed() + def message_received(self, msg): + self.incoming.append(msg) + if self._msg_received: + self._msg_received() + @synchronized def sender(self, target, **options): """ @@ -685,6 +691,18 @@ class Session(Endpoint): return None @synchronized + def set_message_received_handler(self, handler): + """Register a callback that will be invoked when a message arrives on the + session. Use with caution: since this callback is invoked in the context + of the driver thread, it is not safe to call any of the public messaging + APIs from within this callback. The intent of the handler is to provide + an efficient way to notify the application that a message has arrived. + This can be useful for those applications that need to schedule a task + to poll for received messages without blocking in the messaging API. + """ + self._msg_received = handler + + @synchronized def next_receiver(self, timeout=None): if self._ecwait(lambda: self.incoming, timeout): return self.incoming[0]._receiver diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 247d6e9a29..56722374e5 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -660,6 +660,31 @@ class SessionTests(Base): except Detached: pass + def testRxCallback(self): + """Verify that the callback is invoked when a message is received. + """ + ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}' + class CallbackHandler: + def __init__(self): + self.handler_called = False + def __call__(self): + self.handler_called = True + cb = CallbackHandler() + self.ssn.set_message_received_handler(cb) + rcv = self.ssn.receiver(ADDR) + rcv.capacity = UNLIMITED + snd = self.ssn.sender(ADDR) + assert not cb.handler_called + snd.send("Ping") + deadline = time.time() + self.timeout() + while time.time() < deadline: + if cb.handler_called: + break; + assert cb.handler_called + snd.close() + rcv.close() + + RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' class ReceiverTests(Base): diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java index 04a0642e2e..8cca763e4c 100644 --- a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java @@ -344,7 +344,6 @@ public final class QpidConfig private String _order = "fifo"; private boolean _msgSequence = false; private boolean _ive = false; - private long _eventGeneration = 0; private String _file = null; // New to Qpid 0.10 qpid-config @@ -364,7 +363,6 @@ public final class QpidConfig private static final String LVQNB = "qpid.last_value_queue_no_browse"; private static final String MSG_SEQUENCE = "qpid.msg_sequence"; private static final String IVE = "qpid.ive"; - private static final String QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; private static final String FLOW_STOP_COUNT = "qpid.flow_stop_count"; private static final String FLOW_RESUME_COUNT = "qpid.flow_resume_count"; private static final String FLOW_STOP_SIZE = "qpid.flow_stop_size"; @@ -387,7 +385,6 @@ public final class QpidConfig SPECIAL_ARGS.add(LVQNB); SPECIAL_ARGS.add(MSG_SEQUENCE); SPECIAL_ARGS.add(IVE); - SPECIAL_ARGS.add(QUEUE_EVENT_GENERATION); SPECIAL_ARGS.add(FLOW_STOP_COUNT); SPECIAL_ARGS.add(FLOW_RESUME_COUNT); SPECIAL_ARGS.add(FLOW_STOP_SIZE); @@ -714,11 +711,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) { System.out.printf("--order lvq-no-browse "); } - if (args.containsKey(QUEUE_EVENT_GENERATION)) - { - System.out.printf("--generate-queue-events=%d ", QmfData.getLong(args.get(QUEUE_EVENT_GENERATION))); - } - if (queue.hasValue("altExchange")) { ObjectId altExchangeRef = queue.getRefValue("altExchange"); @@ -936,11 +928,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) { properties.put(LVQNB, 1l); } - if (_eventGeneration > 0) - { - properties.put(QUEUE_EVENT_GENERATION, _eventGeneration); - } - if (_altExchange != null) { properties.put("alternate-exchange", _altExchange); @@ -1393,11 +1380,6 @@ for (Map.Entry<String, Object> entry : args.entrySet()) { _ive = true; } - if (opt[0].equals("--generate-queue-events")) - { - _eventGeneration = Long.parseLong(opt[1]); - } - if (opt[0].equals("--force")) { _ifEmpty = false; diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index 8d38b1a342..816e0f0a08 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -147,7 +147,6 @@ POLICY_TYPE = "qpid.policy_type" LVQ_KEY = "qpid.last_value_queue_key" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" -QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" FLOW_STOP_COUNT = "qpid.flow_stop_count" FLOW_RESUME_COUNT = "qpid.flow_resume_count" FLOW_STOP_SIZE = "qpid.flow_stop_size" @@ -164,7 +163,7 @@ REPLICATE = "qpid.replicate" SPECIAL_ARGS=[ FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE, MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE, - LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION, + LVQ_KEY,MSG_SEQUENCE,IVE, FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE] @@ -541,7 +540,6 @@ class BrokerManager: if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], - if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q.altExchange, if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], @@ -638,8 +636,6 @@ class BrokerManager: if config._lvq_key: declArgs[LVQ_KEY] = config._lvq_key - if config._eventGeneration: - declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration if config._flowStopSize is not None: declArgs[FLOW_STOP_SIZE] = config._flowStopSize |
