diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-19 22:19:51 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-19 22:19:51 +0000 |
| commit | 9016b88d0519e556975017150019a9b8a037b6d1 (patch) | |
| tree | c2f7bf734581ef9bb2bb51a3d49b00d63a17bd23 /java/cluster | |
| parent | e861284318186f8d9cd64a7ddcc28b8d20b98721 (diff) | |
| download | qpid-python-9016b88d0519e556975017150019a9b8a037b6d1.tar.gz | |
Introduced channel close methods into AMQMinaProtocolSession.java; Refactored StateAwareMethodListener.java to simplify call and remove redundant parameters, reworked all affected handlers. Connected the AMQP version information to the protocol session in all broker handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497974 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
14 files changed, 60 insertions, 73 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index c4107a435b..6cbacf4d27 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -54,19 +54,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust } } - protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.peer(stateMgr, queues, exchanges, session, evt); + handler.peer(session, evt); } } - protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.client(stateMgr, queues, exchanges, session, evt); + handler.client(session, evt); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 9612efbedf..f1ba4a0032 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -83,22 +83,22 @@ class ChannelQueueManager private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { set(evt.getChannelId(), evt.getMethod().queue); } } private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -108,11 +108,11 @@ class ChannelQueueManager } private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -123,11 +123,11 @@ class ChannelQueueManager private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { if(evt.getMethod().queue == null) { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index 260f27fa82..25e7abe965 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -32,18 +32,18 @@ import org.apache.qpid.AMQException; public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> { - public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public final void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { if (ClusteredProtocolSession.isPeerSession(session)) { - peer(stateMgr, queues, exchanges, session, evt); + peer(session, evt); } else { - client(stateMgr, queues, exchanges, session, evt); + client(session, evt); } } - protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; - protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 6aaa66dbfa..7ec4a5db0c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -162,8 +162,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException { _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session)); @@ -172,8 +171,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException { _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); @@ -182,8 +180,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException { _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); @@ -192,8 +189,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException { _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); @@ -202,8 +198,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException { ClusterMembershipBody body = evt.getMethod(); @@ -213,8 +208,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class PingHandler implements StateAwareMethodListener<ClusterPingBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<ClusterPingBody> evt) throws AMQException { MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index 8250d57d98..d05c7042d1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen _base = base; } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - preHandle(stateMgr, session, evt); - _base.methodReceived(stateMgr, queues, exchanges, session, evt); - postHandle(stateMgr, session, evt); + preHandle(session, evt); + _base.methodReceived(session, evt); + postHandle(session, evt); } - void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void preHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void postHandle(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 5cd94f12d1..2d516ebcab 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener; public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index eaf80338d2..1d115c2f9b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _client = client; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - _peer.methodReceived(stateMgr, queues, exchanges, session, evt); + _peer.methodReceived(session, evt); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - _client.methodReceived(stateMgr, queues, exchanges, session, evt); + _client.methodReceived(session, evt); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java index 3bf40fbaee..c1b4c8b82c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> _handler = handler; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) + protected void client(AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { setName(evt.getMethod());//need to set the name before propagating this method diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index f17a329c44..a278815324 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -37,10 +37,10 @@ public class RemoteCancelHandler implements StateAwareMethodListener<MessageCanc { private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException { //By convention, consumers setup between brokers use the queue name as the consumer tag: - AMQQueue queue = queues.getQueue(evt.getMethod().getDestination()); + AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().getDestination()); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index a3b2b42bc9..e34ef38608 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -42,9 +42,9 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<MessageCon { private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { - AMQQueue queue = queues.getQueue(evt.getMethod().queue); + AMQQueue queue = session.getQueueRegistry().getQueue(evt.getMethod().queue); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index 5d25bab23e..59df01fda4 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -46,17 +46,17 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<MessageConsume super(groupMgr, base(), policy); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException + protected void replicate(AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { //only replicate if the queue in question is a shared queue - if (isShared(queues.getQueue(evt.getMethod().queue))) + if (isShared(session.getQueueRegistry().getQueue(evt.getMethod().queue))) { - super.replicate(stateMgr, queues, exchanges, session, evt); + super.replicate(session, evt); } else { _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(session, evt); _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index e68ae9eb3d..ffc4325d63 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -65,51 +65,45 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _policy = policy; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - local(stateMgr, queues, exchanges, session, evt); + local(session, evt); _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - replicate(stateMgr, queues, exchanges, session, evt); + replicate(session, evt); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void replicate(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { if (_policy == null) { //asynch delivery _groupMgr.broadcast(new SimpleSendable(evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(session, evt); } else { - Callback callback = new Callback(stateMgr, queues, exchanges, session, evt); + Callback callback = new Callback(session, evt); _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback); } _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); } - protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void local(AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException { - _base.methodReceived(stateMgr, queues, exchanges, session, evt); + _base.methodReceived(session, evt); } private class Callback implements GroupResponseHandler { - private final AMQStateManager _stateMgr; - private final QueueRegistry _queues; - private final ExchangeRegistry _exchanges; private final AMQProtocolSession _session; private final AMQMethodEvent<A> _evt; - Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) + Callback(AMQProtocolSession session, AMQMethodEvent<A> evt) { - _stateMgr = stateMgr; - _queues = queues; - _exchanges = exchanges; _session = session; _evt = evt; } @@ -118,7 +112,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A { try { - local(_stateMgr, _queues, _exchanges, _session, _evt); + local(_session, _evt); _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); } catch (AMQException e) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index fd38ea6153..62fbdf89d3 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho _primary = check(primary); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException { - _pre.methodReceived(stateMgr, queues, exchanges, session, evt); - _primary.methodReceived(stateMgr, queues, exchanges, session, evt); - _post.methodReceived(stateMgr, queues, exchanges, session, evt); + _pre.methodReceived(session, evt); + _primary.methodReceived(session, evt); + _post.methodReceived(session, evt); } private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index 102628c6de..b82e3ad0ac 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -81,7 +81,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQProtocolSession session, AMQMethodEvent evt) throws AMQException { _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); AMQMethodBody request = evt.getMethod(); |
