From 30d213dc1e6d743f2f0abb44c8bc91868d5126b1 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 9 Jan 2014 16:53:51 +0000 Subject: QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/broker-plugins/websocket/build.xml | 32 +++ qpid/java/broker-plugins/websocket/pom.xml | 158 +++++++++++ .../transport/websocket/WebSocketProvider.java | 294 +++++++++++++++++++++ .../websocket/WebSocketTransportProvider.java | 51 ++++ .../WebSocketTransportProviderFactory.java | 53 ++++ ...che.qpid.server.plugin.TransportProviderFactory | 19 ++ 6 files changed, 607 insertions(+) create mode 100644 qpid/java/broker-plugins/websocket/build.xml create mode 100644 qpid/java/broker-plugins/websocket/pom.xml create mode 100644 qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java create mode 100644 qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java create mode 100644 qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java create mode 100644 qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/websocket/build.xml b/qpid/java/broker-plugins/websocket/build.xml new file mode 100644 index 0000000000..fc3dd3b846 --- /dev/null +++ b/qpid/java/broker-plugins/websocket/build.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + diff --git a/qpid/java/broker-plugins/websocket/pom.xml b/qpid/java/broker-plugins/websocket/pom.xml new file mode 100644 index 0000000000..2029bd33aa --- /dev/null +++ b/qpid/java/broker-plugins/websocket/pom.xml @@ -0,0 +1,158 @@ + + + + + qpid-project + org.apache.qpid + 0.26-SNAPSHOT + ../../pom.xml + + 4.0.0 + + qpid-broker-plugins-websocket + + + + org.apache.qpid + qpid-broker-core + 0.26-SNAPSHOT + provided + + + + org.apache.geronimo.specs + geronimo-servlet_2.5_spec + 1.2 + compile + + + + org.eclipse.jetty + jetty-server + 7.6.10.v20130312 + compile + + + org.eclipse.jetty.orbit + javax.servlet + + + org.eclipse.jetty + jetty-continuation + + + org.eclipse.jetty + jetty-http + + + + + + org.eclipse.jetty + jetty-continuation + 7.6.10.v20130312 + compile + + + + org.eclipse.jetty + jetty-security + 7.6.10.v20130312 + compile + + + org.eclipse.jetty + jetty-server + + + + + + org.eclipse.jetty + jetty-http + 7.6.10.v20130312 + compile + + + org.eclipse.jetty + jetty-io + + + + + + org.eclipse.jetty + jetty-io + 7.6.10.v20130312 + compile + + + org.eclipse.jetty + jetty-util + + + + + + org.eclipse.jetty + jetty-servlet + 7.6.10.v20130312 + compile + + + org.eclipse.jetty + jetty-security + + + + + + org.eclipse.jetty + jetty-util + 7.6.10.v20130312 + compile + + + + org.eclipse.jetty + jetty-websocket + 7.6.10.v20130312 + compile + + + org.eclipse.jetty + jetty-util + + + org.eclipse.jetty + jetty-io + + + org.eclipse.jetty + jetty-http + + + + + + + + + \ No newline at end of file diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java new file mode 100644 index 0000000000..b44ed70040 --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -0,0 +1,294 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketHandler; + +import javax.net.ssl.SSLContext; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.Set; + +class WebSocketProvider implements AcceptingTransport +{ + public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10"; + private final Transport _transport; + private final SSLContext _sslContext; + private final Port _port; + private final Set _supported; + private final AmqpProtocolVersion _defaultSupportedProtocolReply; + private final ProtocolEngineFactory _factory; + private Server _server; + + WebSocketProvider(final Transport transport, + final SSLContext sslContext, + final Port port, + final Set supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + _transport = transport; + _sslContext = sslContext; + _port = port; + _supported = supported; + _defaultSupportedProtocolReply = defaultSupportedProtocolReply; + _factory = new MultiVersionProtocolEngineFactory( + _port.getParent(Broker.class), null, + (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH), + (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH), + _supported, + _defaultSupportedProtocolReply, + _port, + _transport); + + } + + @Override + public void start() + { + _server = new Server(); + + Connector connector = null; + + + if (_transport == Transport.WS) + { + connector = new SelectChannelConnector(); + } + else if (_transport == Transport.WSS) + { + SslContextFactory factory = new SslContextFactory(); + factory.setSslContext(_sslContext); + connector = new SslSocketConnector(factory); + } + else + { + throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport); + } + String bindingAddress = _port.getBindingAddress(); + if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) + { + connector.setHost(bindingAddress.trim()); + } + connector.setPort(_port.getPort()); + _server.addConnector(connector); + + WebSocketHandler wshandler = new WebSocketHandler() + { + @Override + public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) + { + SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort()); + SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort()); + return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null; + } + }; + + _server.setHandler(wshandler); + try + { + _server.start(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + } + + @Override + public void close() + { + + } + + private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage + { + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Connection _connection; + private final Transport _transport; + private ProtocolEngine _engine; + + private AmqpWebSocket(final Transport transport, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _transport = transport; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public void onMessage(final byte[] data, final int offset, final int length) + { + _engine.received(ByteBuffer.wrap(data, offset, length).slice()); + } + + @Override + public void onOpen(final Connection connection) + { + _connection = connection; + + _engine = _factory.newProtocolEngine(); + + final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress); + _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender()); + + } + + @Override + public void onClose(final int closeCode, final String message) + { + _engine.closed(); + } + } + + private class ConnectionWrapper implements NetworkConnection, Sender + { + private final WebSocket.Connection _connection; + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Principal _principal; + private int _maxWriteIdle; + private int _maxReadIdle; + + public ConnectionWrapper(final WebSocket.Connection connection, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _connection = connection; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public Sender getSender() + { + return this; + } + + @Override + public void start() + { + + } + + @Override + public void setIdleTimeout(final int i) + { + + } + + @Override + public void send(final ByteBuffer msg) + { + try + { + _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining()); + } + catch (IOException e) + { + close(); + } + } + + @Override + public void flush() + { + + } + + @Override + public void close() + { + _connection.close(); + } + + @Override + public SocketAddress getRemoteAddress() + { + return _remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() + { + return _localAddress; + } + + @Override + public void setMaxWriteIdle(final int sec) + { + _maxWriteIdle = sec; + } + + @Override + public void setMaxReadIdle(final int sec) + { + _maxReadIdle = sec; + } + + @Override + public void setPeerPrincipal(final Principal principal) + { + _principal = principal; + } + + @Override + public Principal getPeerPrincipal() + { + return _principal; + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java new file mode 100644 index 0000000000..02d1100315 --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.server.transport.TransportProvider; + +import javax.net.ssl.SSLContext; +import java.util.Set; + +class WebSocketTransportProvider implements TransportProvider +{ + public WebSocketTransportProvider() + { + } + + @Override + public AcceptingTransport createTransport(final Set transports, + final SSLContext sslContext, + final Port port, + final Set supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + return new WebSocketProvider(transports.iterator().next(), + sslContext, + port, + supported, + defaultSupportedProtocolReply); + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java new file mode 100644 index 0000000000..662f16ce5b --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.plugin.TransportProviderFactory; +import org.apache.qpid.server.transport.TransportProvider; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +public class WebSocketTransportProviderFactory implements TransportProviderFactory +{ + + private static final String TYPE = "Websocket"; + + @Override + public Set> getSupportedTransports() + { + return Collections.singleton((Set)EnumSet.of(Transport.WS)); + } + + @Override + public TransportProvider getTransportProvider(final Set transports) + { + return new WebSocketTransportProvider(); + } + + @Override + public String getType() + { + return TYPE; + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory new file mode 100644 index 0000000000..55b88cc7be --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory \ No newline at end of file -- cgit v1.2.1