From 9b1bf6023a9869af744e2fbc03664d41ced37df6 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 30 Oct 2013 21:38:03 +0000 Subject: QPID-4534: unify client heartbeat system properties/connection url options. * Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols * System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols * Enhanced heartbeat system tests * Docbook updates Original patch from Keith Wall, plus updates from Robbie Gemmell git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537313 13f79535-47bb-0310-9956-ffa450edef68 --- .../book/src/cpp-broker/Active-Passive-Cluster.xml | 4 +- .../src/programming/Programming-In-Apache-Qpid.xml | 8 +- .../server/protocol/v0_8/AMQProtocolEngine.java | 5 - .../org/apache/qpid/client/AMQBrokerDetails.java | 23 ++++ .../qpid/client/AMQConnectionDelegate_0_10.java | 35 +----- .../qpid/client/AMQConnectionDelegate_8_0.java | 3 +- .../qpid/client/ConnectionTuneParameters.java | 29 ++--- .../handler/ConnectionTuneMethodHandler.java | 18 +-- .../qpid/client/protocol/AMQProtocolHandler.java | 6 +- .../qpid/client/protocol/AMQProtocolSession.java | 56 +++++---- .../qpid/client/protocol/HeartbeatConfig.java | 61 --------- .../java/org/apache/qpid/jms/BrokerDetails.java | 3 +- .../client/BrokerDetails/BrokerDetailsTest.java | 26 ++++ .../org/apache/qpid/configuration/Accessor.java | 136 +++------------------ .../qpid/configuration/ClientProperties.java | 51 ++++++-- .../apache/qpid/configuration/QpidProperty.java | 24 ++++ .../protocol/AMQVersionAwareProtocolSession.java | 1 - .../org/apache/qpid/transport/ClientDelegate.java | 20 +-- .../apache/qpid/transport/ConnectionSettings.java | 60 ++++++++- .../qpid/configuration/QpidPropertyTest.java | 19 +++ .../qpid/transport/ConnectionSettingsTest.java | 39 +++++- .../java/org/apache/qpid/client/HeartbeatTest.java | 116 +++++++++++++++--- qpid/java/test-profiles/CPPExcludes | 3 +- qpid/java/test-profiles/Java010Excludes | 3 +- 24 files changed, 422 insertions(+), 327 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml index f747cdb747..7e1b905373 100644 --- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml +++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml @@ -781,11 +781,11 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl - In a Connection URL, heartbeat is set using the idle_timeout property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds: + In a Connection URL, heartbeat is set using the heartbeat property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds: - connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672',idle_timeout=3 + connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&heartbeat='3' diff --git a/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml b/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml index e2f6d8756c..d5527850c8 100644 --- a/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml +++ b/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml @@ -3176,7 +3176,8 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather integer - frequency of heartbeat messages (in seconds) + Frequency of heartbeat messages (in seconds). A value of 0 disables heartbeating. For compatibility + with old client configuration, option idle_timeout (in milliseconds) is also supported. @@ -3599,10 +3600,9 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather qpid.heartbeat int - 120 (secs) - The heartbeat interval in seconds. Two consective misssed heartbeats will result in the connection timing out.This can also be set per connection using the Connection URL options. + When using the 0-10 protocol, the default is 120 (secs)When using protocols 0-8...0-91, the default is the broker-supplied value. + Frequency of heartbeat messages (in seconds). A value of 0 disables heartbeating. Two consective misssed heartbeats will result in the connection timing out.This can also be set per connection using the Connection URL options.For compatibility with old client configuration, the synonym amqj.heartbeat.delay is supported. - ignore_setclientID boolean diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 33300e9e59..4cacae134e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1287,11 +1287,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void init() - { - // Do nothing - } - public void setSender(Sender sender) { // Do nothing diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 597096db57..25d37aafb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -290,6 +290,19 @@ public class AMQBrokerDetails implements BrokerDetails } } + private int getIntegerProperty(String key) + { + String stringValue = getProperty(key); + try + { + return Integer.parseInt(stringValue); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException("Cannot parse key " + key + " with value '" + stringValue + "' as integer.", e); + } + } + public String toString() { StringBuffer sb = new StringBuffer(); @@ -464,6 +477,16 @@ public class AMQBrokerDetails implements BrokerDetails conSettings.setConnectTimeout(lookupConnectTimeout()); + if (getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + else if (getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) / 1000); + } + return conSettings; } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 66590aa0d7..95b1178407 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -29,7 +29,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.configuration.ClientProperties; + import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; @@ -448,8 +448,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec // Ignore } - conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); - //Check connection-level ssl override setting String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); if(connectionSslOption != null) @@ -470,37 +468,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return conSettings; } - - // The idle_timeout prop is in milisecs while - // the new heartbeat prop is in secs - private int getHeartbeatInterval(BrokerDetails brokerDetail) - { - int heartbeat = 0; - if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) - { - _logger.warn("Broker property idle_timeout= is deprecated, please use heartbeat="); - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000; - } - else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) - { - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); - } - else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) - { - heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; - _logger.warn("JVM arg -Didle_timeout= is deprecated, please use -Dqpid.heartbeat="); - } - else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) - { - heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); - } - else - { - heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); - } - return heartbeat; - } - protected org.apache.qpid.transport.Connection getQpidConnection() { return _qpidConnection; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 340aca70eb..dfbf7ec60a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -124,10 +124,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), _conn.getProtocolHandler()); + _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - _conn.getProtocolHandler().getProtocolSession().init(); + _conn.getProtocolHandler().getProtocolSession().init(settings); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java index b1ec7216bc..40264f837e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java @@ -26,9 +26,20 @@ public class ConnectionTuneParameters private int _channelMax; - private int _heartbeat; + /** Heart-beating interval in seconds, null if not set, use 0 to disable */ + private Integer _heartbeat; - private long _txnLimit; + private float _heartbeatTimeoutFactor; + + public float getHeartbeatTimeoutFactor() + { + return _heartbeatTimeoutFactor; + } + + public void setHeartbeatTimeoutFactor(float heartbeatTimeoutFactor) + { + _heartbeatTimeoutFactor = heartbeatTimeoutFactor; + } public long getFrameMax() { @@ -50,23 +61,13 @@ public class ConnectionTuneParameters _channelMax = channelMax; } - public int getHeartbeat() + public Integer getHeartbeat() { return _heartbeat; } - public void setHeartbeat(int hearbeat) + public void setHeartbeat(Integer hearbeat) { _heartbeat = hearbeat; } - - public long getTxnLimit() - { - return _txnLimit; - } - - public void setTxnLimit(long txnLimit) - { - _txnLimit = txnLimit; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index f77718672e..617380e149 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -52,20 +52,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener 0) { _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + int readerIdle = (int)(delay * timeoutFactor); + _network.setMaxReadIdle(readerIdle); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 67bd8de846..4027ccb725 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,6 +43,7 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -63,18 +64,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class); - public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; - //Usable channels are numbered 1 to public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 1; - protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; - - protected static final String AMQ_CONNECTION = "AMQConnection"; - - protected static final String SASL_CLIENT = "SASLClient"; - private final AMQProtocolHandler _protocolHandler; private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); @@ -120,13 +113,38 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _connection = connection; } - public void init() + public void init(ConnectionSettings settings) { // start the process of setting up the connection. This is the first place that // data is written to the server. + initialiseTuneParameters(settings); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + private void initialiseTuneParameters(ConnectionSettings settings) + { + _connectionTuneParameters = new ConnectionTuneParameters(); + _connectionTuneParameters.setHeartbeat(settings.getHeartbeatInterval08()); + _connectionTuneParameters.setHeartbeatTimeoutFactor(settings.getHeartbeatTimeoutFactor()); + } + + public void tuneConnection(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + + _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor()); + } + public String getClientID() { try @@ -170,24 +188,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _saslClient = client; } - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - _protocolHandler.initHeartbeats((int) params.getHeartbeat()); - } - /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA - * dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. * * @param message * @@ -409,7 +411,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { if (_logger.isDebugEnabled()) { - _logger.debug("Setting ProtocolVersion to :" + pv); + _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java deleted file mode 100644 index 35ea44a331..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class HeartbeatConfig -{ - private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class); - static final HeartbeatConfig CONFIG = new HeartbeatConfig(); - - /** - * The factor used to get the timeout from the delay between heartbeats. - */ - private float timeoutFactor = 2; - - HeartbeatConfig() - { - String property = System.getProperty("amqj.heartbeat.timeoutFactor"); - if (property != null) - { - try - { - timeoutFactor = Float.parseFloat(property); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property); - } - } - } - - float getTimeoutFactor() - { - return timeoutFactor; - } - - int getTimeout(int writeDelay) - { - return (int) (timeoutFactor * writeDelay); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 4a7fca1efa..b039d8b005 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,8 +34,9 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; - public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated public static final String OPTIONS_HEARTBEAT = "heartbeat"; + @Deprecated + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; public static final String OPTIONS_SASL_MECHS = "sasl_mechs"; public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption"; public static final String OPTIONS_SSL = "ssl"; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java index 1e9e5b00a5..ad9d3d3516 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java @@ -164,4 +164,30 @@ public class BrokerDetailsTest extends TestCase assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL))); } + + public void testHeartbeatDefaultsToNull() throws Exception + { + String brokerURL = "tcp://localhost:5672"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertNull("unexpected default value for " + BrokerDetails.OPTIONS_HEARTBEAT, broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + + public void testOverriddingHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?heartbeat='60'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } + + @SuppressWarnings("deprecation") + public void testLegacyHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?idle_timeout='60000'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60000, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index 517fd1829f..b73f08f824 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.configuration; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.Properties; public interface Accessor { @@ -34,6 +28,7 @@ public interface Accessor public Integer getInt(String name); public Long getLong(String name); public String getString(String name); + public Float getFloat(String name); static class SystemPropertyAccessor implements Accessor { @@ -56,6 +51,11 @@ public interface Accessor { return System.getProperty(name); } + + public Float getFloat(String name) + { + return System.getProperty(name) == null ? null : Float.parseFloat(System.getProperty(name)); + } } static class MapAccessor implements Accessor @@ -147,132 +147,24 @@ public interface Accessor return null; } } - } - - static class PropertyFileAccessor extends MapAccessor - { - public PropertyFileAccessor(String fileName) throws FileNotFoundException, IOException - { - super(null); - Properties props = new Properties(); - FileInputStream inStream = new FileInputStream(fileName); - try - { - props.load(inStream); - } - finally - { - inStream.close(); - } - setSource(props); - } - - - } - - static class CombinedAccessor implements Accessor - { - private List accessors; - - public CombinedAccessor(Accessor...accessors) - { - this.accessors = Arrays.asList(accessors); - } - - public Boolean getBoolean(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getBoolean(name); - } - } - return null; - } - public Integer getInt(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getInt(name); - } - } - return null; - } - - public Long getLong(String name) + public Float getFloat(String name) { - for (Accessor accessor: accessors) + if (source != null && source.containsKey(name)) { - if (accessor.getBoolean(name) != null) + if (source.get(name) instanceof Float) { - return accessor.getLong(name); + return (Float)source.get(name); } - } - return null; - } - - public String getString(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) + else { - return accessor.getString(name); + return Float.parseFloat((String)source.get(name)); } } - return null; - } - } - - static class ValidationAccessor implements Accessor - { - private List validators; - private Accessor delegate; - - public ValidationAccessor(Accessor delegate,Validator...validators) - { - this.validators = Arrays.asList(validators); - this.delegate = delegate; - } - - public Boolean getBoolean(String name) - { - // there is nothing to validate in a boolean - return delegate.getBoolean(name); - } - - public Integer getInt(String name) - { - Integer v = delegate.getInt(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public Long getLong(String name) - { - Long v = delegate.getLong(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public String getString(String name) - { - String v = delegate.getString(name); - for (Validator validator: validators) + else { - validator.validate(v); + return null; } - return v; } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 51dad51bf9..b43b9d450b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -63,20 +63,47 @@ public class ClientProperties public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** - * This value will be used in the following settings - * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) - * If this values is between the max and min values specified for heartbeat - * by the broker in TuneOK it will be used as the heartbeat interval. - * If not a warning will be printed and the max value specified for - * heartbeat in TuneOK will be used - * - * The default idle timeout is set to 120 secs + * Frequency of heartbeat messages (in milliseconds) + * @see #QPID_HEARTBEAT_INTERVAL */ + @Deprecated public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; - public static final long DEFAULT_IDLE_TIMEOUT = 120000; - public static final String HEARTBEAT = "qpid.heartbeat"; - public static final int HEARTBEAT_DEFAULT = 120; + /** + * Frequency of heartbeat messages (in seconds) + * @see #QPID_HEARTBEAT_INTERVAL + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_DELAY = "amqj.heartbeat.delay"; + + /** + * Frequency of heartbeat messages (in seconds) + */ + public static final String QPID_HEARTBEAT_INTERVAL = "qpid.heartbeat"; + + /** + * Default heartbeat interval (used by 0-10 protocol). + */ + public static final int QPID_HEARTBEAT_INTERVAL_010_DEFAULT = 120; + + /** + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_TIMEOUT_FACTOR = "amqj.heartbeat.timeoutFactor"; + + /** + * The factor applied to {@link #QPID_HEARTBEAT_INTERVAL} that determines the maximum + * length of time that may elapse before the peer is deemed to have failed. + * + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT + */ + public static final String QPID_HEARTBEAT_TIMEOUT_FACTOR = "qpid.heartbeat_timeout_factor"; + + /** + * Default heartbeat timeout factor. + */ + public static final float QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT = 2.0f; /** * This value will be used to determine the default destination syntax type. @@ -215,6 +242,8 @@ public class ClientProperties */ public static final String SET_EXPIRATION_AS_TTL = "qpid.set_expiration_as_ttl"; + + private ClientProperties() { //No instances diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java index e0989495bb..3ed32a604a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java @@ -102,6 +102,11 @@ public abstract class QpidProperty return new QpidStringProperty(accessor,defaultValue, names); } + public static QpidProperty floatProperty(Float defaultValue, String... names) + { + return new QpidFloatProperty(defaultValue, names); + } + protected Accessor getAccessor() { return accessor; @@ -183,4 +188,23 @@ public abstract class QpidProperty } } + static class QpidFloatProperty extends QpidProperty + { + QpidFloatProperty(Float defValue, String... names) + { + super(defValue, names); + } + + QpidFloatProperty(Accessor accessor,Float defValue, String... names) + { + super(accessor,defValue, names); + } + + @Override + protected Float getByName(String name) + { + return getAccessor().getFloat(name); + } + } + } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 185c01d3df..33604b05d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -62,6 +62,5 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void setSender(Sender sender); - public void init(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c75adab444..75eb0e19a7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -133,15 +133,17 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { - int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(), - tune.getHeartbeatMin(), - tune.getHeartbeatMax() - ); + int heartbeatInterval = _connectionSettings.getHeartbeatInterval010(); + float heartbeatTimeoutFactor = _connectionSettings.getHeartbeatTimeoutFactor(); + int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, + tune.getHeartbeatMin(), + tune.getHeartbeatMax()); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), - hb_interval); - // The idle timeout is twice the heartbeat amount (in milisecs) - conn.setIdleTimeout(hb_interval*1000*2); + actualHeartbeatInterval); + + int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); //0 means no implied limit, except available server resources @@ -184,7 +186,7 @@ public class ClientDelegate extends ConnectionDelegate int i = heartbeat; if (i == 0) { - log.info("Idle timeout is 0 sec. Heartbeats are disabled."); + log.info("Heartbeat interval is 0 sec. Heartbeats are disabled."); return 0; // heartbeats are disabled. } else if (i >= min && i <= max) @@ -193,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate } else { - log.info("The broker does not support the configured connection idle timeout of %s sec," + + log.info("The broker does not support the configured connection heartbeat interval of %s sec," + " using the brokers max supported value of %s sec instead.", i,max); return max; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 14dfeb18ec..2ff08fd751 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT; import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME; @@ -50,6 +57,7 @@ public class ConnectionSettings { public static final String WILDCARD_ADDRESS = "*"; + private String protocol = "tcp"; private String host = "localhost"; private String vhost; @@ -59,7 +67,9 @@ public class ConnectionSettings private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get(); private int maxChannelCount = 32767; private int maxFrameSize = 65535; - private int heartbeatInterval; + private Integer hearbeatIntervalLegacyMs = QpidProperty.intProperty(null, IDLE_TIMEOUT_PROP_NAME).get(); + private Integer heartbeatInterval = QpidProperty.intProperty(null, QPID_HEARTBEAT_INTERVAL, AMQJ_HEARTBEAT_DELAY).get(); + private float heartbeatTimeoutFactor = QpidProperty.floatProperty(QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT, QPID_HEARTBEAT_TIMEOUT_FACTOR, AMQJ_HEARTBEAT_TIMEOUT_FACTOR).get(); private int connectTimeout = 30000; private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get(); private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();; @@ -95,9 +105,45 @@ public class ConnectionSettings this.tcpNodelay = tcpNodelay; } - public int getHeartbeatInterval() - { - return heartbeatInterval; + /** + * Gets the heartbeat interval (seconds) for 0-8/9/9-1 protocols. + * 0 means heartbeating is disabled. + * null means use the broker-supplied value. + */ + public Integer getHeartbeatInterval08() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return null; + } + } + + /** + * Gets the heartbeat interval (seconds) for the 0-10 protocol. + * 0 means heartbeating is disabled. + */ + public int getHeartbeatInterval010() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return QPID_HEARTBEAT_INTERVAL_010_DEFAULT; + } } public void setHeartbeatInterval(int heartbeatInterval) @@ -105,6 +151,11 @@ public class ConnectionSettings this.heartbeatInterval = heartbeatInterval; } + public float getHeartbeatTimeoutFactor() + { + return this.heartbeatTimeoutFactor; + } + public String getProtocol() { return protocol; @@ -374,4 +425,5 @@ public class ConnectionSettings { this.writeBufferSize = writeBufferSize; } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java b/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java index 2a8c177f64..335270264c 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java @@ -145,6 +145,25 @@ public class QpidPropertyTest extends QpidTestCase assertEquals(expectedValue, propertyValue); } + public void testFloatValueReadFromSystemProperty() throws Exception + { + float expectedValue = 1.5f; + setTestSystemProperty(_systemPropertyName, Float.valueOf(expectedValue).toString()); + assertSystemPropertiesSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(1.5f, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + + public void testFloatValueIsDefaultWhenOneSystemPropertyIsNotSet() throws Exception + { + float expectedValue = 1.5f; + assertSystemPropertiesNotSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(expectedValue, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + private void assertSystemPropertiesSet(String... systemPropertyNames) { for (String systemPropertyName : systemPropertyNames) diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java index fc4f5374f0..d031842f9d 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java @@ -128,11 +128,48 @@ public class ConnectionSettingsTest extends QpidTestCase } @SuppressWarnings("deprecation") - public void testtestReceiveBufferSizeOverriddenLegacyOverridden() + public void testReceiveBufferSizeOverriddenLegacyOverridden() { systemPropertyOverrideForSocketBufferSize(ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME, 1024, true); } + public void testHeartbeatingDefaults() + { + assertNull(_conConnectionSettings.getHeartbeatInterval08()); + assertEquals(ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT,_conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.0, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + public void testHeartbeatingOverridden() + { + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_INTERVAL, "60"); + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR, "2.5"); + + assertEquals(Integer.valueOf(60), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(60, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingAmqjLegacyOption() + { + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_DELAY, "30"); + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR, "1.5"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(1.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingOlderLegacyOption() + { + resetSystemProperty(ClientProperties.IDLE_TIMEOUT_PROP_NAME, "30000"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + } + private void systemPropertyOverrideForTcpDelay(String propertyName, boolean value) { resetSystemProperty(propertyName, String.valueOf(value)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java index 0e01bda8d0..143565c648 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java @@ -18,49 +18,86 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; + import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class HeartbeatTest extends QpidBrokerTestCase { - public void testHeartbeats() throws Exception + private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''"; + private TestListener _listener = new TestListener(); + + @Override + public void setUp() throws Exception + { + if (getName().equals("testHeartbeatsEnabledBrokerSide")) + { + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_HEART_BEAT_DELAY, "1"); + } + super.setUp(); + } + + public void testHeartbeatsEnabledUsingUrl() throws Exception + { + final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, DEFAULT_PORT, 1); + AMQConnection conn = (AMQConnection) getConnection(new AMQConnectionURL(url)); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + public void testHeartbeatsEnabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "1"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2); - assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2); + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); conn.close(); } - public void testNoHeartbeats() throws Exception + public void testHeartbeatsDisabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "0"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "0"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived); - assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent); + assertEquals("Heartbeats unexpectedly received", 0, _listener._heartbeatsReceived); + assertEquals("Heartbeats unexpectedly sent ", 0, _listener._heartbeatsSent); conn.close(); } - public void testReadOnlyConnectionHeartbeats() throws Exception + /** + * This test carefully arranges message flow so that bytes flow only from producer to broker + * on the producer side and broker to consumer on the consumer side, deliberately leaving the + * reverse path quiet so heartbeats will flow. + */ + public void testUnidirectionalHeartbeating() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay","1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL,"1"); AMQConnection receiveConn = (AMQConnection) getConnection(); AMQConnection sendConn = (AMQConnection) getConnection(); Destination destination = getTestQueue(); @@ -83,10 +120,9 @@ public class HeartbeatTest extends QpidBrokerTestCase producer.send(senderSession.createTextMessage("Msg " + i)); Thread.sleep(500); assertNotNull("Expected to received message", consumer.receive(500)); + // Consumer does not ack the message in order to generate no bytes from consumer back to Broker } - - assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2); assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent); @@ -97,6 +133,54 @@ public class HeartbeatTest extends QpidBrokerTestCase sendConn.close(); } + public void testHeartbeatsEnabledBrokerSide() throws Exception + { + + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception + { + setTestSystemProperty(AMQJ_HEARTBEAT_DELAY, "1"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception + { + setTestSystemProperty(IDLE_TIMEOUT_PROP_NAME, "1000"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + private class TestListener implements HeartbeatListener { int _heartbeatsReceived; diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 8ea592daf9..6ce0936c08 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -186,7 +186,8 @@ org.apache.qpid.client.ssl.SSLTest#testCreateSSLandTCPonSamePort // QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats +org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating +org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 02c2db1134..093821647d 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -68,7 +68,8 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener // QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats +org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating +org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Java 0-10 client does not support re-binding the queue to the same exchange org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange -- cgit v1.2.1