zonetools/autorev2

298 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
import dokuwiki
import zonetoolsconf
from glob import glob
import pathlib
import re
import sys
import datetime
class RevDuplicateAddress(Exception):
pass
class RevZoneMissing(Exception):
pass
class RevInvalidAddress(Exception):
pass
class Rev:
def __init__(self):
self.revzones = {}
self.revmasks = {}
def _check_zone(self, zone):
if zone not in self.revzones:
raise RevZoneMissing(msg=f"rev zone not found for {zone}")
def add_zone(self, zone):
if not re.match(r"(\d+)(\.\d+){0,2}", zone):
raise RevInvalidAddress(f"invalid address for rev zone: {zone}")
if zone not in self.revzones:
self.revzones[zone] = {
'ptr': {},
'other': [],
'info': ""
}
self.revmasks[zone] = len(re.split(r"\.", zone))
#print(f"added zone '{zone}' mask {self.revmasks[zone]}")
def add_zone_info(self, zone, info):
self._check_zone(zone)
self.revzones[zone]['info'] += info
#print(f"zoneinfo {zone} - {info}")
def ptr(self, zone, addr, name, network = "", noauto = False, generate = False, wikionly = False):
self._check_zone(zone)
if not (ipbyte := re.match(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", addr)):
raise RevInvalidAddress(f"invalid address: {addr} ({name} / rev zone: {zone})")
sortable_ip = f"{int(ipbyte[1]):03}.{int(ipbyte[2]):03}.{int(ipbyte[3]):03}.{int(ipbyte[4]):03}"
if sortable_ip in self.revzones[zone]['ptr']:
raise RevDuplicateAddress(f"duplicate address: {addr}: {self.revzones[zone]['ptr'][sortable_ip]['name']} / {name}")
ptr = ""
for x in range(4, self.revmasks[zone], -1):
ptr += f"{ipbyte[x]}."
ptr = re.sub(r"\.$", "", ptr)
self.revzones[zone]['ptr'][sortable_ip] = {
'addr': addr,
'ptr': ptr,
'name': name,
'network': network,
'noauto': noauto,
'generate': generate,
'wikionly': wikionly
}
#print(f"{zone} {addr} {ptr}")
def record(self, zone, record):
self._check_zone(zone)
self.revzones[zone]['other'].append(record)
def zones(self):
return self.revzones.keys()
def zone_autoptr(self, zone):
self._check_zone(zone)
other = self.revzones[zone]['other']
ptr = []
for ip in sorted(self.revzones[zone]['ptr']):
rec = self.revzones[zone]['ptr'][ip]
if rec['noauto'] or rec['wikionly']:
continue;
ptr.append(f"{rec['ptr']}\t\tIN PTR\t{rec['name']}")
return other + ptr
def zone_wiki(self, zone):
self._check_zone(zone)
other = self.revzones[zone]['other']
wiki = [f"^ {zone} - {self.revzones[zone]['info']} ^^^",
f"^IP cím^név^hálózat^"]
last_addr = None
for ip in sorted(self.revzones[zone]['ptr']):
rec = self.revzones[zone]['ptr'][ip]
addr, name = rec['addr'], rec['name']
if last_addr and re.sub(r"(\d+)$", lambda r : str(int(r[1])+1), last_addr) != addr:
wiki.append("| |||")
last_addr = addr
if rec['generate']:
if rec['generate'] == 'first':
gen_first_name = name
gen_first_addr = addr
continue
elif rec['generate'] == 'middle':
continue
elif rec['generate'] == 'last':
name = f"{gen_first_name}\\\\ ...\\\\ {name}"
addr = f"{gen_first_addr}\\\\ ...\\\\ {addr}"
network = re.sub(r'(v\d+)', '**\\1**', rec['network'])
if rec['noauto']:
network = "**M** "+ network
if rec['wikionly']:
addr = f"//{addr}//"
name = f"//{name}//"
network = f"//{network}//"
wiki.append(f"|{addr}|{name}|{network}|")
return wiki
def read_zone(zonefile, rev):
with open(zonefile, 'r') as f:
origin = None
revzone = None
network = ""
network_ptrs = False
while line := f.readline():
if r := re.match(r"\s*\$ORIGIN\s*(\S*)", line):
origin = r[1]
elif r := re.match(r"\s*;\s*\[autorev\s*=\s*(\S*)\s*\]", line):
revzone = r[1]
if revzone == 'off':
revzone = None
continue
if not re.match(r"\d+(\.\d+)*$", revzone):
print(f"invalid autorev: {revzone}", file=sys.stderr)
exit(1)
rev.add_zone(revzone)
elif r:= re.match(r"\s*;.*?\[net(work)?\]\s*(.*)", line):
prev_network = network
network = r[2]
if prev_network and not network_ptrs:
# no ptr records in previous network
if r:= re.match(r"[0-9./]+", prev_network):
try:
rev.ptr(revzone, r[0], f"(még üres hálózat)", network=prev_network, wikionly=True)
except:
pass
network_ptrs = False
elif r:= re.match(r"\s*;.*?\[vlan\]\s*(.*)", line):
network = r[1]
network_ptrs = False
print(f"warning: legacy [vlan] in zone file {zonefile}")
if not revzone or not origin:
continue
if r := re.match(r"\s*([a-z0-9.-]+)\s+IN\s+A\s+([0-9.]+)(.*)", line, flags=re.I):
name, addr, extra = r[1], r[2], r[3]
if re.search(r"\[no_autorev\]", extra):
continue
try:
rev.ptr(revzone, addr, f"{name}.{origin}", network=network)
network_ptrs = True
except (RevDuplicateAddress, RevInvalidAddress) as e:
print(e, file=sys.stderr)
exit(1)
elif r := re.match(r"\s*\$GENERATE\s+(\d+)-(\d+)\s+([a-z0-9.$-]+)\s+A\s+([0-9.$]+)", line, flags=re.I):
begin, end, name_pattern, addr_pattern = int(r[1]), int(r[2]), r[3], r[4]
if begin < 0 or begin >= end or end > 255:
raise RevInvalidAddress(f"invalid generate range: {begin}-{end}")
for i in range(begin, end + 1):
name = re.sub(r"\$", str(i), name_pattern)
addr = re.sub(r"\$", str(i), addr_pattern)
if i == begin:
gen = 'first'
elif i == end:
gen = 'last'
else:
gen = 'middle'
try:
rev.ptr(revzone, addr, f"{name}.{origin}", network=network, generate=gen)
network_ptrs = True
except (RevDuplicateAddress, RevInvalidAddress) as e:
print(e, file=sys.stderr)
exit(1)
elif r := re.match(r"\s*;\s*\[autorev\]\s*(.*)", line):
rev.record(revzone, r[1])
def process_oldrev(oldrev, revfile, rev):
newrev = ""
revzone = None
for line in re.split(r"\n", oldrev):
if re.match(r";\s*\[autorev begin\]", line):
return newrev
if r := re.match(r"\s*\$ORIGIN\s+([0-9.]+)\.in-addr\.arpa\.", line, flags=re.I):
revzone = ".".join(reversed(re.split(r"\.", r[1])))
#print(f"revzone: {revzone}")
elif r := re.match(r"\s*;\s*\[info\]\s*(.*)", line):
rev.add_zone_info(revzone, r[1])
elif r := re.match(r"\s*([0-9.]+)\s+IN\s+PTR\s+(\S+)", line, flags=re.I):
if not revzone:
print(f"PTR record before $ORIGIN in: {revfile}")
exit(1)
ip = revzone +"."+ ".".join(reversed(re.split(r"\.", r[1])))
#print(f"ptr: {ip} {r[2]}")
try:
rev.ptr(revzone, ip, r[2], noauto=True)
except (RevDuplicateAddress, RevInvalidAddress) as e:
print(e, file=sys.stderr)
exit(1)
newrev += line +"\n"
print(f"autorev begin marker not found in: {revfile}", file=sys.stderr)
exit(1)
def serial_incr(zone, zonefile):
if not (r := re.search(r"IN\s+SOA.*?\s+(\d{1,10})\s", zone)):
print(f"no SOA record found in {zonefile}", file=sys.stderr)
oldserial = r[1]
if len(oldserial) == 9:
revdigits = 1
revbase = 10
elif len(oldserial) == 10:
revdigits = 2
revbase = 100
else:
return str(int(oldserial) + 1)
newdate = int(datetime.datetime.now().strftime("%Y%m%d"))
olddate = int(oldserial[0:8])
oldrev = int(oldserial[8:])
if olddate < newdate:
# newdate marad
newrev = 1
elif olddate == newdate:
# newdate marad
newrev = oldrev + 1
else: # olddate > newdate
newdate = olddate
newrev = oldrev + 1
if newrev >= revbase:
newdate += 1
newrev -= revbase
if revdigits == 1:
newserial = f"{newdate}{newrev}"
else:
newserial = f"{newdate}{newrev:02}"
#print(f"serial {oldserial} -> {newserial}")
return re.sub(r"(IN\s+SOA.*?\s+)"+ oldserial, f"\\g<1>{newserial}", zone)
def dokuwiki_update(text):
#print(f"dokuwiki url: {conf.dokuwiki_url}")
try:
wiki = dokuwiki.DokuWiki(conf.dokuwiki_url, conf.dokuwiki_user, conf.dokuwiki_passwd, cookieAuth=True)
except (dokuwiki.DokuWikiError, Exception) as err:
print(f"Cannot connect to wiki: {err}")
exit(1)
try:
wiki.pages.set(conf.dokuwiki_page, text)
except (dokuwiki.DokuWikiError, Exception) as err:
print(f"Cannot update wiki page: {err}")
exit(1)
conf = zonetoolsconf.Conf()
rev = Rev()
for zf in glob(f"{conf.bind_path}/{conf.zone_glob}"):
read_zone(zf, rev=rev)
for z in rev.zones():
revfile = pathlib.Path(f"{conf.bind_path}/{conf.autorev_base}{z}")
if not revfile.is_file():
print(f"autorev file does not exist: {revfile}", file=sys.stderr)
exit(1)
oldrev = revfile.read_text()
newrev = process_oldrev(oldrev, revfile, rev=rev)
newrev += "; [autorev begin]\n" + "\n".join(rev.zone_autoptr(z)) + "\n"
if newrev == oldrev:
continue
newrev = serial_incr(newrev, revfile)
revfile.write_text(newrev)
print(f"{revfile} modified.")
if conf.dokuwiki_url:
wikitext = ""
for z in rev.zones():
wikitext += "\n".join(rev.zone_wiki(z)) +"\n\n"
dokuwiki_update(wikitext)
# vim: set tabstop=4 shiftwidth=4 expandtab smarttab: