summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/websocket/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
commit30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch)
treef3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/broker-plugins/websocket/src
parentb165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff)
downloadqpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/websocket/src')
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java294
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java51
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java53
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory19
4 files changed, 417 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
new file mode 100644
index 0000000000..b44ed70040
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketHandler;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.Set;
+
+class WebSocketProvider implements AcceptingTransport
+{
+ public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
+ private final Transport _transport;
+ private final SSLContext _sslContext;
+ private final Port _port;
+ private final Set<AmqpProtocolVersion> _supported;
+ private final AmqpProtocolVersion _defaultSupportedProtocolReply;
+ private final ProtocolEngineFactory _factory;
+ private Server _server;
+
+ WebSocketProvider(final Transport transport,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ _transport = transport;
+ _sslContext = sslContext;
+ _port = port;
+ _supported = supported;
+ _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+ _factory = new MultiVersionProtocolEngineFactory(
+ _port.getParent(Broker.class), null,
+ (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH),
+ (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH),
+ _supported,
+ _defaultSupportedProtocolReply,
+ _port,
+ _transport);
+
+ }
+
+ @Override
+ public void start()
+ {
+ _server = new Server();
+
+ Connector connector = null;
+
+
+ if (_transport == Transport.WS)
+ {
+ connector = new SelectChannelConnector();
+ }
+ else if (_transport == Transport.WSS)
+ {
+ SslContextFactory factory = new SslContextFactory();
+ factory.setSslContext(_sslContext);
+ connector = new SslSocketConnector(factory);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport);
+ }
+ String bindingAddress = _port.getBindingAddress();
+ if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
+ {
+ connector.setHost(bindingAddress.trim());
+ }
+ connector.setPort(_port.getPort());
+ _server.addConnector(connector);
+
+ WebSocketHandler wshandler = new WebSocketHandler()
+ {
+ @Override
+ public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol)
+ {
+ SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort());
+ SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort());
+ return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null;
+ }
+ };
+
+ _server.setHandler(wshandler);
+ try
+ {
+ _server.start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage
+ {
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Connection _connection;
+ private final Transport _transport;
+ private ProtocolEngine _engine;
+
+ private AmqpWebSocket(final Transport transport,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _transport = transport;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public void onMessage(final byte[] data, final int offset, final int length)
+ {
+ _engine.received(ByteBuffer.wrap(data, offset, length).slice());
+ }
+
+ @Override
+ public void onOpen(final Connection connection)
+ {
+ _connection = connection;
+
+ _engine = _factory.newProtocolEngine();
+
+ final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress);
+ _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender());
+
+ }
+
+ @Override
+ public void onClose(final int closeCode, final String message)
+ {
+ _engine.closed();
+ }
+ }
+
+ private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ {
+ private final WebSocket.Connection _connection;
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Principal _principal;
+ private int _maxWriteIdle;
+ private int _maxReadIdle;
+
+ public ConnectionWrapper(final WebSocket.Connection connection,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _connection = connection;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return this;
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void setIdleTimeout(final int i)
+ {
+
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ try
+ {
+ _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining());
+ }
+ catch (IOException e)
+ {
+ close();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+ _connection.close();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _remoteAddress;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _localAddress;
+ }
+
+ @Override
+ public void setMaxWriteIdle(final int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ @Override
+ public void setMaxReadIdle(final int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public void setPeerPrincipal(final Principal principal)
+ {
+ _principal = principal;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return _principal;
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
new file mode 100644
index 0000000000..02d1100315
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import javax.net.ssl.SSLContext;
+import java.util.Set;
+
+class WebSocketTransportProvider implements TransportProvider
+{
+ public WebSocketTransportProvider()
+ {
+ }
+
+ @Override
+ public AcceptingTransport createTransport(final Set<Transport> transports,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ return new WebSocketProvider(transports.iterator().next(),
+ sslContext,
+ port,
+ supported,
+ defaultSupportedProtocolReply);
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
new file mode 100644
index 0000000000..662f16ce5b
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class WebSocketTransportProviderFactory implements TransportProviderFactory
+{
+
+ private static final String TYPE = "Websocket";
+
+ @Override
+ public Set<Set<Transport>> getSupportedTransports()
+ {
+ return Collections.singleton((Set<Transport>)EnumSet.of(Transport.WS));
+ }
+
+ @Override
+ public TransportProvider getTransportProvider(final Set<Transport> transports)
+ {
+ return new WebSocketTransportProvider();
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+}
diff --git a/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
new file mode 100644
index 0000000000..55b88cc7be
--- /dev/null
+++ b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory \ No newline at end of file