summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGary Lockyer <gary@catalyst.net.nz>2020-10-27 09:32:21 +1300
committerAndrew Bartlett <abartlet@samba.org>2020-11-04 22:54:41 +0000
commit005435dc4d7de9d442c7513edec8c782fe20fda3 (patch)
treea39262ed116b1218d59300ede4d542659caa6030 /python
parent41c8aa4b991aad306d731b08d068c480eb5c7fed (diff)
downloadsamba-005435dc4d7de9d442c7513edec8c782fe20fda3.tar.gz
tests python krb5: Add python kerberos canonicalization tests
Add python canonicalization tests, loosely based on the code in source4/torture/krb5/kdc-canon-heimdal.c. The long term goal is to move the integration level tests out of kdc-canon-heimdal, leaving it as a heimdal library unit test. Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-xpython/samba/tests/krb5/as_canonicalization_tests.py499
-rw-r--r--python/samba/tests/usage.py1
2 files changed, 500 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/as_canonicalization_tests.py b/python/samba/tests/krb5/as_canonicalization_tests.py
new file mode 100755
index 00000000000..7b599ad6e44
--- /dev/null
+++ b/python/samba/tests/krb5/as_canonicalization_tests.py
@@ -0,0 +1,499 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Catalyst IT Ltd. 2020
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+from enum import Enum, unique
+import pyasn1
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.krb5.raw_testcase import RawKerberosTest
+import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
+import samba
+from samba.auth import system_session
+from samba.credentials import (
+ Credentials,
+ CLI_CRED_NTLMv2_AUTH,
+ CLI_CRED_NTLM_AUTH,
+ DONT_USE_KERBEROS)
+from samba.dcerpc.misc import SEC_CHAN_WKSTA
+from samba.dsdb import (
+ UF_WORKSTATION_TRUST_ACCOUNT,
+ UF_PASSWD_NOTREQD,
+ UF_NORMAL_ACCOUNT)
+from samba.samdb import SamDB
+from samba.tests import delete_force, DynamicTestCase
+
+global_asn1_print = False
+global_hexdump = False
+
+
+@unique
+class TestOptions(Enum):
+ Canonicalize = 1
+ Enterprise = 2
+ UpperRealm = 4
+ UpperUserName = 8
+ NetbiosRealm = 16
+ UPN = 32
+ RemoveDollar = 64
+ Last = 128
+
+ def is_set(self, x):
+ return self.value & x
+
+
+@unique
+class CredentialsType(Enum):
+ User = 1
+ Machine = 2
+
+ def is_set(self, x):
+ return self.value & x
+
+
+class TestData:
+
+ def __init__(self, options, creds):
+ self.options = options
+ self.user_creds = creds
+ self.user_name = self.get_username(options, creds)
+ self.realm = self.get_realm(options, creds)
+ self.cname = RawKerberosTest.PrincipalName_create(
+ name_type=1, names=[self.user_name])
+ self.sname = RawKerberosTest.PrincipalName_create(
+ name_type=2, names=["krbtgt", self.realm])
+ self.canonicalize = TestOptions.Canonicalize.is_set(options)
+
+ def get_realm(self, options, creds):
+ realm = creds.get_realm()
+ if TestOptions.NetbiosRealm.is_set(options):
+ realm = creds.get_domain()
+ if TestOptions.UpperRealm.is_set(options):
+ realm = realm.upper()
+ else:
+ realm = realm.lower()
+ return realm
+
+ def get_username(self, options, creds):
+ name = creds.get_username()
+ if TestOptions.RemoveDollar.is_set(options) and name.endswith("$"):
+ name = name[:-1]
+ if TestOptions.Enterprise.is_set(options):
+ realm = creds.get_realm()
+ name = "{0}@{1}".format(name, realm)
+ if TestOptions.UpperUserName.is_set(options):
+ name = name.upper()
+ return name
+
+ def __repr__(self):
+ rep = "Test Data: "
+ rep += "options = '" + "{:08b}".format(self.options) + "'"
+ rep += "user name = '" + self.user_name + "'"
+ rep += ", realm = '" + self.realm + "'"
+ rep += ", cname = '" + str(self.cname) + "'"
+ rep += ", sname = '" + str(self.sname) + "'"
+ return rep
+
+
+MACHINE_NAME = "tstkrb5cnnusr"
+USER_NAME = "tstkrb5cnnmch"
+
+# Encryption types
+AES256_CTS_HMAC_SHA1_96 = int(
+ krb5_asn1.EncryptionTypeValues('kRB5-ENCTYPE-AES256-CTS-HMAC-SHA1-96'))
+AES128_CTS_HMAC_SHA1_96 = int(
+ krb5_asn1.EncryptionTypeValues('kRB5-ENCTYPE-AES128-CTS-HMAC-SHA1-96'))
+ARCFOUR_HMAC_MD5 = int(
+ krb5_asn1.EncryptionTypeValues('kRB5-ENCTYPE-ARCFOUR-HMAC-MD5'))
+
+# Message types
+KRB_ERROR = int(krb5_asn1.MessageTypeValues('krb-error'))
+KRB_AS_REP = int(krb5_asn1.MessageTypeValues('krb-as-rep'))
+
+# PAData types
+PADATA_ENC_TIMESTAMP = int(
+ krb5_asn1.PADataTypeValues('kRB5-PADATA-ENC-TIMESTAMP'))
+PADATA_ETYPE_INFO2 = int(
+ krb5_asn1.PADataTypeValues('kRB5-PADATA-ETYPE-INFO2'))
+
+# Error codes
+KDC_ERR_C_PRINCIPAL_UNKNOWN = 6
+KDC_ERR_PREAUTH_REQUIRED = 25
+
+# Name types
+NT_UNKNOWN = int(krb5_asn1.NameTypeValues('kRB5-NT-UNKNOWN'))
+NT_PRINCIPAL = int(krb5_asn1.NameTypeValues('kRB5-NT-PRINCIPAL'))
+NT_SRV_INST = int(krb5_asn1.NameTypeValues('kRB5-NT-SRV-INST'))
+
+
+@DynamicTestCase
+class KerberosASCanonicalizationTests(RawKerberosTest):
+
+ @classmethod
+ def setUpDynamicTestCases(cls):
+
+ def skip(ct, options):
+ ''' Filter out any mutually exclusive test options '''
+ if ct != CredentialsType.Machine and\
+ TestOptions.RemoveDollar.is_set(options):
+ return True
+ return False
+
+ def build_test_name(ct, options):
+ name = "%sCredentials" % ct.name
+ for opt in TestOptions:
+ if opt.is_set(options):
+ name += ("_%s" % opt.name)
+ return name
+
+ for ct in CredentialsType:
+ for x in range(TestOptions.Last.value):
+ if skip(ct, x):
+ continue
+ name = build_test_name(ct, x)
+ cls.generate_dynamic_test("test", name, x, ct)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.lp = cls.get_loadparm(cls)
+ cls.username = os.environ["USERNAME"]
+ cls.password = os.environ["PASSWORD"]
+ cls.domain = os.environ["DOMAIN"]
+ cls.realm = os.environ["REALM"]
+ cls.host = os.environ["SERVER"]
+
+ c = Credentials()
+ c.set_username(cls.username)
+ c.set_password(cls.password)
+ c.set_domain(cls.domain)
+ c.set_realm(cls.realm)
+ cls.credentials = c
+
+ cls.session = system_session()
+ cls.ldb = SamDB(url="ldap://%s" % cls.host,
+ session_info=cls.session,
+ credentials=cls.credentials,
+ lp=cls.lp)
+ cls.create_machine_account()
+ cls.create_user_account()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(KerberosASCanonicalizationTests, cls).tearDownClass()
+ delete_force(cls.ldb, cls.machine_dn)
+ delete_force(cls.ldb, cls.user_dn)
+
+ def setUp(self):
+ super(KerberosASCanonicalizationTests, self).setUp()
+ self.do_asn1_print = global_asn1_print
+ self.do_hexdump = global_hexdump
+
+ #
+ # Create a test user account
+ @classmethod
+ def create_user_account(cls):
+ cls.user_pass = samba.generate_random_password(32, 32)
+ cls.user_name = USER_NAME
+ cls.user_dn = "cn=%s,%s" % (cls.user_name, cls.ldb.domain_dn())
+
+ # remove the account if it exists, this will happen if a previous test
+ # run failed
+ delete_force(cls.ldb, cls.user_dn)
+
+ utf16pw = ('"%s"' % cls.user_pass).encode('utf-16-le')
+ cls.ldb.add({
+ "dn": cls.user_dn,
+ "objectclass": "user",
+ "sAMAccountName": "%s" % cls.user_name,
+ "userAccountControl": str(UF_NORMAL_ACCOUNT),
+ "unicodePwd": utf16pw})
+
+ cls.user_creds = Credentials()
+ cls.user_creds.guess(cls.lp)
+ cls.user_creds.set_password(cls.user_pass)
+ cls.user_creds.set_username(cls.user_name)
+ cls.user_creds.set_workstation(cls.machine_name)
+
+ #
+ # Create the machine account
+ @classmethod
+ def create_machine_account(cls):
+ cls.machine_pass = samba.generate_random_password(32, 32)
+ cls.machine_name = MACHINE_NAME
+ cls.machine_dn = "cn=%s,%s" % (cls.machine_name, cls.ldb.domain_dn())
+
+ # remove the account if it exists, this will happen if a previous test
+ # run failed
+ delete_force(cls.ldb, cls.machine_dn)
+
+ utf16pw = ('"%s"' % cls.machine_pass).encode('utf-16-le')
+ cls.ldb.add({
+ "dn": cls.machine_dn,
+ "objectclass": "computer",
+ "sAMAccountName": "%s$" % cls.machine_name,
+ "userAccountControl":
+ str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
+ "unicodePwd": utf16pw})
+
+ cls.machine_creds = Credentials()
+ cls.machine_creds.guess(cls.lp)
+ cls.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
+ cls.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
+ cls.machine_creds.set_password(cls.machine_pass)
+ cls.machine_creds.set_username(cls.machine_name + "$")
+ cls.machine_creds.set_workstation(cls.machine_name)
+
+ def _test_with_args(self, x, ct):
+ if ct == CredentialsType.User:
+ creds = self.user_creds
+ elif ct == CredentialsType.Machine:
+ creds = self.machine_creds
+ else:
+ raise Exception("Unexpected credential type")
+ data = TestData(x, creds)
+
+ try:
+ (rep, as_rep) = self.as_req(data)
+ except pyasn1.error.PyAsn1Error as e:
+ import traceback
+ self.fail("ASN1 Error, Options {0:08b}:{1} {2}".format(
+ traceback.format_exc(),
+ data.options,
+ e))
+ # If as_req triggered an expected server error response
+ # No need to test the response data.
+ if rep is not None:
+ # The kvno is optional, heimdal includes it
+ # MIT does not.
+ if 'kvno' in rep['enc-part']:
+ kvno = rep['enc-part']['kvno']
+ self.check_kvno(kvno, data)
+
+ cname = rep['cname']
+ self.check_cname(cname, data)
+
+ crealm = rep['crealm'].decode('ascii')
+ self.check_crealm(crealm, data)
+
+ sname = as_rep['sname']
+ self.check_sname(sname, data)
+
+ srealm = as_rep['srealm'].decode('ascii')
+ self.check_srealm(srealm, data)
+
+ def as_req(self, data):
+ user_creds = data.user_creds
+ realm = data.realm
+
+ cname = data.cname
+ sname = data.sname
+
+ till = self.get_KerberosTime(offset=36000)
+
+ kdc_options = "0"
+ if data.canonicalize:
+ kdc_options = str(krb5_asn1.KDCOptions('canonicalize'))
+
+ padata = None
+
+ # Set the allowable encryption types
+ etypes = (
+ AES256_CTS_HMAC_SHA1_96,
+ AES128_CTS_HMAC_SHA1_96,
+ ARCFOUR_HMAC_MD5)
+
+ req = self.AS_REQ_create(padata=padata,
+ kdc_options=kdc_options,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ from_time=None,
+ till_time=till,
+ renew_time=None,
+ nonce=0x7fffffff,
+ etypes=etypes,
+ addresses=None,
+ EncAuthorizationData=None,
+ EncAuthorizationData_key=None,
+ additional_tickets=None)
+ rep = self.send_recv_transaction(req)
+ self.assertIsNotNone(rep)
+
+ #
+ # Check the protocol version, should be 5
+ self.assertEqual(
+ rep['pvno'], 5, "Data {0}".format(str(data)))
+
+ self.assertEqual(
+ rep['msg-type'], KRB_ERROR, "Data {0}".format(str(data)))
+
+ # We should get KDC_ERR_PREAUTH_REQUIRED
+ # unless the RemoveDollar and Enterprise options are set
+ # then we should get a KDC_ERR_C_PRINCIPAL_UNKNOWN
+ if TestOptions.RemoveDollar.is_set(data.options) and\
+ TestOptions.Enterprise.is_set(data.options):
+ self.assertEqual(
+ rep['error-code'],
+ KDC_ERR_C_PRINCIPAL_UNKNOWN,
+ "Error code {0}, Data {1}".format(rep['error-code'], str(data)))
+ return (None, None)
+
+ self.assertEqual(
+ rep['error-code'],
+ KDC_ERR_PREAUTH_REQUIRED,
+ "Error code {0}, Data {1}".format(rep['error-code'], str(data)))
+
+ rep_padata = self.der_decode(
+ rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
+
+ for pa in rep_padata:
+ if pa['padata-type'] == 19:
+ etype_info2 = pa['padata-value']
+ break
+
+ etype_info2 = self.der_decode(
+ etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
+
+ key = self.PasswordKey_from_etype_info2(user_creds, etype_info2[0])
+
+ (patime, pausec) = self.get_KerberosTimeWithUsec()
+ pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
+ pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
+
+ enc_pa_ts_usage = 1
+ pa_ts = self.EncryptedData_create(key, enc_pa_ts_usage, pa_ts)
+ pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())
+
+ pa_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, pa_ts)
+
+ kdc_options = "0"
+ if data.canonicalize:
+ kdc_options = str(krb5_asn1.KDCOptions('canonicalize'))
+ padata = [pa_ts]
+
+ req = self.AS_REQ_create(padata=padata,
+ kdc_options=kdc_options,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ from_time=None,
+ till_time=till,
+ renew_time=None,
+ nonce=0x7fffffff,
+ etypes=etypes,
+ addresses=None,
+ EncAuthorizationData=None,
+ EncAuthorizationData_key=None,
+ additional_tickets=None)
+ rep = self.send_recv_transaction(req)
+ self.assertIsNotNone(rep)
+
+ #
+ # Check the protocol version, should be 5
+ self.assertEqual(
+ rep['pvno'], 5, "Data {0}".format(str(data)))
+
+ msg_type = rep['msg-type']
+ # Should not have got an error.
+ # If we did, fail and print the error code to help debugging
+ self.assertNotEqual(
+ msg_type,
+ KRB_ERROR,
+ "Error code {0}, Data {1}".format(
+ rep.get('error-code', ''),
+ str(data)))
+
+ self.assertEqual(msg_type, KRB_AS_REP, "Data {0}".format(str(data)))
+
+ # Decrypt and decode the EncKdcRepPart
+ enc = key.decrypt(3, rep['enc-part']['cipher'])
+ if enc[0] == 0x7A:
+ # MIT Kerberos Tags the EncASRepPart as a EncKDCRepPart
+ # i.e. tag number 26 instead of tag number 25
+ as_rep = self.der_decode(enc, asn1Spec=krb5_asn1.EncTGSRepPart())
+ else:
+ as_rep = self.der_decode(enc, asn1Spec=krb5_asn1.EncASRepPart())
+
+ return (rep, as_rep)
+
+ def check_cname(self, cname, data):
+ nt = cname['name-type']
+ self.assertEqual(
+ NT_PRINCIPAL,
+ nt,
+ "cname name-type, Options {0:08b}".format(data.options))
+
+ ns = cname['name-string']
+ name = ns[0].decode('ascii')
+
+ expected = data.user_name
+ if TestOptions.Canonicalize.is_set(data.options):
+ expected = data.user_creds.get_username()
+ self.assertEqual(
+ expected,
+ name,
+ "cname principal, Options {0:08b}".format(data.options))
+
+ def check_crealm(self, crealm, data):
+ realm = data.user_creds.get_realm()
+ self.assertEqual(
+ realm, crealm, "crealm, Options {0:08b}".format(data.options))
+
+ def check_sname(self, sname, data):
+ nt = sname['name-type']
+ self.assertEqual(
+ NT_SRV_INST,
+ nt,
+ "sname name-type, Options {0:08b}".format(data.options))
+
+ ns = sname['name-string']
+ name = ns[0].decode('ascii')
+ self.assertEqual(
+ 'krbtgt',
+ name,
+ "sname principal, Options {0:08b}".format(data.options))
+
+ realm = ns[1].decode('ascii')
+ expected = data.realm
+ if TestOptions.Canonicalize.is_set(data.options):
+ expected = data.user_creds.get_realm().upper()
+ self.assertEqual(
+ expected,
+ realm,
+ "sname realm, Options {0:08b}".format(data.options))
+
+ def check_srealm(self, srealm, data):
+ realm = data.user_creds.get_realm()
+ self.assertEqual(
+ realm, srealm, "srealm, Options {0:08b}".format(data.options))
+
+ def check_kvno(self, kvno, data):
+ self.assertEqual(
+ 1, kvno, "kvno, Options {0:08b}".format(data.options))
+
+
+if __name__ == "__main__":
+ global_asn1_print = True
+ global_hexdump = True
+ import unittest
+
+ unittest.main()
diff --git a/python/samba/tests/usage.py b/python/samba/tests/usage.py
index 89b5e957407..2f813760814 100644
--- a/python/samba/tests/usage.py
+++ b/python/samba/tests/usage.py
@@ -89,6 +89,7 @@ EXCLUDE_USAGE = {
'python/samba/tests/krb5/simple_tests.py',
'python/samba/tests/krb5/s4u_tests.py',
'python/samba/tests/krb5/xrealm_tests.py',
+ 'python/samba/tests/krb5/as_canonicalization_tests.py',
}
EXCLUDE_HELP = {