diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
| commit | 30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch) | |
| tree | f3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/broker-plugins/websocket/src | |
| parent | b165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff) | |
| download | qpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz | |
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/websocket/src')
4 files changed, 417 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java new file mode 100644 index 0000000000..b44ed70040 --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -0,0 +1,294 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketHandler; + +import javax.net.ssl.SSLContext; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.Set; + +class WebSocketProvider implements AcceptingTransport +{ + public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10"; + private final Transport _transport; + private final SSLContext _sslContext; + private final Port _port; + private final Set<AmqpProtocolVersion> _supported; + private final AmqpProtocolVersion _defaultSupportedProtocolReply; + private final ProtocolEngineFactory _factory; + private Server _server; + + WebSocketProvider(final Transport transport, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + _transport = transport; + _sslContext = sslContext; + _port = port; + _supported = supported; + _defaultSupportedProtocolReply = defaultSupportedProtocolReply; + _factory = new MultiVersionProtocolEngineFactory( + _port.getParent(Broker.class), null, + (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH), + (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH), + _supported, + _defaultSupportedProtocolReply, + _port, + _transport); + + } + + @Override + public void start() + { + _server = new Server(); + + Connector connector = null; + + + if (_transport == Transport.WS) + { + connector = new SelectChannelConnector(); + } + else if (_transport == Transport.WSS) + { + SslContextFactory factory = new SslContextFactory(); + factory.setSslContext(_sslContext); + connector = new SslSocketConnector(factory); + } + else + { + throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport); + } + String bindingAddress = _port.getBindingAddress(); + if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) + { + connector.setHost(bindingAddress.trim()); + } + connector.setPort(_port.getPort()); + _server.addConnector(connector); + + WebSocketHandler wshandler = new WebSocketHandler() + { + @Override + public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) + { + SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort()); + SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort()); + return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null; + } + }; + + _server.setHandler(wshandler); + try + { + _server.start(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + } + + @Override + public void close() + { + + } + + private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage + { + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Connection _connection; + private final Transport _transport; + private ProtocolEngine _engine; + + private AmqpWebSocket(final Transport transport, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _transport = transport; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public void onMessage(final byte[] data, final int offset, final int length) + { + _engine.received(ByteBuffer.wrap(data, offset, length).slice()); + } + + @Override + public void onOpen(final Connection connection) + { + _connection = connection; + + _engine = _factory.newProtocolEngine(); + + final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress); + _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender()); + + } + + @Override + public void onClose(final int closeCode, final String message) + { + _engine.closed(); + } + } + + private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer> + { + private final WebSocket.Connection _connection; + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Principal _principal; + private int _maxWriteIdle; + private int _maxReadIdle; + + public ConnectionWrapper(final WebSocket.Connection connection, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _connection = connection; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public Sender<ByteBuffer> getSender() + { + return this; + } + + @Override + public void start() + { + + } + + @Override + public void setIdleTimeout(final int i) + { + + } + + @Override + public void send(final ByteBuffer msg) + { + try + { + _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining()); + } + catch (IOException e) + { + close(); + } + } + + @Override + public void flush() + { + + } + + @Override + public void close() + { + _connection.close(); + } + + @Override + public SocketAddress getRemoteAddress() + { + return _remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() + { + return _localAddress; + } + + @Override + public void setMaxWriteIdle(final int sec) + { + _maxWriteIdle = sec; + } + + @Override + public void setMaxReadIdle(final int sec) + { + _maxReadIdle = sec; + } + + @Override + public void setPeerPrincipal(final Principal principal) + { + _principal = principal; + } + + @Override + public Principal getPeerPrincipal() + { + return _principal; + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java new file mode 100644 index 0000000000..02d1100315 --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.server.transport.TransportProvider; + +import javax.net.ssl.SSLContext; +import java.util.Set; + +class WebSocketTransportProvider implements TransportProvider +{ + public WebSocketTransportProvider() + { + } + + @Override + public AcceptingTransport createTransport(final Set<Transport> transports, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + return new WebSocketProvider(transports.iterator().next(), + sslContext, + port, + supported, + defaultSupportedProtocolReply); + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java new file mode 100644 index 0000000000..662f16ce5b --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.plugin.TransportProviderFactory; +import org.apache.qpid.server.transport.TransportProvider; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +public class WebSocketTransportProviderFactory implements TransportProviderFactory +{ + + private static final String TYPE = "Websocket"; + + @Override + public Set<Set<Transport>> getSupportedTransports() + { + return Collections.singleton((Set<Transport>)EnumSet.of(Transport.WS)); + } + + @Override + public TransportProvider getTransportProvider(final Set<Transport> transports) + { + return new WebSocketTransportProvider(); + } + + @Override + public String getType() + { + return TYPE; + } +} diff --git a/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory new file mode 100644 index 0000000000..55b88cc7be --- /dev/null +++ b/qpid/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory
\ No newline at end of file |
