diff options
author | Pierre Ossman <ossman@cendio.se> | 2017-02-08 15:45:48 +0100 |
---|---|---|
committer | Pierre Ossman <ossman@cendio.se> | 2017-02-08 16:05:16 +0100 |
commit | c7bde00a4e34d7194242bbe61da540394ec2a8c6 (patch) | |
tree | a5739e49ff53abd2d11f95d47d7e279e0aaf2fa0 /tests | |
parent | 94783ea0cd3a374410be61283f538c93cad1650e (diff) | |
download | websockify-c7bde00a4e34d7194242bbe61da540394ec2a8c6.tar.gz |
Force choice of sub-protocol
The WebSocket standard require us to choose one of the protocols
supported by the client. Enforce this with a specific check in the
base class rather than relying on generous clients.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_websocket.py | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 77d0eca..49cc620 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -18,6 +18,104 @@ import unittest from websockify import websocket +class FakeSocket: + def __init__(self): + self.data = b'' + + def send(self, buf): + self.data += buf + return len(buf) + +class AcceptTestCase(unittest.TestCase): + def test_success(self): + ws = websocket.WebSocket() + sock = FakeSocket() + ws.accept(sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + self.assertEqual(sock.data[:13], b'HTTP/1.1 101 ') + self.assertIn(b'\r\nUpgrade: websocket\r\n', sock.data) + self.assertIn(b'\r\nConnection: Upgrade\r\n', sock.data) + self.assertIn(b'\r\nSec-WebSocket-Accept: pczpYSQsvE1vBpTQYjFQPcuoj6M=\r\n', sock.data) + + def test_bad_version(self): + ws = websocket.WebSocket() + sock = FakeSocket() + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '5', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '20', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + + def test_bad_upgrade(self): + ws = websocket.WebSocket() + sock = FakeSocket() + self.assertRaises(Exception, ws.accept, + sock, {'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket2', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + + def test_missing_key(self): + ws = websocket.WebSocket() + sock = FakeSocket() + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13'}) + + def test_protocol(self): + class ProtoSocket(websocket.WebSocket): + def select_subprotocol(self, protocol): + return 'gazonk' + + ws = ProtoSocket() + sock = FakeSocket() + ws.accept(sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q==', + 'Sec-WebSocket-Protocol': 'foobar gazonk'}) + self.assertEqual(sock.data[:13], b'HTTP/1.1 101 ') + self.assertIn(b'\r\nSec-WebSocket-Protocol: gazonk\r\n', sock.data) + + def test_no_protocol(self): + ws = websocket.WebSocket() + sock = FakeSocket() + ws.accept(sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q=='}) + self.assertEqual(sock.data[:13], b'HTTP/1.1 101 ') + self.assertNotIn(b'\r\nSec-WebSocket-Protocol:', sock.data) + + def test_missing_protocol(self): + ws = websocket.WebSocket() + sock = FakeSocket() + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q==', + 'Sec-WebSocket-Protocol': 'foobar gazonk'}) + + def test_protocol(self): + class ProtoSocket(websocket.WebSocket): + def select_subprotocol(self, protocol): + return 'oddball' + + ws = ProtoSocket() + sock = FakeSocket() + self.assertRaises(Exception, ws.accept, + sock, {'upgrade': 'websocket', + 'Sec-WebSocket-Version': '13', + 'Sec-WebSocket-Key': 'DKURYVK9cRFul1vOZVA56Q==', + 'Sec-WebSocket-Protocol': 'foobar gazonk'}) + class HyBiEncodeDecodeTestCase(unittest.TestCase): def test_decode_hybi_text(self): buf = b'\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58' |