summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPierre Ossman <ossman@cendio.se>2021-04-16 11:27:56 +0200
committerPierre Ossman <ossman@cendio.se>2021-04-16 11:27:56 +0200
commit984dcc62d3e2730997738d8086b3208427a6a661 (patch)
tree5e1564a8f46f76c9c18cd6b2a372f0b0fa031fd3
parentc5d365dd1dbfee89881f1c1c02a2ac64838d645f (diff)
downloadwebsockify-984dcc62d3e2730997738d8086b3208427a6a661.tar.gz
Move JWT token plugins tests to separate file
Let's try to match the test units with the modules we have.
-rw-r--r--tests/test_token_plugins.py135
-rw-r--r--tests/test_websocketproxy.py129
2 files changed, 135 insertions, 129 deletions
diff --git a/tests/test_token_plugins.py b/tests/test_token_plugins.py
new file mode 100644
index 0000000..5d63a2b
--- /dev/null
+++ b/tests/test_token_plugins.py
@@ -0,0 +1,135 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+""" Unit tests for Token plugins"""
+
+import unittest
+from unittest.mock import patch
+from jwcrypto import jwt
+
+from websockify.token_plugins import JWTTokenApi
+
+class JWSTokenTestCase(unittest.TestCase):
+ def test_asymmetric_jws_token_plugin(self):
+ plugin = JWTTokenApi("./tests/fixtures/public.pem")
+
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNotNone(result)
+ self.assertEqual(result[0], "remote_host")
+ self.assertEqual(result[1], "remote_port")
+
+ def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
+ plugin = JWTTokenApi("wrong.pub")
+
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNone(result)
+
+ @patch('time.time')
+ def test_jwt_valid_time(self, mock_time):
+ plugin = JWTTokenApi("./tests/fixtures/public.pem")
+
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
+ jwt_token.make_signed_token(key)
+ mock_time.return_value = 150
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNotNone(result)
+ self.assertEqual(result[0], "remote_host")
+ self.assertEqual(result[1], "remote_port")
+
+ @patch('time.time')
+ def test_jwt_early_time(self, mock_time):
+ plugin = JWTTokenApi("./tests/fixtures/public.pem")
+
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
+ jwt_token.make_signed_token(key)
+ mock_time.return_value = 50
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNone(result)
+
+ @patch('time.time')
+ def test_jwt_late_time(self, mock_time):
+ plugin = JWTTokenApi("./tests/fixtures/public.pem")
+
+ key = jwt.JWK()
+ private_key = open("./tests/fixtures/private.pem", "rb").read()
+ key.import_from_pem(private_key)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
+ jwt_token.make_signed_token(key)
+ mock_time.return_value = 250
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNone(result)
+
+ def test_symmetric_jws_token_plugin(self):
+ plugin = JWTTokenApi("./tests/fixtures/symmetric.key")
+
+ secret = open("./tests/fixtures/symmetric.key").read()
+ key = jwt.JWK()
+ key.import_key(kty="oct",k=secret)
+ jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNotNone(result)
+ self.assertEqual(result[0], "remote_host")
+ self.assertEqual(result[1], "remote_port")
+
+ def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
+ plugin = JWTTokenApi("wrong_sauce")
+
+ secret = open("./tests/fixtures/symmetric.key").read()
+ key = jwt.JWK()
+ key.import_key(kty="oct",k=secret)
+ jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(key)
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNone(result)
+
+ def test_asymmetric_jwe_token_plugin(self):
+ plugin = JWTTokenApi("./tests/fixtures/private.pem")
+
+ private_key = jwt.JWK()
+ public_key = jwt.JWK()
+ private_key_data = open("./tests/fixtures/private.pem", "rb").read()
+ public_key_data = open("./tests/fixtures/public.pem", "rb").read()
+ private_key.import_from_pem(private_key_data)
+ public_key.import_from_pem(public_key_data)
+ jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
+ jwt_token.make_signed_token(private_key)
+ jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
+ claims=jwt_token.serialize())
+ jwe_token.make_encrypted_token(public_key)
+
+ result = plugin.lookup(jwt_token.serialize())
+
+ self.assertIsNotNone(result)
+ self.assertEqual(result[0], "remote_host")
+ self.assertEqual(result[1], "remote_port")
+
diff --git a/tests/test_websocketproxy.py b/tests/test_websocketproxy.py
index ffdecb5..a05e3b1 100644
--- a/tests/test_websocketproxy.py
+++ b/tests/test_websocketproxy.py
@@ -24,8 +24,6 @@ from io import StringIO
from io import BytesIO
from unittest.mock import patch, MagicMock
-from jwcrypto import jwt
-
from websockify import websocketproxy
from websockify import token_plugins
from websockify import auth_plugins
@@ -115,133 +113,6 @@ class ProxyRequestHandlerTestCase(unittest.TestCase):
self.assertEqual(self.handler.server.target_port, "blah")
@patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- def test_asymmetric_jws_token_plugin(self):
- key = jwt.JWK()
- private_key = open("./tests/fixtures/private.pem", "rb").read()
- key.import_from_pem(private_key)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
- self.handler.validate_connection()
-
- self.assertEqual(self.handler.server.target_host, "remote_host")
- self.assertEqual(self.handler.server.target_port, "remote_port")
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
- key = jwt.JWK()
- private_key = open("./tests/fixtures/private.pem", "rb").read()
- key.import_from_pem(private_key)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong.pub")
- with self.assertRaises(self.handler.server.EClose):
- self.handler.validate_connection()
-
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- @patch('time.time')
- def test_jwt_valid_time(self, mock_time):
- key = jwt.JWK()
- private_key = open("./tests/fixtures/private.pem", "rb").read()
- key.import_from_pem(private_key)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
- mock_time.return_value = 150
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
- self.handler.validate_connection()
-
- self.assertEqual(self.handler.server.target_host, "remote_host")
- self.assertEqual(self.handler.server.target_port, "remote_port")
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- @patch('time.time')
- def test_jwt_early_time(self, mock_time):
- key = jwt.JWK()
- private_key = open("./tests/fixtures/private.pem", "rb").read()
- key.import_from_pem(private_key)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
- mock_time.return_value = 50
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
- with self.assertRaises(self.handler.server.EClose):
- self.handler.validate_connection()
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- @patch('time.time')
- def test_jwt_late_time(self, mock_time):
- key = jwt.JWK()
- private_key = open("./tests/fixtures/private.pem", "rb").read()
- key.import_from_pem(private_key)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
- mock_time.return_value = 250
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
- with self.assertRaises(self.handler.server.EClose):
- self.handler.validate_connection()
-
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- def test_symmetric_jws_token_plugin(self):
- secret = open("./tests/fixtures/symmetric.key").read()
- key = jwt.JWK()
- key.import_key(kty="oct",k=secret)
- jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/symmetric.key")
- self.handler.validate_connection()
-
- self.assertEqual(self.handler.server.target_host, "remote_host")
- self.assertEqual(self.handler.server.target_port, "remote_port")
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
- secret = open("./tests/fixtures/symmetric.key").read()
- key = jwt.JWK()
- key.import_key(kty="oct",k=secret)
- jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
- jwt_token.make_signed_token(key)
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong_sauce")
- self.assertRaises(self.handler.server.EClose,
- self.handler.validate_connection)
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
- def test_asymmetric_jwe_token_plugin(self):
- private_key = jwt.JWK()
- public_key = jwt.JWK()
- private_key_data = open("./tests/fixtures/private.pem", "rb").read()
- public_key_data = open("./tests/fixtures/public.pem", "rb").read()
- private_key.import_from_pem(private_key_data)
- public_key.import_from_pem(public_key_data)
- jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
- jwt_token.make_signed_token(private_key)
- jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
- claims=jwt_token.serialize())
- jwe_token.make_encrypted_token(public_key)
-
- self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize())
-
- self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/private.pem")
- self.handler.validate_connection()
-
- self.assertEqual(self.handler.server.target_host, "remote_host")
- self.assertEqual(self.handler.server.target_port, "remote_port")
-
- @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock())
def test_auth_plugin(self):
class TestPlugin(auth_plugins.BasePlugin):
def authenticate(self, headers, target_host, target_port):