From 6f5d96325706a81a91e5bdfbdafb37a296478bf0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 18 Dec 2009 16:23:19 +0000 Subject: QPID-2273 : Fix Protocol Negotiation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@892301 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/Main.java | 15 ++++ .../server/configuration/ServerConfiguration.java | 5 ++ .../output/ProtocolOutputConverterRegistry.java | 1 - .../protocol/MultiVersionProtocolEngine.java | 83 ++++++++++++++++++---- 4 files changed, 91 insertions(+), 13 deletions(-) (limited to 'qpid/java/broker/src') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 845983857c..90afd2e4ac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -140,6 +140,12 @@ public class Main .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line") .withLongOpt("exclude-0-10").create(); + Option exclude0_9_1 = + OptionBuilder.withArgName("exclude-0-9-1").hasArg() + .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line") + .withLongOpt("exclude-0-9-1").create(); + + Option exclude0_9 = OptionBuilder.withArgName("exclude-0-9").hasArg() .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line") @@ -179,6 +185,7 @@ public class Main options.addOption(logwatchconfig); options.addOption(port); options.addOption(exclude0_10); + options.addOption(exclude0_9_1); options.addOption(exclude0_9); options.addOption(exclude0_8); options.addOption(mport); @@ -335,6 +342,7 @@ public class Main Set ports = new HashSet(); Set exclude_0_10 = new HashSet(); + Set exclude_0_9_1 = new HashSet(); Set exclude_0_9 = new HashSet(); Set exclude_0_8 = new HashSet(); @@ -343,6 +351,7 @@ public class Main parsePortList(ports, serverConfig.getPorts()); parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); parsePortList(exclude_0_9, serverConfig.getPortExclude09()); parsePortList(exclude_0_8, serverConfig.getPortExclude08()); @@ -351,6 +360,7 @@ public class Main { parsePortArray(ports, portStr); parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); @@ -399,6 +409,11 @@ public class Main { supported.remove(VERSION.v0_10); } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } if(exclude_0_9.contains(port)) { supported.remove(VERSION.v0_9); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 66a7279134..879eb7c9e6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -538,6 +538,11 @@ public class ServerConfiguration implements SignalHandler return getConfig().getList("connector.non010port", Collections.EMPTY_LIST); } + public List getPortExclude091() + { + return getConfig().getList("connector.non091port", Collections.EMPTY_LIST); + } + public List getPortExclude09() { return getConfig().getList("connector.non09port", Collections.EMPTY_LIST); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java index 3a94160e22..dbefeb61f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -45,7 +45,6 @@ public class ProtocolOutputConverterRegistry register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory()); register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory()); register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory()); - } private static void register(ProtocolVersion version, Factory converter) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 78e21a8f14..9a1c6c9418 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -135,7 +135,7 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 'M', (byte) 'Q', (byte) 'P', - (byte) 1, + (byte) 0, (byte) 0, (byte) 9, (byte) 1 @@ -250,6 +250,59 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private class ClosedDelegateProtocolEngine implements ProtocolEngine + { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadBytes() + { + return 0; + } + + public void received(ByteBuffer msg) + { + _logger.error("Error processing incoming data, could not negotiate a common protocol"); + } + + public void exception(Throwable t) + { + _logger.error("Error establishing session", t); + } + + public void closed() + { + + } + + public void writerIdle() + { + + } + + public void readerIdle() + { + + } + } + private class SelfDelegateProtocolEngine implements ProtocolEngine { @@ -303,12 +356,14 @@ private static final byte[] AMQP_0_9_1_HEADER = ProtocolEngine newDelegate = null; + byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) { if(_supported.contains(_creators[i].getVersion())) { + newestSupported = _creators[i].getHeaderIdentifier(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j