summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-19 22:19:51 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-19 22:19:51 +0000
commit9016b88d0519e556975017150019a9b8a037b6d1 (patch)
treec2f7bf734581ef9bb2bb51a3d49b00d63a17bd23 /java/cluster
parente861284318186f8d9cd64a7ddcc28b8d20b98721 (diff)
downloadqpid-python-9016b88d0519e556975017150019a9b8a037b6d1.tar.gz
Introduced channel close methods into AMQMinaProtocolSession.java; Refactored StateAwareMethodListener.java to simplify call and remove redundant parameters, reworked all affected handlers. Connected the AMQP version information to the protocol session in all broker handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497974 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java16
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java18
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java28
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java2
14 files changed, 60 insertions, 73 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index c4107a435b..6cbacf4d27 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -54,19 +54,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust
}
}
- protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.peer(stateMgr, queues, exchanges, session, evt);
+ handler.peer(session, evt);
}
}
- protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.client(stateMgr, queues, exchanges, session, evt);
+ handler.client(session, evt);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 9612efbedf..f1ba4a0032 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -83,22 +83,22 @@ class ChannelQueueManager
private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
set(evt.getChannelId(), evt.getMethod().queue);
}
}
private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -108,11 +108,11 @@ class ChannelQueueManager
}
private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -123,11 +123,11 @@ class ChannelQueueManager
private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index 260f27fa82..25e7abe965 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -32,18 +32,18 @@ import org.apache.qpid.AMQException;
public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
{
- public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public final void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
if (ClusteredProtocolSession.isPeerSession(session))
{
- peer(stateMgr, queues, exchanges, session, evt);
+ peer(session, evt);
}
else
{
- client(stateMgr, queues, exchanges, session, evt);
+ client(session, evt);
}
}
- protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 6aaa66dbfa..7ec4a5db0c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -162,8 +162,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ public void methodReceived(AMQProtocolSession session,
AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
{
_groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
@@ -172,8 +171,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ public void methodReceived(AMQProtocolSession session,
AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
{
_groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
@@ -182,8 +180,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
{
_groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
@@ -192,8 +189,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
{
_groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
@@ -202,8 +198,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ public void methodReceived(AMQProtocolSession session,
AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
{
ClusterMembershipBody body = evt.getMethod();
@@ -213,8 +208,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ public void methodReceived(AMQProtocolSession session,
AMQMethodEvent<ClusterPingBody> evt) throws AMQException
{
MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index 8250d57d98..d05c7042d1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen
_base = base;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- preHandle(stateMgr, session, evt);
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
- postHandle(stateMgr, session, evt);
+ preHandle(session, evt);
+ _base.methodReceived(session, evt);
+ postHandle(session, evt);
}
- void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void preHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void postHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 5cd94f12d1..2d516ebcab 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index eaf80338d2..1d115c2f9b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_client = client;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- _peer.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _peer.methodReceived(session, evt);
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- _client.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _client.methodReceived(session, evt);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
index 3bf40fbaee..c1b4c8b82c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
@@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
_handler = handler;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+ protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
throws AMQException
{
setName(evt.getMethod());//need to set the name before propagating this method
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index f17a329c44..a278815324 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -37,10 +37,10 @@ public class RemoteCancelHandler implements StateAwareMethodListener<MessageCanc
{
private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
{
//By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queues.getQueue(evt.getMethod().getDestination());
+ AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().getDestination());
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index a3b2b42bc9..e34ef38608 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -42,9 +42,9 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<MessageCon
{
private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
- AMQQueue queue = queues.getQueue(evt.getMethod().queue);
+ AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().queue);
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index 5d25bab23e..59df01fda4 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -46,17 +46,17 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<MessageConsume
super(groupMgr, base(), policy);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
+ protected void replicate(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
//only replicate if the queue in question is a shared queue
- if (isShared(queues.getQueue(evt.getMethod().queue)))
+ if (isShared(session.getQueueRegistry().getQueue(evt.getMethod().queue)))
{
- super.replicate(stateMgr, queues, exchanges, session, evt);
+ super.replicate(session, evt);
}
else
{
_logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(session, evt);
_logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index e68ae9eb3d..ffc4325d63 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -65,51 +65,45 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_policy = policy;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- local(stateMgr, queues, exchanges, session, evt);
+ local(session, evt);
_logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- replicate(stateMgr, queues, exchanges, session, evt);
+ replicate(session, evt);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void replicate(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
if (_policy == null)
{
//asynch delivery
_groupMgr.broadcast(new SimpleSendable(evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(session, evt);
}
else
{
- Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
+ Callback callback = new Callback(session, evt);
_groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback);
}
_logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
}
- protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void local(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
{
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _base.methodReceived(session, evt);
}
private class Callback implements GroupResponseHandler
{
- private final AMQStateManager _stateMgr;
- private final QueueRegistry _queues;
- private final ExchangeRegistry _exchanges;
private final AMQProtocolSession _session;
private final AMQMethodEvent<A> _evt;
- Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt)
+ Callback(AMQProtocolSession session, AMQMethodEvent<A> evt)
{
- _stateMgr = stateMgr;
- _queues = queues;
- _exchanges = exchanges;
_session = session;
_evt = evt;
}
@@ -118,7 +112,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
{
try
{
- local(_stateMgr, _queues, _exchanges, _session, _evt);
+ local(_session, _evt);
_logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
}
catch (AMQException e)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index fd38ea6153..62fbdf89d3 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho
_primary = check(primary);
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
{
- _pre.methodReceived(stateMgr, queues, exchanges, session, evt);
- _primary.methodReceived(stateMgr, queues, exchanges, session, evt);
- _post.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _pre.methodReceived(session, evt);
+ _primary.methodReceived(session, evt);
+ _post.methodReceived(session, evt);
}
private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index 102628c6de..b82e3ad0ac 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -81,7 +81,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
_localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
{
_logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
AMQMethodBody request = evt.getMethod();