summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
commit30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch)
treef3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/broker-plugins
parentb165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff)
downloadqpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/websocket/build.xml32
-rw-r--r--qpid/java/broker-plugins/websocket/pom.xml158
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java294
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java51
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java53
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory19
6 files changed, 607 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/websocket/build.xml b/qpid/java/broker-plugins/websocket/build.xml
new file mode 100644
index 0000000000..fc3dd3b846
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/build.xml
@@ -0,0 +1,32 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="Qpid Broker-Plugins Websocket Transport" default="build">
+ <property name="module.depends" value="common broker-core" />
+ <property name="module.test.depends" value="qpid-test-utils broker-core/tests" />
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/>
+
+ <property name="broker.plugin" value="true"/>
+ <property name="broker-plugins-websocket.libs" value="" />
+
+ <import file="../../module.xml" />
+
+ <target name="bundle" depends="bundle-tasks"/>
+</project>
diff --git a/qpid/java/broker-plugins/websocket/pom.xml b/qpid/java/broker-plugins/websocket/pom.xml
new file mode 100644
index 0000000000..2029bd33aa
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>qpid-project</artifactId>
+ <groupId>org.apache.qpid</groupId>
+ <version>0.26-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>qpid-broker-plugins-websocket</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>0.26-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-servlet_2.5_spec</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
new file mode 100644
index 0000000000..b44ed70040
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketHandler;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.Set;
+
+class WebSocketProvider implements AcceptingTransport
+{
+ public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
+ private final Transport _transport;
+ private final SSLContext _sslContext;
+ private final Port _port;
+ private final Set<AmqpProtocolVersion> _supported;
+ private final AmqpProtocolVersion _defaultSupportedProtocolReply;
+ private final ProtocolEngineFactory _factory;
+ private Server _server;
+
+ WebSocketProvider(final Transport transport,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ _transport = transport;
+ _sslContext = sslContext;
+ _port = port;
+ _supported = supported;
+ _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+ _factory = new MultiVersionProtocolEngineFactory(
+ _port.getParent(Broker.class), null,
+ (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH),
+ (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH),
+ _supported,
+ _defaultSupportedProtocolReply,
+ _port,
+ _transport);
+
+ }
+
+ @Override
+ public void start()
+ {
+ _server = new Server();
+
+ Connector connector = null;
+
+
+ if (_transport == Transport.WS)
+ {
+ connector = new SelectChannelConnector();
+ }
+ else if (_transport == Transport.WSS)
+ {
+ SslContextFactory factory = new SslContextFactory();
+ factory.setSslContext(_sslContext);
+ connector = new SslSocketConnector(factory);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport);
+ }
+ String bindingAddress = _port.getBindingAddress();
+ if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
+ {
+ connector.setHost(bindingAddress.trim());
+ }
+ connector.setPort(_port.getPort());
+ _server.addConnector(connector);
+
+ WebSocketHandler wshandler = new WebSocketHandler()
+ {
+ @Override
+ public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol)
+ {
+ SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort());
+ SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort());
+ return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null;
+ }
+ };
+
+ _server.setHandler(wshandler);
+ try
+ {
+ _server.start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage
+ {
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Connection _connection;
+ private final Transport _transport;
+ private ProtocolEngine _engine;
+
+ private AmqpWebSocket(final Transport transport,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _transport = transport;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public void onMessage(final byte[] data, final int offset, final int length)
+ {
+ _engine.received(ByteBuffer.wrap(data, offset, length).slice());
+ }
+
+ @Override
+ public void onOpen(final Connection connection)
+ {
+ _connection = connection;
+
+ _engine = _factory.newProtocolEngine();
+
+ final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress);
+ _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender());
+
+ }
+
+ @Override
+ public void onClose(final int closeCode, final String message)
+ {
+ _engine.closed();
+ }
+ }
+
+ private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ {
+ private final WebSocket.Connection _connection;
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Principal _principal;
+ private int _maxWriteIdle;
+ private int _maxReadIdle;
+
+ public ConnectionWrapper(final WebSocket.Connection connection,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _connection = connection;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return this;
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void setIdleTimeout(final int i)
+ {
+
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ try
+ {
+ _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining());
+ }
+ catch (IOException e)
+ {
+ close();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+ _connection.close();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _remoteAddress;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _localAddress;
+ }
+
+ @Override
+ public void setMaxWriteIdle(final int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ @Override
+ public void setMaxReadIdle(final int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public void setPeerPrincipal(final Principal principal)
+ {
+ _principal = principal;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return _principal;
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
new file mode 100644
index 0000000000..02d1100315
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import javax.net.ssl.SSLContext;
+import java.util.Set;
+
+class WebSocketTransportProvider implements TransportProvider
+{
+ public WebSocketTransportProvider()
+ {
+ }
+
+ @Override
+ public AcceptingTransport createTransport(final Set<Transport> transports,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ return new WebSocketProvider(transports.iterator().next(),
+ sslContext,
+ port,
+ supported,
+ defaultSupportedProtocolReply);
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
new file mode 100644
index 0000000000..662f16ce5b
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class WebSocketTransportProviderFactory implements TransportProviderFactory
+{
+
+ private static final String TYPE = "Websocket";
+
+ @Override
+ public Set<Set<Transport>> getSupportedTransports()
+ {
+ return Collections.singleton((Set<Transport>)EnumSet.of(Transport.WS));
+ }
+
+ @Override
+ public TransportProvider getTransportProvider(final Set<Transport> transports)
+ {
+ return new WebSocketTransportProvider();
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
new file mode 100644
index 0000000000..55b88cc7be
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory \ No newline at end of file