summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsebres <serg.brester@sebres.de>2023-01-10 12:20:48 +0100
committersebres <serg.brester@sebres.de>2023-01-10 12:20:48 +0100
commitcb8674e68a4f2752132443d3be36462078339673 (patch)
tree12b151476709590ef1622b5f7bd26522d2d95dd1
parent09c23fd5b8593cecc1744db3d6e0dba7de67480c (diff)
downloadfail2ban-cb8674e68a4f2752132443d3be36462078339673.tar.gz
amend with few improvements, IPv6IsAllowed prefers IPs from network interfaces (if available for platform) and uses DNS (socket.getaddrinfo) as a fallback only
-rw-r--r--fail2ban/server/ipdns.py225
-rw-r--r--fail2ban/tests/filtertestcase.py14
2 files changed, 139 insertions, 100 deletions
diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py
index 75e21a31..8c5c277e 100644
--- a/fail2ban/server/ipdns.py
+++ b/fail2ban/server/ipdns.py
@@ -92,7 +92,7 @@ class DNSUtils:
# retrieve ips
ips = set()
saveerr = None
- for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed(True) else (socket.AF_INET,)):
+ for fam in ((socket.AF_INET,socket.AF_INET6) if DNSUtils.IPv6IsAllowed() else (socket.AF_INET,)):
try:
for result in socket.getaddrinfo(dns, None, fam, 0, socket.IPPROTO_TCP):
# if getaddrinfo returns something unexpected:
@@ -188,6 +188,25 @@ class DNSUtils:
DNSUtils.CACHE_ipToName.set(DNSUtils._getSelfNames_key, names)
return names
+ # key to find cached network interfaces IPs (this tuple-key cannot be used elsewhere):
+ _getNetIntrfIPs_key = ('netintrf','ips')
+
+ @staticmethod
+ def getNetIntrfIPs():
+ """Get own IP addresses of self"""
+ # to find cached own IPs:
+ ips = DNSUtils.CACHE_nameToIp.get(DNSUtils._getNetIntrfIPs_key)
+ if ips is not None:
+ return ips
+ # try to obtain from network interfaces if possible (implemented for this platform):
+ try:
+ ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
+ except:
+ ips = IPAddrSet()
+ # cache and return :
+ DNSUtils.CACHE_nameToIp.set(DNSUtils._getNetIntrfIPs_key, ips)
+ return ips
+
# key to find cached own IPs (this tuple-key cannot be used elsewhere):
_getSelfIPs_key = ('self','ips')
@@ -199,14 +218,11 @@ class DNSUtils:
if ips is not None:
return ips
# firstly try to obtain from network interfaces if possible (implemented for this platform):
- try:
- ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
- except:
- ips = IPAddrSet()
+ ips = IPAddrSet(DNSUtils.getNetIntrfIPs())
# extend it using different ways (a set with IPs of localhost, hostname, fully qualified):
for hostname in DNSUtils.getSelfNames():
try:
- ips |= IPAddrSet(DNSUtils.textToIp(hostname, 'yes'))
+ ips |= IPAddrSet(DNSUtils.dnsToIp(hostname))
except Exception as e: # pragma: no cover
logSys.warning("Retrieving own IPs of %s failed: %s", hostname, e)
# cache and return :
@@ -257,7 +273,7 @@ class DNSUtils:
_IPv6IsAllowed_key = ('self','ipv6-allowed')
@staticmethod
- def IPv6IsAllowed(knownOnly=False):
+ def IPv6IsAllowed():
if DNSUtils._IPv6IsAllowed is not None:
return DNSUtils._IPv6IsAllowed
v = DNSUtils.CACHE_nameToIp.get(DNSUtils._IPv6IsAllowed_key)
@@ -265,11 +281,15 @@ class DNSUtils:
return v
v = DNSUtils._IPv6IsSupportedBySystem()
if v is None:
- # avoid self recursion (and assume we may have IPv6 during auto-detection):
- if knownOnly:
- return True
# detect by IPs of host:
- v = any((':' in ip.ntoa) for ip in DNSUtils.getSelfIPs())
+ ips = DNSUtils.getNetIntrfIPs()
+ if not ips:
+ DNSUtils._IPv6IsAllowed = True; # avoid self recursion from getSelfIPs -> dnsToIp -> IPv6IsAllowed
+ try:
+ ips = DNSUtils.getSelfIPs()
+ finally:
+ DNSUtils._IPv6IsAllowed = None
+ v = any((':' in ip.ntoa) for ip in ips)
DNSUtils.CACHE_nameToIp.set(DNSUtils._IPv6IsAllowed_key, v)
return v
@@ -659,95 +679,102 @@ def _NetworkInterfacesAddrs():
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
# Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc)
+ try:
+ from ctypes import (
+ Structure, Union, POINTER,
+ pointer, get_errno, cast,
+ c_ushort, c_byte, c_void_p, c_char_p, c_uint, c_int, c_uint16, c_uint32
+ )
+ import ctypes.util
+ import ctypes
+
+ class struct_sockaddr(Structure):
+ _fields_ = [
+ ('sa_family', c_ushort),
+ ('sa_data', c_byte * 14),]
+
+ class struct_sockaddr_in(Structure):
+ _fields_ = [
+ ('sin_family', c_ushort),
+ ('sin_port', c_uint16),
+ ('sin_addr', c_byte * 4)]
+
+ class struct_sockaddr_in6(Structure):
+ _fields_ = [
+ ('sin6_family', c_ushort),
+ ('sin6_port', c_uint16),
+ ('sin6_flowinfo', c_uint32),
+ ('sin6_addr', c_byte * 16),
+ ('sin6_scope_id', c_uint32)]
+
+ class union_ifa_ifu(Union):
+ _fields_ = [
+ ('ifu_broadaddr', POINTER(struct_sockaddr)),
+ ('ifu_dstaddr', POINTER(struct_sockaddr)),]
+
+ class struct_ifaddrs(Structure):
+ pass
+ struct_ifaddrs._fields_ = [
+ ('ifa_next', POINTER(struct_ifaddrs)),
+ ('ifa_name', c_char_p),
+ ('ifa_flags', c_uint),
+ ('ifa_addr', POINTER(struct_sockaddr)),
+ ('ifa_netmask', POINTER(struct_sockaddr)),
+ ('ifa_ifu', union_ifa_ifu),
+ ('ifa_data', c_void_p),]
+
+ libc = ctypes.CDLL(ctypes.util.find_library('c') or "")
+ if not libc.getifaddrs: # pragma: no cover
+ raise NotImplementedError('libc.getifaddrs is not available')
+
+ def ifap_iter(ifap):
+ ifa = ifap.contents
+ while True:
+ yield ifa
+ if not ifa.ifa_next:
+ break
+ ifa = ifa.ifa_next.contents
+
+ def getfamaddr(ifa):
+ sa = ifa.ifa_addr.contents
+ fam = sa.sa_family
+ if fam == socket.AF_INET:
+ sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents
+ addr = socket.inet_ntop(fam, sa.sin_addr)
+ nm = ifa.ifa_netmask.contents
+ if nm is not None and nm.sa_family == socket.AF_INET:
+ nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents
+ addr += '/'+socket.inet_ntop(fam, nm.sin_addr)
+ return IPAddr(addr)
+ elif fam == socket.AF_INET6:
+ sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents
+ addr = socket.inet_ntop(fam, sa.sin6_addr)
+ nm = ifa.ifa_netmask.contents
+ if nm is not None and nm.sa_family == socket.AF_INET6:
+ nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents
+ addr += '/'+socket.inet_ntop(fam, nm.sin6_addr)
+ return IPAddr(addr)
+ return None
- from ctypes import (
- Structure, Union, POINTER,
- pointer, get_errno, cast,
- c_ushort, c_byte, c_void_p, c_char_p, c_uint, c_int, c_uint16, c_uint32
- )
- import ctypes.util
- import ctypes
-
- class struct_sockaddr(Structure):
- _fields_ = [
- ('sa_family', c_ushort),
- ('sa_data', c_byte * 14),]
-
- class struct_sockaddr_in(Structure):
- _fields_ = [
- ('sin_family', c_ushort),
- ('sin_port', c_uint16),
- ('sin_addr', c_byte * 4)]
-
- class struct_sockaddr_in6(Structure):
- _fields_ = [
- ('sin6_family', c_ushort),
- ('sin6_port', c_uint16),
- ('sin6_flowinfo', c_uint32),
- ('sin6_addr', c_byte * 16),
- ('sin6_scope_id', c_uint32)]
-
- class union_ifa_ifu(Union):
- _fields_ = [
- ('ifu_broadaddr', POINTER(struct_sockaddr)),
- ('ifu_dstaddr', POINTER(struct_sockaddr)),]
-
- class struct_ifaddrs(Structure):
- pass
- struct_ifaddrs._fields_ = [
- ('ifa_next', POINTER(struct_ifaddrs)),
- ('ifa_name', c_char_p),
- ('ifa_flags', c_uint),
- ('ifa_addr', POINTER(struct_sockaddr)),
- ('ifa_netmask', POINTER(struct_sockaddr)),
- ('ifa_ifu', union_ifa_ifu),
- ('ifa_data', c_void_p),]
-
- libc = ctypes.CDLL(ctypes.util.find_library('c'))
-
- def ifap_iter(ifap):
- ifa = ifap.contents
- while True:
- yield ifa
- if not ifa.ifa_next:
- break
- ifa = ifa.ifa_next.contents
-
- def getfamaddr(ifa):
- sa = ifa.ifa_addr.contents
- fam = sa.sa_family
- if fam == socket.AF_INET:
- sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents
- addr = socket.inet_ntop(fam, sa.sin_addr)
- nm = ifa.ifa_netmask.contents
- if nm is not None and nm.sa_family == socket.AF_INET:
- nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents
- addr += '/'+socket.inet_ntop(fam, nm.sin_addr)
- return IPAddr(addr)
- elif fam == socket.AF_INET6:
- sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents
- addr = socket.inet_ntop(fam, sa.sin6_addr)
- nm = ifa.ifa_netmask.contents
- if nm is not None and nm.sa_family == socket.AF_INET6:
- nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents
- addr += '/'+socket.inet_ntop(fam, nm.sin6_addr)
- return IPAddr(addr)
- return None
-
- def _NetworkInterfacesAddrs():
- ifap = POINTER(struct_ifaddrs)()
- result = libc.getifaddrs(pointer(ifap))
- if result != 0:
- raise OSError(get_errno())
- del result
- try:
- for ifa in ifap_iter(ifap):
- name = ifa.ifa_name.decode("UTF-8")
- addr = getfamaddr(ifa)
- if addr:
- yield name, addr
- finally:
- libc.freeifaddrs(ifap)
+ def _NetworkInterfacesAddrs():
+ ifap = POINTER(struct_ifaddrs)()
+ result = libc.getifaddrs(pointer(ifap))
+ if result != 0:
+ raise OSError(get_errno())
+ del result
+ try:
+ for ifa in ifap_iter(ifap):
+ name = ifa.ifa_name.decode("UTF-8")
+ addr = getfamaddr(ifa)
+ if addr:
+ yield name, addr
+ finally:
+ libc.freeifaddrs(ifap)
+
+ except Exception as e: # pragma: no cover
+ _init_error = NotImplementedError(e)
+ def _NetworkInterfacesAddrs():
+ raise _init_error
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);
return _NetworkInterfacesAddrs()
diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py
index 24f5272e..9f96c190 100644
--- a/fail2ban/tests/filtertestcase.py
+++ b/fail2ban/tests/filtertestcase.py
@@ -2333,6 +2333,17 @@ class DNSUtilsNetworkTests(unittest.TestCase):
ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2))
ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))
+ def test_NetworkInterfacesAddrs(self):
+ try:
+ ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
+ ip = IPAddr('127.0.0.1')
+ self.assertEqual(ip in ips, any(ip in n for n in ips))
+ ip = IPAddr('::1')
+ self.assertEqual(ip in ips, any(ip in n for n in ips))
+ except Exception as e: # pragma: no cover
+ # simply skip if not available, TODO: make coverage platform dependent
+ raise unittest.SkipTest(e)
+
def test_IPAddrSet(self):
ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')])
self.assertTrue(IPAddr('192.0.2.1') in ips)
@@ -2347,7 +2358,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
if cov == 'dns': # mock-up _NetworkInterfacesAddrs like it's not implemented (raises error)
_org_NetworkInterfacesAddrs = DNSUtils._NetworkInterfacesAddrs
def _tmp_NetworkInterfacesAddrs():
- raise NotImplementedError();
+ raise NotImplementedError()
DNSUtils._NetworkInterfacesAddrs = staticmethod(_tmp_NetworkInterfacesAddrs)
try:
ips = DNSUtils.getSelfIPs()
@@ -2364,6 +2375,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
DNSUtils._NetworkInterfacesAddrs = staticmethod(_org_NetworkInterfacesAddrs)
if cov != 'last':
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key)
+ DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key)
def testFQDN(self):
unittest.F2B.SkipIfNoNetwork()