summaryrefslogtreecommitdiff
path: root/tests/test_ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_ssl.py')
-rw-r--r--tests/test_ssl.py46
1 files changed, 42 insertions, 4 deletions
diff --git a/tests/test_ssl.py b/tests/test_ssl.py
index bfc5ca8..e6c0cdc 100644
--- a/tests/test_ssl.py
+++ b/tests/test_ssl.py
@@ -9,6 +9,7 @@ import datetime
import gc
import select
import sys
+import time
import uuid
from errno import (
EAFNOSUPPORT,
@@ -4369,10 +4370,11 @@ class TestDTLS:
# new versions of OpenSSL, this is unnecessary, but harmless, because the
# DTLS state machine treats it like a network hiccup that duplicated a
# packet, which DTLS is robust against.
- def test_it_works_at_all(self):
- # arbitrary number larger than any conceivable handshake volley
- LARGE_BUFFER = 65536
+ # Arbitrary number larger than any conceivable handshake volley.
+ LARGE_BUFFER = 65536
+
+ def test_it_works_at_all(self):
s_ctx = Context(DTLS_METHOD)
def generate_cookie(ssl):
@@ -4403,7 +4405,7 @@ class TestDTLS:
def pump_membio(label, source, sink):
try:
- chunk = source.bio_read(LARGE_BUFFER)
+ chunk = source.bio_read(self.LARGE_BUFFER)
except WantReadError:
return False
# I'm not sure this check is needed, but I'm not sure it's *not*
@@ -4483,3 +4485,39 @@ class TestDTLS:
assert 0 < c.get_cleartext_mtu() < 500
except NotImplementedError: # OpenSSL 1.1.0 and earlier
pass
+
+ def test_timeout(self, monkeypatch):
+ c_ctx = Context(DTLS_METHOD)
+ c = Connection(c_ctx)
+
+ # No timeout before the handshake starts.
+ assert c.DTLSv1_get_timeout() is None
+ assert c.DTLSv1_handle_timeout() is False
+
+ # Start handshake and check there is data to send.
+ c.set_connect_state()
+ try:
+ c.do_handshake()
+ except SSL.WantReadError:
+ pass
+ assert c.bio_read(self.LARGE_BUFFER)
+
+ # There should now be an active timeout.
+ seconds = c.DTLSv1_get_timeout()
+ assert seconds is not None
+
+ # Handle the timeout and check there is data to send.
+ time.sleep(seconds)
+ assert c.DTLSv1_handle_timeout() is True
+ assert c.bio_read(self.LARGE_BUFFER)
+
+ # After the maximum number of allowed timeouts is reached,
+ # DTLSv1_handle_timeout will return -1.
+ #
+ # Testing this directly is prohibitively time consuming as the timeout
+ # duration is doubled on each retry, so the best we can do is to mock
+ # this condition.
+ monkeypatch.setattr(_lib, "DTLSv1_handle_timeout", lambda x: -1)
+
+ with pytest.raises(Error):
+ c.DTLSv1_handle_timeout()