summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBob Halley <halley@dnspython.org>2021-11-09 07:36:30 -0800
committerBob Halley <halley@dnspython.org>2021-11-09 07:36:30 -0800
commit23dd006a6f234949b9fb3072c00ed9b7932020ea (patch)
treeeb5e2ed35f42add68b1ea6909a5849d674fda6e7
parent246febc405089c8551cdba3e636d174e7446ca13 (diff)
downloaddnspython-win32util.tar.gz
Add WMI-based method for finding resolver info on Windows.win32util
-rwxr-xr-xdns/win32util.py248
-rw-r--r--tests/test_resolver.py16
2 files changed, 255 insertions, 9 deletions
diff --git a/dns/win32util.py b/dns/win32util.py
new file mode 100755
index 0000000..b2300dd
--- /dev/null
+++ b/dns/win32util.py
@@ -0,0 +1,248 @@
+import sys
+
+if sys.platform == 'win32':
+
+ import dns.name
+
+ _prefer_wmi = True
+
+ import winreg
+
+ try:
+ try:
+ import threading as _threading
+ except ImportError: # pragma: no cover
+ import dummy_threading as _threading # type: ignore
+ import pythoncom
+ import wmi
+ _have_wmi = True
+ except Exception as e:
+ _have_wmi = False
+
+ def _config_domain(domain):
+ # Sometimes DHCP servers add a '.' prefix to the default domain, and
+ # Windows just stores such values in the registry (see #687).
+ # Check for this and fix it.
+ if domain.startswith('.'):
+ domain = domain[1:]
+ return dns.name.from_text(domain)
+
+ class DnsInfo:
+ def __init__(self):
+ self.domain = None
+ self.nameservers = []
+ self.search = []
+
+ if _have_wmi:
+ class _WMIGetter(_threading.Thread):
+ def __init__(self):
+ super().__init__()
+ self.info = DnsInfo()
+
+ def run(self):
+ pythoncom.CoInitialize()
+ try:
+ system = wmi.WMI()
+ for interface in system.Win32_NetworkAdapterConfiguration():
+ if interface.IPEnabled:
+ self.info.domain = _config_domain(interface.DNSDomain)
+ self.info.nameservers = list(interface.DNSServerSearchOrder)
+ self.info.search = [dns.name.from_text(x) for x in
+ interface.DNSDomainSuffixSearchOrder]
+ break
+ finally:
+ pythoncom.CoUninitialize()
+
+ def get(self):
+ # We always run in a separate thread to avoid any issues with
+ # the COM threading model.
+ self.start()
+ self.join()
+ return self.info
+
+
+ def get_dns_info_from_wmi():
+ getter = _WMIGetter()
+ return getter.get()
+ else:
+ class _WMIGetter:
+ pass
+ def get(self):
+ return None
+
+
+ class _RegistryGetter:
+ def __init__(self):
+ self.info = DnsInfo()
+
+ def _determine_split_char(self, entry):
+ #
+ # The windows registry irritatingly changes the list element
+ # delimiter in between ' ' and ',' (and vice-versa) in various
+ # versions of windows.
+ #
+ if entry.find(' ') >= 0:
+ split_char = ' '
+ elif entry.find(',') >= 0:
+ split_char = ','
+ else:
+ # probably a singleton; treat as a space-separated list.
+ split_char = ' '
+ return split_char
+
+ def _config_nameservers(self, nameservers):
+ split_char = self._determine_split_char(nameservers)
+ ns_list = nameservers.split(split_char)
+ for ns in ns_list:
+ if ns not in self.info.nameservers:
+ self.info.nameservers.append(ns)
+
+ def _config_search(self, search):
+ split_char = self._determine_split_char(search)
+ search_list = search.split(split_char)
+ for s in search_list:
+ s = dns.name.from_text(s)
+ if s not in self.info.search:
+ self.info.search.append(s)
+
+ def _config_fromkey(self, key, always_try_domain):
+ try:
+ servers, _ = winreg.QueryValueEx(key, 'NameServer')
+ except WindowsError:
+ servers = None
+ if servers:
+ self._config_nameservers(servers)
+ if servers or always_try_domain:
+ try:
+ dom, _ = winreg.QueryValueEx(key, 'Domain')
+ if dom:
+ self.info.domain = _config_domain(dom)
+ except WindowsError:
+ pass
+ else:
+ try:
+ servers, _ = winreg.QueryValueEx(key, 'DhcpNameServer')
+ except WindowsError:
+ servers = None
+ if servers:
+ self._config_nameservers(servers)
+ try:
+ dom, _ = winreg.QueryValueEx(key, 'DhcpDomain')
+ if dom:
+ self.info.domain = _config_domain(dom)
+ except WindowsError:
+ pass
+ try:
+ search, _ = winreg.QueryValueEx(key, 'SearchList')
+ except WindowsError:
+ search = None
+ if search is None:
+ try:
+ search, _ = winreg.QueryValueEx(key, 'DhcpSearchList')
+ except WindowsError:
+ search = None
+ if search:
+ self._config_search(search)
+
+ def _is_nic_enabled(self, lm, guid):
+ # Look in the Windows Registry to determine whether the network
+ # interface corresponding to the given guid is enabled.
+ #
+ # (Code contributed by Paul Marks, thanks!)
+ #
+ try:
+ # This hard-coded location seems to be consistent, at least
+ # from Windows 2000 through Vista.
+ connection_key = winreg.OpenKey(
+ lm,
+ r'SYSTEM\CurrentControlSet\Control\Network'
+ r'\{4D36E972-E325-11CE-BFC1-08002BE10318}'
+ r'\%s\Connection' % guid)
+
+ try:
+ # The PnpInstanceID points to a key inside Enum
+ (pnp_id, ttype) = winreg.QueryValueEx(
+ connection_key, 'PnpInstanceID')
+
+ if ttype != winreg.REG_SZ:
+ raise ValueError # pragma: no cover
+
+ device_key = winreg.OpenKey(
+ lm, r'SYSTEM\CurrentControlSet\Enum\%s' % pnp_id)
+
+ try:
+ # Get ConfigFlags for this device
+ (flags, ttype) = winreg.QueryValueEx(
+ device_key, 'ConfigFlags')
+
+ if ttype != winreg.REG_DWORD:
+ raise ValueError # pragma: no cover
+
+ # Based on experimentation, bit 0x1 indicates that the
+ # device is disabled.
+ #
+ # XXXRTH I suspect we really want to & with 0x03 so
+ # that CONFIGFLAGS_REMOVED devices are also ignored,
+ # but we're shifting to WMI as ConfigFlags is not
+ # supposed to be used.
+ return not flags & 0x1
+
+ finally:
+ device_key.Close()
+ finally:
+ connection_key.Close()
+ except Exception: # pragma: no cover
+ return False
+
+ def get(self):
+ """Extract resolver configuration from the Windows registry."""
+
+ lm = winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE)
+ try:
+ tcp_params = winreg.OpenKey(lm,
+ r'SYSTEM\CurrentControlSet'
+ r'\Services\Tcpip\Parameters')
+ try:
+ self._config_fromkey(tcp_params, True)
+ finally:
+ tcp_params.Close()
+ interfaces = winreg.OpenKey(lm,
+ r'SYSTEM\CurrentControlSet'
+ r'\Services\Tcpip\Parameters'
+ r'\Interfaces')
+ try:
+ i = 0
+ while True:
+ try:
+ guid = winreg.EnumKey(interfaces, i)
+ i += 1
+ key = winreg.OpenKey(interfaces, guid)
+ try:
+ if not self._is_nic_enabled(lm, guid):
+ continue
+ self._config_fromkey(key, False)
+ finally:
+ key.Close()
+ except EnvironmentError:
+ break
+ finally:
+ interfaces.Close()
+ finally:
+ lm.Close()
+ return self.info
+
+ def get_dns_info_from_registry():
+ """Extract resolver configuration from the Windows registry."""
+ getter = _RegistryGetter()
+ return getter.get()
+
+ if _have_wmi and _prefer_wmi:
+ _getter_class = _WMIGetter
+ else:
+ _getter_class = _RegistryGetter
+
+ def get_dns_info():
+ """Extract resolver configuration."""
+ getter = _getter_class()
+ return getter.get()
+
diff --git a/tests/test_resolver.py b/tests/test_resolver.py
index eb83893..75822d9 100644
--- a/tests/test_resolver.py
+++ b/tests/test_resolver.py
@@ -886,15 +886,13 @@ class ResolverMiscTestCase(unittest.TestCase):
# not raising is the test
res._compute_timeout(now + 0.5)
- def test_configure_win32_domain(self):
- # This is a win32-related test but it works on all platforms so we
- # test it that way to make coverage analysis easier.
- n = dns.name.from_text('home.')
- res = dns.resolver.Resolver(configure=False)
- res._config_win32_domain('home')
- self.assertEqual(res.domain, n)
- res._config_win32_domain('.home')
- self.assertEqual(res.domain, n)
+ if sys.platform == 'win32':
+ def test_configure_win32_domain(self):
+ # This is a win32-related test but it works on all platforms so we
+ # test it that way to make coverage analysis easier.
+ n = dns.name.from_text('home.')
+ self.assertEqual(n, dns.win32util._config_domain('home'))
+ self.assertEqual(n, dns.win32util._config_domain('.home'))
class ResolverNameserverValidTypeTestCase(unittest.TestCase):