#!/usr/bin/env python3
"""Generate reverse (PTR) zone data and a wiki overview from forward zones.

The script scans forward zone files for A records and ``$GENERATE``
directives, collects them per reverse zone (selected with
``; [autorev=<prefix>]`` comments), rewrites the automatically generated
part of every reverse zone file (everything after ``; [autorev begin]``),
bumps the SOA serial of changed files and optionally uploads an address
table to a DokuWiki page.
"""

import datetime
import pathlib
import re
import sys
from glob import glob

import dokuwiki

import zonetoolsconf


class RevDuplicateAddress(Exception):
    """The same IP address was registered twice in one reverse zone."""


class RevZoneMissing(Exception):
    """An operation referenced a reverse zone that was never added."""


class RevInvalidAddress(Exception):
    """A reverse zone prefix, address or $GENERATE range is malformed."""


def _die(message):
    """Print *message* to stderr and terminate the script with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


class Rev:
    """In-memory collection of reverse zones and their PTR records.

    ``revzones`` maps a zone prefix (e.g. ``"10.1"``) to a dict with the
    keys ``'ptr'`` (records keyed by a zero-padded, sortable IP string),
    ``'other'`` (verbatim extra records) and ``'info'`` (free text).
    ``revmasks`` maps the same prefix to its number of octets.
    """

    def __init__(self):
        self.revzones = {}
        self.revmasks = {}

    def _check_zone(self, zone):
        """Raise RevZoneMissing unless *zone* was registered via add_zone."""
        if zone not in self.revzones:
            # Exceptions take no keyword arguments; pass the text positionally.
            raise RevZoneMissing(f"rev zone not found for {zone}")

    def add_zone(self, zone):
        """Register reverse zone *zone* (1 to 3 dotted octets); idempotent.

        Raises:
            RevInvalidAddress: *zone* is not a 1-3 octet dotted prefix.
        """
        # fullmatch: the whole string must be a prefix, not just its start.
        if not re.fullmatch(r"\d+(\.\d+){0,2}", zone):
            raise RevInvalidAddress(f"invalid address for rev zone: {zone}")
        if zone not in self.revzones:
            self.revzones[zone] = {'ptr': {}, 'other': [], 'info': ""}
            self.revmasks[zone] = len(zone.split("."))

    def add_zone_info(self, zone, info):
        """Append *info* to the free-text description of *zone*."""
        self._check_zone(zone)
        self.revzones[zone]['info'] += info

    def ptr(self, zone, addr, name, network = "", noauto = False, generate = False, wikionly = False):
        """Register address *addr* with host name *name* in *zone*.

        Args:
            zone: reverse zone prefix previously passed to add_zone.
            addr: text starting with a dotted IPv4 address; it is stored
                unchanged, so trailing text such as ``/24`` is kept.
            name: PTR target / displayed host name.
            network: network description shown in the wiki table.
            noauto: record is maintained by hand; it is not emitted by
                zone_autoptr.
            generate: False, or 'first' / 'middle' / 'last' for members of
                a $GENERATE range (collapsed to one row by zone_wiki).
            wikionly: record appears only in the wiki table.

        Raises:
            RevZoneMissing: *zone* is unknown.
            RevInvalidAddress: *addr* does not start with an IPv4 address.
            RevDuplicateAddress: the address is already registered.
        """
        self._check_zone(zone)
        # Deliberately a prefix match: callers may pass "a.b.c.d/len".
        ipbyte = re.match(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", addr)
        if not ipbyte:
            raise RevInvalidAddress(f"invalid address: {addr} ({name} / rev zone: {zone})")

        # Zero-padded octets make plain string sorting follow numeric order.
        sortable_ip = ".".join(f"{int(ipbyte[i]):03}" for i in range(1, 5))
        records = self.revzones[zone]['ptr']
        if sortable_ip in records:
            raise RevDuplicateAddress(f"duplicate address: {addr}: {records[sortable_ip]['name']} / {name}")

        # Owner label relative to the zone: the octets beyond the zone
        # prefix, in reversed order.
        ptr = ".".join(ipbyte[x] for x in range(4, self.revmasks[zone], -1))
        records[sortable_ip] = {
            'addr': addr,
            'ptr': ptr,
            'name': name,
            'network': network,
            'noauto': noauto,
            'generate': generate,
            'wikionly': wikionly,
        }

    def record(self, zone, record):
        """Append a verbatim extra record line to *zone*."""
        self._check_zone(zone)
        self.revzones[zone]['other'].append(record)

    def zones(self):
        """Return a view of all registered zone prefixes."""
        return self.revzones.keys()

    def zone_autoptr(self, zone):
        """Return the generated lines of *zone*: extra records, then PTRs.

        Records flagged ``noauto`` or ``wikionly`` are left out.
        """
        self._check_zone(zone)
        other = self.revzones[zone]['other']
        records = self.revzones[zone]['ptr']
        ptr = []
        for ip in sorted(records):
            rec = records[ip]
            if rec['noauto'] or rec['wikionly']:
                continue
            ptr.append(f"{rec['ptr']}\t\tPTR\t{rec['name']}")
        return other + ptr

    def zone_wiki(self, zone):
        """Return the DokuWiki table lines describing *zone*.

        A spacer row is inserted wherever consecutive addresses are not
        adjacent, and a $GENERATE range is collapsed into a single row.
        """
        self._check_zone(zone)
        records = self.revzones[zone]['ptr']
        wiki = [f"^ {zone} - {self.revzones[zone]['info']} ^^^", "^IP cím^név^hálózat^"]
        last_addr = None
        # Defined up front so a 'last' record without a preceding 'first'
        # cannot hit an unbound local.
        gen_first_name = ""
        gen_first_addr = ""
        for ip in sorted(records):
            rec = records[ip]
            addr, name = rec['addr'], rec['name']
            # Expected successor = previous address with its trailing
            # number incremented by one.
            if last_addr and re.sub(r"(\d+)$", lambda r: str(int(r[1]) + 1), last_addr) != addr:
                wiki.append("| |||")
            last_addr = addr

            generate = rec['generate']
            if generate == 'first':
                gen_first_name = name
                gen_first_addr = addr
                continue
            if generate == 'middle':
                continue
            if generate == 'last':
                name = f"{gen_first_name}\\\\ ...\\\\ {name}"
                addr = f"{gen_first_addr}\\\\ ...\\\\ {addr}"

            # Wrap "v<digits>" tokens of the network text in bold markup.
            network = re.sub(r'(v\d+)', '**\\1**', rec['network'])
            if rec['noauto']:
                network = "**M** " + network
            if rec['wikionly']:
                addr = f"//{addr}//"
                name = f"//{name}//"
                network = f"//{network}//"
            wiki.append(f"|{addr}|{name}|{network}|")
        return wiki


def read_zone(zonefile, rev):
    """Parse forward zone file *zonefile* and register its records in *rev*.

    Recognised lines:
      * ``$ORIGIN <name>``: suffix appended to host names.
      * ``; [autorev=<prefix>]``: select the reverse zone (``off`` disables).
      * ``; ... [net] <text>`` / ``[network]``: description of the
        following addresses; a network left without any record gets a
        wiki-only placeholder row when the next network starts.
      * ``; ... [vlan] <text>``: legacy spelling, accepted with a warning.
      * A records and ``$GENERATE`` A ranges: registered as PTR records
        unless the line carries ``[no_autorev]``.
      * ``; [autorev] <record>``: record copied verbatim to the reverse zone.

    Invalid or duplicate addresses terminate the script with status 1.

    Raises:
        RevInvalidAddress: a $GENERATE range is empty or exceeds 255.
    """
    with open(zonefile, 'r') as f:
        origin = None
        revzone = None
        network = ""
        network_ptrs = False
        for line in f:
            if r := re.match(r"\s*\$ORIGIN\s*(\S*)", line):
                origin = r[1]
            elif r := re.match(r"\s*;\s*\[autorev\s*=\s*(\S*)\s*\]", line):
                revzone = r[1]
                if revzone == 'off':
                    revzone = None
                    continue
                if not re.match(r"\d+(\.\d+)*$", revzone):
                    _die(f"invalid autorev: {revzone}")
                rev.add_zone(revzone)
            elif r := re.match(r"\s*;.*?\[net(work)?\]\s*(.*)", line):
                prev_network = network
                network = r[2]
                if prev_network and not network_ptrs:
                    # No PTR records in the previous network: show it in
                    # the wiki as a still-empty network.
                    if net_addr := re.match(r"[0-9./]+", prev_network):
                        try:
                            rev.ptr(revzone, net_addr[0], "(még üres hálózat)",
                                    network=prev_network, wikionly=True)
                        except (RevDuplicateAddress, RevInvalidAddress, RevZoneMissing):
                            # Best effort: the placeholder is optional.
                            pass
                network_ptrs = False
            elif r := re.match(r"\s*;.*?\[vlan\]\s*(.*)", line):
                network = r[1]
                network_ptrs = False
                print(f"warning: legacy [vlan] in zone file {zonefile}")

            # Records are only collected once both the reverse zone and
            # the origin are known.
            if not revzone or not origin:
                continue

            if r := re.match(r"([a-z0-9.-]+)\s+(IN\s+)?A\s+([0-9.]+)(.*)", line, flags=re.I):
                name, addr, extra = r[1], r[3], r[4]
                if re.search(r"\[no_autorev\]", extra):
                    continue
                try:
                    rev.ptr(revzone, addr, f"{name}.{origin}", network=network)
                    network_ptrs = True
                except (RevDuplicateAddress, RevInvalidAddress) as e:
                    _die(e)
            elif r := re.match(r"\s*\$GENERATE\s+(\d+)-(\d+)\s+([a-z0-9.$-]+)\s+A\s+([0-9.$]+)", line, flags=re.I):
                begin, end, name_pattern, addr_pattern = int(r[1]), int(r[2]), r[3], r[4]
                if begin < 0 or begin >= end or end > 255:
                    raise RevInvalidAddress(f"invalid generate range: {begin}-{end}")
                for i in range(begin, end + 1):
                    # "$" is the iterator placeholder in both patterns.
                    name = name_pattern.replace("$", str(i))
                    addr = addr_pattern.replace("$", str(i))
                    if i == begin:
                        gen = 'first'
                    elif i == end:
                        gen = 'last'
                    else:
                        gen = 'middle'
                    try:
                        rev.ptr(revzone, addr, f"{name}.{origin}", network=network, generate=gen)
                        network_ptrs = True
                    except (RevDuplicateAddress, RevInvalidAddress) as e:
                        _die(e)
            elif r := re.match(r"\s*;\s*\[autorev\]\s*(.*)", line):
                rev.record(revzone, r[1])


def process_oldrev(oldrev, revfile, rev):
    """Return the hand-maintained head of an existing reverse zone file.

    Scans *oldrev* (the file's text) up to the ``; [autorev begin]``
    marker and returns everything before it. On the way, ``; [info]``
    comments are stored as zone info and manually written PTR records are
    registered in *rev* with ``noauto=True``.

    The script terminates with status 1 when the marker is missing, a PTR
    record precedes ``$ORIGIN``, or a record cannot be registered.
    *revfile* is only used in messages.
    """
    newrev = ""
    revzone = None
    for line in oldrev.split("\n"):
        if re.match(r";\s*\[autorev begin\]", line):
            return newrev
        if r := re.match(r"\s*\$ORIGIN\s+([0-9.]+)\.in-addr\.arpa\.", line, flags=re.I):
            # "1.10.in-addr.arpa." -> zone prefix "10.1"
            revzone = ".".join(reversed(r[1].split(".")))
        elif r := re.match(r"\s*;\s*\[info\]\s*(.*)", line):
            try:
                rev.add_zone_info(revzone, r[1])
            except RevZoneMissing as e:
                _die(e)
        elif r := re.match(r"([0-9.]+)\s+(IN\s+)?PTR\s+(\S+)", line, flags=re.I):
            if not revzone:
                _die(f"PTR record before $ORIGIN in: {revfile}")
            ip = revzone + "." + ".".join(reversed(r[1].split(".")))
            try:
                rev.ptr(revzone, ip, r[3], noauto=True)
            except (RevDuplicateAddress, RevInvalidAddress, RevZoneMissing) as e:
                _die(e)
        newrev += line + "\n"
    _die(f"autorev begin marker not found in: {revfile}")


def serial_incr(zone, zonefile):
    """Return zone text *zone* with its SOA serial number incremented.

    A 9 or 10 digit serial is treated as ``YYYYMMDD`` followed by a 1 or
    2 digit revision: the date part becomes today's date (or stays if it
    is already in the future) and the revision restarts at 1 or is
    incremented. Any other serial is simply incremented by one.

    The script terminates with status 1 when no SOA serial is found.
    *zonefile* is only used in the error message.
    """
    r = re.search(r"(IN\s+)?SOA.*?\s+(\d{1,10})\s", zone)
    if not r:
        _die(f"no SOA record found in {zonefile}")

    oldserial = r[2]
    if len(oldserial) in (9, 10):
        revdigits = len(oldserial) - 8
        revbase = 10 ** revdigits
        newdate = int(datetime.datetime.now().strftime("%Y%m%d"))
        olddate = int(oldserial[0:8])
        oldrev = int(oldserial[8:])
        if olddate < newdate:
            # Today's date is kept; the revision restarts.
            newrev = 1
        elif olddate == newdate:
            # Today's date is kept; next revision.
            newrev = oldrev + 1
        else:
            # The serial is already ahead of today: never go backwards.
            newdate = olddate
            newrev = oldrev + 1
        if newrev >= revbase:
            # Revision overflow carries into the date part. This is plain
            # integer arithmetic, not a calendar step.
            newdate += 1
            newrev -= revbase
        newserial = f"{newdate}{newrev:0{revdigits}}"
    else:
        newserial = str(int(oldserial) + 1)

    # Splice the new serial exactly where the old one was matched.
    return zone[:r.start(2)] + newserial + zone[r.end(2):]


def dokuwiki_update(text):
    """Replace the configured DokuWiki page with *text*.

    Connection data and the page name are read from the module-level
    ``conf`` object. Any failure terminates the script with status 1.
    """
    try:
        wiki = dokuwiki.DokuWiki(conf.dokuwiki_url, conf.dokuwiki_user, conf.dokuwiki_passwd, cookieAuth=True)
    except (dokuwiki.DokuWikiError, Exception) as err:
        _die(f"Cannot connect to wiki: {err}")

    try:
        wiki.pages.set(conf.dokuwiki_page, text)
    except (dokuwiki.DokuWikiError, Exception) as err:
        _die(f"Cannot update wiki page: {err}")


conf = zonetoolsconf.Conf()
rev = Rev()

# Collect A records from every forward zone file.
for zf in glob(f"{conf.bind_path}/{conf.zone_glob}"):
    read_zone(zf, rev=rev)

# Regenerate the automatic part of each reverse zone file.
for z in rev.zones():
    revfile = pathlib.Path(f"{conf.bind_path}/{conf.autorev_base}{z}")
    if not revfile.is_file():
        _die(f"autorev file does not exist: {revfile}")

    oldrev = revfile.read_text()
    newrev = process_oldrev(oldrev, revfile, rev=rev)
    newrev += "; [autorev begin]\n" + "\n".join(rev.zone_autoptr(z)) + "\n"
    if newrev == oldrev:
        continue

    # Only changed files get a new serial and are written back.
    newrev = serial_incr(newrev, revfile)
    revfile.write_text(newrev)
    print(f"{revfile} modified.")

if conf.dokuwiki_url:
    wikitext = ""
    for z in rev.zones():
        wikitext += "\n".join(rev.zone_wiki(z)) + "\n\n"
    dokuwiki_update(wikitext)

# vim: set tabstop=4 shiftwidth=4 expandtab smarttab: