summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsebres <serg.brester@sebres.de>2023-01-11 18:27:44 +0100
committersebres <serg.brester@sebres.de>2023-01-11 18:27:44 +0100
commit582436aadf310ff2db4d0dc76668c5fa7e5a3c03 (patch)
treef41612d45a97a596e74e51cda795ed44ba0f1b97
parentcb8674e68a4f2752132443d3be36462078339673 (diff)
downloadfail2ban-582436aadf310ff2db4d0dc76668c5fa7e5a3c03.tar.gz
don't add subnets to local addresses of `ignoreself` from network interfaces, use only IPs instead (subnets may be too heavy and not wanted, todo: make it configurable later)
-rw-r--r--fail2ban/server/ipdns.py45
-rw-r--r--fail2ban/tests/filtertestcase.py19
2 files changed, 41 insertions, 23 deletions
diff --git a/fail2ban/server/ipdns.py b/fail2ban/server/ipdns.py
index 8c5c277e..b435c6df 100644
--- a/fail2ban/server/ipdns.py
+++ b/fail2ban/server/ipdns.py
@@ -669,13 +669,28 @@ IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96)
class IPAddrSet(set):
+ hasSubNet = False
+
+ def __init__(self, ips=[]):
+ ips2 = set()
+ for ip in ips:
+ if not isinstance(ip, IPAddr): ip = IPAddr(ip)
+ ips2.add(ip)
+ self.hasSubNet |= not ip.isSingle
+ set.__init__(self, ips2)
+
+ def add(self, ip):
+ if not isinstance(ip, IPAddr): ip = IPAddr(ip)
+ self.hasSubNet |= not ip.isSingle
+ set.add(self, ip)
+
def __contains__(self, ip):
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
# IP can be found directly or IP is in each subnet:
- return set.__contains__(self, ip) or any(n.contains(ip) for n in self)
+ return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self))
-def _NetworkInterfacesAddrs():
+def _NetworkInterfacesAddrs(withMask=False):
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
# Currently tested on Linux only (TODO: implement for MacOS, Solaris, etc)
@@ -735,28 +750,30 @@ def _NetworkInterfacesAddrs():
break
ifa = ifa.ifa_next.contents
- def getfamaddr(ifa):
+ def getfamaddr(ifa, withMask=False):
sa = ifa.ifa_addr.contents
fam = sa.sa_family
if fam == socket.AF_INET:
sa = cast(pointer(sa), POINTER(struct_sockaddr_in)).contents
addr = socket.inet_ntop(fam, sa.sin_addr)
- nm = ifa.ifa_netmask.contents
- if nm is not None and nm.sa_family == socket.AF_INET:
- nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents
- addr += '/'+socket.inet_ntop(fam, nm.sin_addr)
+ if withMask:
+ nm = ifa.ifa_netmask.contents
+ if nm is not None and nm.sa_family == socket.AF_INET:
+ nm = cast(pointer(nm), POINTER(struct_sockaddr_in)).contents
+ addr += '/'+socket.inet_ntop(fam, nm.sin_addr)
return IPAddr(addr)
elif fam == socket.AF_INET6:
sa = cast(pointer(sa), POINTER(struct_sockaddr_in6)).contents
addr = socket.inet_ntop(fam, sa.sin6_addr)
- nm = ifa.ifa_netmask.contents
- if nm is not None and nm.sa_family == socket.AF_INET6:
- nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents
- addr += '/'+socket.inet_ntop(fam, nm.sin6_addr)
+ if withMask:
+ nm = ifa.ifa_netmask.contents
+ if nm is not None and nm.sa_family == socket.AF_INET6:
+ nm = cast(pointer(nm), POINTER(struct_sockaddr_in6)).contents
+ addr += '/'+socket.inet_ntop(fam, nm.sin6_addr)
return IPAddr(addr)
return None
- def _NetworkInterfacesAddrs():
+ def _NetworkInterfacesAddrs(withMask=False):
ifap = POINTER(struct_ifaddrs)()
result = libc.getifaddrs(pointer(ifap))
if result != 0:
@@ -765,7 +782,7 @@ def _NetworkInterfacesAddrs():
try:
for ifa in ifap_iter(ifap):
name = ifa.ifa_name.decode("UTF-8")
- addr = getfamaddr(ifa)
+ addr = getfamaddr(ifa, withMask)
if addr:
yield name, addr
finally:
@@ -777,6 +794,6 @@ def _NetworkInterfacesAddrs():
raise _init_error
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);
- return _NetworkInterfacesAddrs()
+ return _NetworkInterfacesAddrs(withMask)
DNSUtils._NetworkInterfacesAddrs = staticmethod(_NetworkInterfacesAddrs);
diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py
index 9f96c190..4e308e38 100644
--- a/fail2ban/tests/filtertestcase.py
+++ b/fail2ban/tests/filtertestcase.py
@@ -2334,15 +2334,16 @@ class DNSUtilsNetworkTests(unittest.TestCase):
ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))
def test_NetworkInterfacesAddrs(self):
- try:
- ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs()])
- ip = IPAddr('127.0.0.1')
- self.assertEqual(ip in ips, any(ip in n for n in ips))
- ip = IPAddr('::1')
- self.assertEqual(ip in ips, any(ip in n for n in ips))
- except Exception as e: # pragma: no cover
- # simply skip if not available, TODO: make coverage platform dependent
- raise unittest.SkipTest(e)
+ for withMask in (False, True):
+ try:
+ ips = IPAddrSet([a for ni, a in DNSUtils._NetworkInterfacesAddrs(withMask)])
+ ip = IPAddr('127.0.0.1')
+ self.assertEqual(ip in ips, any(ip in n for n in ips))
+ ip = IPAddr('::1')
+ self.assertEqual(ip in ips, any(ip in n for n in ips))
+ except Exception as e: # pragma: no cover
+ # simply skip if not available, TODO: make coverage platform dependent
+ raise unittest.SkipTest(e)
def test_IPAddrSet(self):
ips = IPAddrSet([IPAddr('192.0.2.1/27'), IPAddr('2001:DB8::/32')])