summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorUXabre <arend.lapere@gmail.com>2019-01-17 08:53:01 -0500
committerUXabre <you@example.com>2019-02-15 03:45:57 -0500
commitf2031eff057cb0a43cdebca9688cdb5706088421 (patch)
tree04938a501b39ffca9ce360a2e3684de22f4ed8d8 /tests
parentf0bdb0a621a4f3fb328d1410adfeaff76f088bfd (diff)
downloadwebsockify-f2031eff057cb0a43cdebca9688cdb5706088421.tar.gz
Added JWT/JWS/JWE tokens capability
Diffstat (limited to 'tests')
-rw-r--r--tests/fixtures/private.pem27
-rw-r--r--tests/fixtures/public.pem9
-rw-r--r--tests/fixtures/symmetric.key1
-rw-r--r--tests/test_websocketproxy.py94
4 files changed, 131 insertions, 0 deletions
diff --git a/tests/fixtures/private.pem b/tests/fixtures/private.pem
new file mode 100644
index 0000000..413cee2
--- /dev/null
+++ b/tests/fixtures/private.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEArwNQal2INbSfoVT50dZ0s8lQ+yMhu45TDc91iuwtDjlFBJ50
+E4m3/M6ESBW0S7UTP1bIOGkd/M+u38h0Aruo4qkngdguu9N3BnsU2kOeicdjxc+v
+tqRc7/kbkTdT4SrpG8EFP6T2U9U1gtBpLnau02gPrzjaQzyYDLGOBq+Ozt/mN0YJ
+UhJ3hlwi18dNKreTzWgJ6mmXQWS0eAmHx8TIs2Nz9x3EfRo9CIMuaaeUjRogIEg5
+Tg4xC00ZtDO0/EfgpFkeHJGVQA2DgdBJsr6rq69MjhMfFxRJItxJMJzP6an2HkJ8
+onUPBtjEBmk3/fnfiaflzRyEb5zdii6r2TD8TwIDAQABAoIBAGDrzu742WQUoYqx
+CqDAyWR/is9px1adHTW6vHexD8qewLAsKFBhpnjkzbE2A+EhaIVdRAipfifxxAC+
+fDC/SGouD2kDFe6Cz5nRM90kMXpP59s2hzL4l1d2d2PWZid+ohXysTtr2dbXbokB
+bh6DL5J4QKdjLsypk/MDqYneU5IQ1k9ezWzcRgM8/V3M+t+1dLRFLIWsSLbNUgbF
+px81efNw8E0voV/d7kZ+6RwUThPHqR0eyLm6djPwHE7/FarZIx4AImwV+9ex44CH
+OkrTFOVYenF6jEtYoUuqYCouaWtG7jNVM/f1fksoR8SD6PTq2vn7F4wTLXG1b+K7
+45PKMhECgYEA22NH8mK9ICFVd7S6caeAnki+K9cpwmiOUWEIuDInmPn5FOlv7awE
+uBFN86v14PqDBtF7Aagyib0NUPo7rIw5+V5SCBZv8gQatjZUkT0vZvpxPU5jmB++
+w58yfK7zgdAWCepLxIPyTA7CAT1dmiVmuosz2pJjbo4fecVG222IE10CgYEAzDg+
+RVlvMYGy04UMmUoUNeeRlW6km/W6dqQ7EtcxfDv4O7boRDTBSRBzfIsRdXHZhcHN
+gCeB2Uiz8IO3s0Yt0+y/6cTI60uJ4S7Mb2JvWJvDCKWhS3pE1BL+LJJC4Hn7khJH
+yHYFOLOfnuCbOs8VA7IMmbdTPHirIKWTT5j5H5sCgYEAygK/KweUUlOfWVyHGUQ9
+gIJG6iNzhlm0QmbxGnrET25N1t2kfNsadUsp1igPfhvuLocRltMDxiTYcCoabKWq
+dF5PdrcCWX1CA2o/sIUAcvhE8UiPGHKSu5qJaJnIC05KHNMq9UbyAurL5UxWNiwe
+TcMD+k01VYV0ojHvLvnKhNkCgYArkoh+xXE7D+A2zzl771lWkvz19DB88jYBoFLW
+V0HArw7str7h5pui2ja5yPZFp6/woQQWptdGpAN4erIUNxIKGIZt+0WfJnPZruGB
+lnAJaNp5GtXKQ+ExmofOvLo2KPCrHulf9QZyLakN/gBA0PQ74J5docbJrTld8tX2
+cr4cpwKBgHqr2zybmywAmjn8wY0bUjRAyhdN8eiwYaGPtOSFt6IcWxEnNbAo5Jc2
+KsywpagjFsXZsi4Obn2XsqR7VX5bNbpNXIyLaMwBOy7MixyecgPF8tu7I4zo/CWm
+7gewTKBhwVPTDAOzHqIpJGrOnUgzJM3ijkCWMn3eAh4ccOjsrKq9
+-----END RSA PRIVATE KEY----- \ No newline at end of file
diff --git a/tests/fixtures/public.pem b/tests/fixtures/public.pem
new file mode 100644
index 0000000..7b1d284
--- /dev/null
+++ b/tests/fixtures/public.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArwNQal2INbSfoVT50dZ0
+s8lQ+yMhu45TDc91iuwtDjlFBJ50E4m3/M6ESBW0S7UTP1bIOGkd/M+u38h0Aruo
+4qkngdguu9N3BnsU2kOeicdjxc+vtqRc7/kbkTdT4SrpG8EFP6T2U9U1gtBpLnau
+02gPrzjaQzyYDLGOBq+Ozt/mN0YJUhJ3hlwi18dNKreTzWgJ6mmXQWS0eAmHx8TI
+s2Nz9x3EfRo9CIMuaaeUjRogIEg5Tg4xC00ZtDO0/EfgpFkeHJGVQA2DgdBJsr6r
+q69MjhMfFxRJItxJMJzP6an2HkJ8onUPBtjEBmk3/fnfiaflzRyEb5zdii6r2TD8
+TwIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/fixtures/symmetric.key b/tests/fixtures/symmetric.key
new file mode 100644
index 0000000..668b39c
--- /dev/null
+++ b/tests/fixtures/symmetric.key
@@ -0,0 +1 @@
+secret_sauce \ No newline at end of file
diff --git a/tests/test_websocketproxy.py b/tests/test_websocketproxy.py
index 5bd3a16..c0a8d93 100644
--- a/tests/test_websocketproxy.py
+++ b/tests/test_websocketproxy.py
@@ -16,6 +16,7 @@
""" Unit tests for websocketproxy """
+import sys
import unittest
import unittest
import socket
@@ -27,6 +28,9 @@ from websockify import websocketproxy
from websockify import token_plugins
from websockify import auth_plugins
+if sys.version_info >= (2,7):
+ from jwcrypto import jwt
+
try:
from StringIO import StringIO
BytesIO = StringIO
@@ -125,6 +129,96 @@ class ProxyRequestHandlerTestCase(unittest.TestCase):
self.assertEqual(self.handler.server.target_host, "somehost")
self.assertEqual(self.handler.server.target_port, "blah")
+ if sys.version_info >= (2,7):
+ def test_asymmetric_jws_token_plugin(self):
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+ self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
+
+ self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
+ staticmethod(lambda *args, **kwargs: None))
+
+ self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
+ self.handler.validate_connection()
+
+ self.assertEqual(self.handler.server.target_host, "remote_host")
+ self.assertEqual(self.handler.server.target_port, "remote_port")
+
+ def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+ self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
+
+ self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
+ staticmethod(lambda *args, **kwargs: None))
+
+ self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong.pub")
+ self.assertRaises(self.handler.server.EClose,
+ self.handler.validate_connection)
+
+
+ def test_symmetric_jws_token_plugin(self):
+ secret = open("./tests/fixtures/symmetric.key").read()
+ key = jwt.JWK()
+ key.import_key(kty="oct",k=secret)
+ jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+ self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
+
+ self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
+ staticmethod(lambda *args, **kwargs: None))
+
+ self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/symmetric.key")
+ self.handler.validate_connection()
+
+ self.assertEqual(self.handler.server.target_host, "remote_host")
+ self.assertEqual(self.handler.server.target_port, "remote_port")
+
+ def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
+ secret = open("./tests/fixtures/symmetric.key").read()
+ key = jwt.JWK()
+ key.import_key(kty="oct",k=secret)
+ jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+ self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
+
+ self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
+ staticmethod(lambda *args, **kwargs: None))
+
+ self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong_sauce")
+ self.assertRaises(self.handler.server.EClose,
+ self.handler.validate_connection)
+
+ def test_asymmetric_jwe_token_plugin(self):
+ private_key = jwt.JWK()
+ public_key = jwt.JWK()
+ private_key_data = open("./tests/fixtures/private.pem", "rb").read()
+ public_key_data = open("./tests/fixtures/public.pem", "rb").read()
+ private_key.import_from_pem(private_key_data)
+ public_key.import_from_pem(public_key_data)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(private_key)
+ jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
+ claims=jwt_token.serialize())
+ jwe_token.make_encrypted_token(public_key)
+
+ self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize())
+
+ self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
+ staticmethod(lambda *args, **kwargs: None))
+
+ self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/private.pem")
+ self.handler.validate_connection()
+
+ self.assertEqual(self.handler.server.target_host, "remote_host")
+ self.assertEqual(self.handler.server.target_port, "remote_port")
+
def test_auth_plugin(self):
class TestPlugin(auth_plugins.BasePlugin):
def authenticate(self, headers, target_host, target_port):