summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGary Lockyer <gary@catalyst.net.nz>2020-04-14 13:32:32 +1200
committerKarolin Seeger <kseeger@samba.org>2020-04-22 12:50:42 +0200
commit4aeb07ef49e4e8734fc5f5cd092bbf165e9cc9f3 (patch)
tree6c29330255a424e71f1b4937f9a268faac176943
parent16da9c6e3d87d11e358441804dc7ff842eb5a9e7 (diff)
downloadsamba-4aeb07ef49e4e8734fc5f5cd092bbf165e9cc9f3.tar.gz
CVE-2020-10704: ldapserver tests: Limit search request sizes
Add tests to ensure that overly long (> 256000 bytes) LDAP search requests are rejected. Credit to OSS-Fuzz REF: https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=20454 BUG: https://bugzilla.samba.org/show_bug.cgi?id=14334 Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
-rw-r--r--python/samba/tests/ldap_raw.py234
-rw-r--r--selftest/knownfail.d/ldap_raw1
-rwxr-xr-xsource4/selftest/tests.py5
3 files changed, 240 insertions, 0 deletions
diff --git a/python/samba/tests/ldap_raw.py b/python/samba/tests/ldap_raw.py
new file mode 100644
index 00000000000..334fabce230
--- /dev/null
+++ b/python/samba/tests/ldap_raw.py
@@ -0,0 +1,234 @@
+# Integration tests for the ldap server, using raw socket IO
+#
+# Tests for handling of malformed or large packets.
+#
+# Copyright (C) Catalyst.Net Ltd 2020
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import socket
+
+import samba.tests
+from samba.tests import TestCase
+
+
+#
+# LDAP Operations
+#
+SEARCH = b'\x63'
+
+EQUALS = b'\xa3'
+
+
+#
+# ASN.1 Element types
+#
+BOOLEAN = b'\x01'
+INTEGER = b'\x02'
+OCTET_STRING = b'\x04'
+NULL = b'\x05'
+ENUMERATED = b'\x0a'
+SEQUENCE = b'\x30'
+SET = b'\x31'
+
+
+#
+# ASN.1 Helper functions.
+#
+def encode_element(ber_type, data):
+ ''' Encode an ASN.1 BER element. '''
+ if data is None:
+ return ber_type + encode_length(0)
+ return ber_type + encode_length(len(data)) + data
+
+
+def encode_length(length):
+ ''' Encode the length of an ASN.1 BER element. '''
+
+ if length > 0xFFFFFF:
+ return b'\x84' + length.to_bytes(4, "big")
+ if length > 0xFFFF:
+ return b'\x83' + length.to_bytes(3, "big")
+ if length > 0xFF:
+ return b'\x82' + length.to_bytes(2, "big")
+ if length > 0x7F:
+ return b'\x81' + length.to_bytes(1, "big")
+ return length.to_bytes(1, "big")
+
+
+def encode_string(string):
+ ''' Encode an octet string '''
+ return encode_element(OCTET_STRING, string)
+
+
+def encode_boolean(boolean):
+ ''' Encode a boolean value '''
+ if boolean:
+ return encode_element(BOOLEAN, b'\xFF')
+ return encode_element(BOOLEAN, b'\x00')
+
+
+def encode_integer(integer):
+ ''' Encode an integer value '''
+ bit_len = integer.bit_length()
+ byte_len = (bit_len // 8) + 1
+ return encode_element(INTEGER, integer.to_bytes(byte_len, "big"))
+
+
+def encode_enumerated(enum):
+ ''' Encode an enumerated value '''
+ return encode_element(ENUMERATED, enum.to_bytes(1, "big"))
+
+
+def encode_sequence(sequence):
+ ''' Encode a sequence '''
+ return encode_element(SEQUENCE, sequence)
+
+
+class RawLdapTest(TestCase):
+ """A raw Ldap Test case."""
+
+ def setUp(self):
+ super(RawLdapTest, self).setUp()
+
+ self.host = samba.tests.env_get_var_value('SERVER')
+ self.port = 389
+ self.socket = None
+ self.connect()
+
+ def tearDown(self):
+ self.disconnect()
+ super(RawLdapTest, self).tearDown()
+
+ def disconnect(self):
+ ''' Disconnect from and clean up the connection to the server '''
+ if self.socket is None:
+ return
+ self.socket.close()
+ self.socket = None
+
+ def connect(self):
+ ''' Open a socket stream connection to the server '''
+ try:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.settimeout(10)
+ self.socket.connect((self.host, self.port))
+ except socket.error:
+ self.socket.close()
+ raise
+
+ def send(self, req):
+ ''' Send the request to the server '''
+ try:
+ self.socket.sendall(req)
+ except socket.error:
+ self.disconnect()
+ raise
+
+ def recv(self, num_recv=0xffff, timeout=None):
+ ''' recv an array of bytes from the server '''
+ data = None
+ try:
+ if timeout is not None:
+ self.socket.settimeout(timeout)
+ data = self.socket.recv(num_recv, 0)
+ self.socket.settimeout(10)
+ if len(data) == 0:
+ self.disconnect()
+ return None
+ except socket.timeout:
+ # We ignore timeout's as the ldap server will drop the connection
+ # on the errors we're testing. So returning None on a timeout is
+ # the desired behaviour.
+ self.socket.settimeout(10)
+ except socket.error:
+ self.disconnect()
+ raise
+ return data
+
+ def test_search_equals_maximum_permitted_size(self):
+ '''
+ Check that an LDAP search request equal to the maximum size is accepted
+ '''
+
+ # Lets build an ldap search packet to query the RootDSE
+ header = encode_string(None) # Base DN, ""
+ header += encode_enumerated(0) # Enumeration scope
+ header += encode_enumerated(0) # Enumeration dereference
+ header += encode_integer(0) # Integer size limit
+ header += encode_integer(0) # Integer time limit
+ header += encode_boolean(False) # Boolean attributes only
+
+ #
+ # build an equality search of the form x...x=y...y
+ # With the length of x...x and y...y chosen to generate an
+ # ldap request of 256000 bytes.
+ x = encode_string(b'x' * 127974)
+ y = encode_string(b'y' * 127979)
+ equals = encode_element(EQUALS, x + y)
+ trailer = encode_sequence(None)
+ search = encode_element(SEARCH, header + equals + trailer)
+
+ msg_no = encode_integer(1)
+ packet = encode_sequence(msg_no + search)
+ #
+ # The length of the packet should be equal to the
+ # Maximum length of a search query
+ self.assertEqual(256000, len(packet))
+
+ self.send(packet)
+ data = self.recv()
+ self.assertIsNotNone(data)
+
+ # Should be a sequence
+ self.assertEqual(SEQUENCE, data[0:1])
+
+ def test_search_exceeds_maximum_permitted_size(self):
+ '''
+ Test that a search query longer than the maximum permitted
+ size is rejected.
+ '''
+
+ # Lets build an ldap search packet to query the RootDSE
+ header = encode_string(None) # Base DN, ""
+ header += encode_enumerated(0) # Enumeration scope
+ header += encode_enumerated(0) # Enumeration dereference
+ header += encode_integer(0) # Integer size limit
+ header += encode_integer(0) # Integer time limit
+ header += encode_boolean(False) # Boolean attributes only
+
+ #
+ # build an equality search of the form x...x=y...y
+ # With the length of x...x and y...y chosen to generate an
+ # ldap request of 256001 bytes.
+ x = encode_string(b'x' * 127979)
+ y = encode_string(b'y' * 127975)
+ equals = encode_element(EQUALS, x + y)
+ trailer = encode_sequence(None)
+ search = encode_element(SEARCH, header + equals + trailer)
+
+ msg_no = encode_integer(1)
+ packet = encode_sequence(msg_no + search)
+ #
+ # The length of the sequence data should be one greater than the
+ # Maximum length of a search query
+ self.assertEqual(256001, len(packet))
+
+ self.send(packet)
+ data = self.recv()
+ #
+ # The connection should be closed by the server and we should not
+ # see any data.
+ self.assertIsNone(data)
diff --git a/selftest/knownfail.d/ldap_raw b/selftest/knownfail.d/ldap_raw
new file mode 100644
index 00000000000..8bd2ee55166
--- /dev/null
+++ b/selftest/knownfail.d/ldap_raw
@@ -0,0 +1 @@
+^samba.tests.ldap_raw.samba.tests.ldap_raw.RawLdapTest.test_search_exceeds_maximum_permitted_size\(ad_dc\)
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 1bf598832ca..cbb38f0ee62 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -894,6 +894,11 @@ plantestsuite_loadlist("samba4.ldap_modify_order.normal_user.python(ad_dc_defaul
'$LOADLIST',
'$LISTOPT'])
+planoldpythontestsuite("ad_dc",
+ "samba.tests.ldap_raw",
+ extra_args=['-U"$USERNAME%$PASSWORD"'],
+ environ={'TEST_ENV': 'ad_dc'})
+
plantestsuite_loadlist("samba4.tokengroups.krb5.python(ad_dc_default)", "ad_dc_default:local", [python, os.path.join(DSDB_PYTEST_DIR, "token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'yes', '$LOADLIST', '$LISTOPT'])
plantestsuite_loadlist("samba4.tokengroups.ntlm.python(ad_dc_default)", "ad_dc_default:local", [python, os.path.join(DSDB_PYTEST_DIR, "token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'no', '$LOADLIST', '$LISTOPT'])
plantestsuite("samba4.sam.python(fl2008r2dc)", "fl2008r2dc", [python, os.path.join(DSDB_PYTEST_DIR, "sam.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN'])