summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-26 14:53:34 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-26 14:53:34 +0000
commit8bdc5010fef590e5dc3c424b1f41f8df42c47c11 (patch)
tree4805fd38f0f9a321581cfb0079e1ef2dd571e84e /qpid/java/broker/src
parent92067361c279257be2b2ee73484da61c9327c05a (diff)
downloadqpid-python-8bdc5010fef590e5dc3c424b1f41f8df42c47c11.tar.gz
QPID-4951: Add cause code and message into operational logs for session close initiated by the Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java5
11 files changed, 82 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8588aea2d4..4df40585d9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -60,6 +60,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -574,13 +575,21 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
@Override
public void close() throws AMQException
{
+ close(null, null);
+ }
+
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
if(!_closing.compareAndSet(false, true))
{
//Channel is already closing
return;
}
- CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE());
+ LogMessage operationalLogMessage = cause == null ?
+ ChannelMessages.CLOSE() :
+ ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
+ CurrentActor.get().message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
_transaction.rollback();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index b5df212904..397c12d73c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -22,6 +22,8 @@ CREATE = CHN-1001 : Create
# 0 - flow
FLOW = CHN-1002 : Flow {0}
CLOSE = CHN-1003 : Close
+CLOSE_FORCED = CHN-1003 : Close : {0,number} - {1}
+
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index e9b0fd9f10..1b24671575 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -450,12 +450,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (AMQException e)
{
- closeChannel(channelId);
+ closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
throw e;
}
catch (TransportException e)
{
- closeChannel(channelId);
+ closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
throw e;
}
@@ -601,7 +601,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
writeFrame(e.getCloseFrame(channelId));
- closeChannel(channelId);
+ closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
}
else
{
@@ -824,6 +824,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public void closeChannel(int channelId) throws AMQException
{
+ closeChannel(channelId, null, null);
+ }
+
+ public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException
+ {
final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
@@ -833,7 +838,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- channel.close();
+ channel.close(cause, message);
markChannelAwaitingCloseOk(channelId);
}
finally
@@ -1490,7 +1495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
int channelId = ((AMQChannel)session).getChannelId();
- closeChannel(channelId);
+ closeChannel(channelId, cause, message);
MethodRegistry methodRegistry = getMethodRegistry();
ChannelCloseBody responseBody =
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1842117d6f..6fa497c853 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
@@ -119,6 +120,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
*/
void closeChannel(int channelId) throws AMQException;
+ void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException;
+
/**
* Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be
* created on that id.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 9d9bbe807b..a3833eebb9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
@@ -44,6 +45,8 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
public void close() throws AMQException;
+ public void close(AMQConstant cause, String message) throws AMQException;
+
public LogSubject getLogSubject();
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 60bb8e4044..2b39fb1ff0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -36,6 +36,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
@@ -477,6 +478,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
// TODO - required for AMQSessionModel / management initiated closing
}
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ // TODO - required for AMQSessionModel
+ }
+
@Override
public LogSubject getLogSubject()
{
@@ -603,4 +611,5 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
{
return _connection;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index cc28aba981..3a2cb7556d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -208,7 +208,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
ex.setDescription(message);
((ServerSession)session).invoke(ex);
- ((ServerSession)session).close();
+ ((ServerSession)session).close(cause, message);
}
public LogSubject getLogSubject()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 6152ddd228..3449cc5237 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -38,6 +38,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
@@ -45,6 +47,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
@@ -140,6 +143,7 @@ public class ServerSession extends Session
private final TransactionTimeoutHelper _transactionTimeoutHelper;
+ private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
@@ -380,7 +384,12 @@ public class ServerSession extends Session
task.doTask(this);
}
- CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
+ LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
+ if (operationalLoggingMessage == null)
+ {
+ operationalLoggingMessage = ChannelMessages.CLOSE();
+ }
+ CurrentActor.get().message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -787,6 +796,25 @@ public class ServerSession extends Session
}
@Override
+ public void close(AMQConstant cause, String message)
+ {
+ if (cause == null)
+ {
+ close();
+ }
+ else
+ {
+ close(cause.getCode(), message);
+ }
+ }
+
+ void close(int cause, String message)
+ {
+ _forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message));
+ close();
+ }
+
+ @Override
public void close()
{
// unregister subscriptions in order to prevent sending of new messages
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 63419fce3f..0a4bb79ed7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -835,7 +835,7 @@ public class ServerSessionDelegate extends SessionDelegate
session.invoke(ex);
- session.close();
+ ((ServerSession)session).close(errorCode.getValue(), description);
}
private Exchange getExchange(Session session, String exchangeName)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
index e94b79ba95..2f1276508c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
@@ -61,4 +61,13 @@ public class ChannelMessagesTest extends AbstractTestMessages
validateLogMessage(log, "CHN-1003", expected);
}
+ public void testChannelCloseForced()
+ {
+ _logMessage = ChannelMessages.CLOSE_FORCED(1, "Test");
+ List<Object> log = performLog();
+
+ String[] expected = {"Close : 1 - Test"};
+
+ validateLogMessage(log, "CHN-1003", expected);
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 7f797afeda..ed60d5374b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -405,6 +405,11 @@ public class MockSubscription implements Subscription
{
return getId().compareTo(o.getId());
}
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ }
}
private static class MockConnectionModel implements AMQConnectionModel