zonetools/autorev2

243 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
import dokuwiki
import zonetoolsconf
from glob import glob
import pathlib
import re
import sys
import datetime
class RevDuplicateAddress(Exception):
pass
class RevZoneMissing(Exception):
pass
class RevInvalidAddress(Exception):
pass
class Rev:
def __init__(self):
self.revzones = {}
self.revmasks = {}
def _check_zone(self, zone):
if zone not in self.revzones:
raise RevZoneMissing(msg=f"rev zone not found for {zone}")
def add_zone(self, zone):
if not re.match(r"(\d+)(\.\d+){0,2}", zone):
raise RevInvalidAddress(f"invalid address for rev zone: {zone}")
if zone not in self.revzones:
self.revzones[zone] = {
'ptr': {},
'other': [],
'info': ""
}
self.revmasks[zone] = len(re.split(r"\.", zone))
#print(f"added zone '{zone}' mask {self.revmasks[zone]}")
def add_zone_info(self, zone, info):
self._check_zone(zone)
self.revzones[zone]['info'] += info
#print(f"zoneinfo {zone} - {info}")
def ptr(self, zone, addr, name, vlan = "", noauto = False):
self._check_zone(zone)
if not (ipbyte := re.match(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", addr)):
raise RevInvalidAddress(f"invalid address: {addr} ({name} / rev zone: {zone})")
sortable_ip = f"{int(ipbyte[1]):03}.{int(ipbyte[2]):03}.{int(ipbyte[3]):03}.{int(ipbyte[4]):03}"
if sortable_ip in self.revzones[zone]['ptr']:
raise RevDuplicateAddress(f"duplicate address: {addr}: {self.revzones[zone]['ptr'][sortable_ip]['name']} / {name}")
ptr = ""
for x in range(4, self.revmasks[zone], -1):
ptr += f"{ipbyte[x]}."
ptr = re.sub(r"\.$", "", ptr)
self.revzones[zone]['ptr'][sortable_ip] = {
'addr': addr,
'ptr': ptr,
'name': name,
'vlan': vlan,
'noauto': noauto
}
#print(f"{zone} {addr} {ptr}")
def record(self, zone, record):
self._check_zone(zone)
self.revzones[zone]['other'].append(record)
def zones(self):
return self.revzones.keys()
def zone_autoptr(self, zone):
self._check_zone(zone)
other = self.revzones[zone]['other']
ptr = []
for ip in sorted(self.revzones[zone]['ptr']):
rec = self.revzones[zone]['ptr'][ip]
if rec['noauto']:
continue;
ptr.append(f"{rec['ptr']}\t\tIN PTR\t{rec['name']}")
return other + ptr
def zone_wiki(self, zone):
self._check_zone(zone)
other = self.revzones[zone]['other']
wiki = [f"^ {zone} - {self.revzones[zone]['info']} ^^^",
f"^IP cím^név^vlan^"]
last_addr = None
for ip in sorted(self.revzones[zone]['ptr']):
rec = self.revzones[zone]['ptr'][ip]
if last_addr and re.sub(r"(\d+)$", lambda r : str(int(r[1])+1), last_addr) != rec['addr']:
wiki.append("| |||")
vlan = rec['vlan']
if rec['noauto']:
vlan = "**M** "+ vlan
wiki.append(f"|{rec['addr']}|{rec['name']}|{vlan}|")
last_addr = rec['addr']
return wiki
def read_zone(zonefile, rev):
with open(zonefile, 'r') as f:
origin = None
revzone = None
vlan = ""
while line := f.readline():
if r := re.match(r"\s*\$ORIGIN\s*(\S*)", line):
origin = r[1]
elif r := re.match(r"\s*;\s*\[autorev\s*=\s*(\S*)\s*\]", line):
revzone = r[1]
if revzone == 'off':
revzone = None
continue
if not re.match(r"\d+(\.\d+)*$", revzone):
print(f"invalid autorev: {revzone}", file=sys.stderr)
exit(1)
rev.add_zone(revzone)
elif r:= re.match(r"\s*;.*?\[vlan\]\s*(.*)", line):
vlan = r[1]
if not revzone or not origin:
continue
if r := re.match(r"\s*([a-z0-9.-]+)\s+IN\s+A\s+([0-9.]+)(.*)", line, flags=re.I):
name, addr, extra = r[1], r[2], r[3]
if re.search(r"\[no_autorev\]", extra):
continue
try:
rev.ptr(revzone, addr, f"{name}.{origin}", vlan=vlan)
except (RevDuplicateAddress, RevInvalidAddress) as e:
print(e, file=sys.stderr)
exit(1)
elif r := re.match(r"\s*;\s*\[autorev\]\s*(.*)", line):
rev.record(revzone, r[1])
def process_oldrev(oldrev, revfile, rev):
newrev = ""
revzone = None
for line in re.split(r"\n", oldrev):
if re.match(r";\s*\[autorev begin\]", line):
return newrev
if r := re.match(r"\s*\$ORIGIN\s+([0-9.]+)\.in-addr\.arpa\.", line, flags=re.I):
revzone = ".".join(reversed(re.split(r"\.", r[1])))
#print(f"revzone: {revzone}")
elif r := re.match(r"\s*;\s*\[info\]\s*(.*)", line):
rev.add_zone_info(revzone, r[1])
elif r := re.match(r"\s*([0-9.]+)\s+IN\s+PTR\s+(\S+)", line, flags=re.I):
if not revzone:
print(f"PTR record before $ORIGIN in: {revfile}")
exit(1)
ip = revzone +"."+ ".".join(reversed(re.split(r"\.", r[1])))
#print(f"ptr: {ip} {r[2]}")
try:
rev.ptr(revzone, ip, r[2], noauto=True)
except (RevDuplicateAddress, RevInvalidAddress) as e:
print(e, file=sys.stderr)
exit(1)
newrev += line +"\n"
print(f"autorev begin marker not found in: {revfile}", file=sys.stderr)
exit(1)
def serial_incr(zone, zonefile):
if not (r := re.search(r"IN\s+SOA.*?\s+(\d{1,10})\s", zone)):
print(f"no SOA record found in {zonefile}", file=sys.stderr)
oldserial = r[1]
if len(oldserial) == 9:
revdigits = 1
revbase = 10
elif len(oldserial) == 10:
revdigits = 2
revbase = 100
else:
return str(int(oldserial) + 1)
newdate = int(datetime.datetime.now().strftime("%Y%m%d"))
olddate = int(oldserial[0:8])
oldrev = int(oldserial[8:])
if olddate < newdate:
# newdate marad
newrev = 1
elif olddate == newdate:
# newdate marad
newrev = oldrev + 1
else: # olddate > newdate
newdate = olddate
newrev = oldrev + 1
if newrev >= revbase:
newdate += 1
newrev -= revbase
if revdigits == 1:
newserial = f"{newdate}{newrev}"
else:
newserial = f"{newdate}{newrev:02}"
#print(f"serial {oldserial} -> {newserial}")
return re.sub(r"(IN\s+SOA.*?\s+)"+ oldserial, f"\\g<1>{newserial}", zone)
def dokuwiki_update(text):
#print(f"dokuwiki url: {conf.dokuwiki_url}")
try:
wiki = dokuwiki.DokuWiki(conf.dokuwiki_url, conf.dokuwiki_user, conf.dokuwiki_passwd)
except (dokuwiki.DokuWikiError, Exception) as err:
print(f"Cannot connect to wiki: {err}")
exit(1)
wiki.pages.set("auto:dns_ip_cim_kiosztas", text)
conf = zonetoolsconf.Conf()
rev = Rev()
for zf in glob(f"{conf.bind_path}/{conf.zone_glob}"):
read_zone(zf, rev=rev)
for z in rev.zones():
revfile = pathlib.Path(f"{conf.bind_path}/{conf.autorev_base}{z}")
if not revfile.is_file():
print(f"autorev file does not exist: {revfile}", file=sys.stderr)
exit(1)
oldrev = revfile.read_text()
newrev = process_oldrev(oldrev, revfile, rev=rev)
newrev += "; [autorev begin]\n" + "\n".join(rev.zone_autoptr(z)) + "\n"
if newrev == oldrev:
continue
newrev = serial_incr(newrev, revfile)
revfile.write_text(newrev)
print(f"{revfile} modified.")
if conf.dokuwiki_url:
wikitext = ""
for z in rev.zones():
wikitext += "\n".join(rev.zone_wiki(z)) +"\n\n"
dokuwiki_update(wikitext)
# vim: set tabstop=4 shiftwidth=4 expandtab smarttab: