import logging from typing import List, Optional import dns.query import dns.rdatatype import dns.resolver from django.db import IntegrityError from django.db.models.functions import Length from django_rq import job from dns import rcode from netaddr import ip from netbox_ddns.models import ACTION_CREATE, ACTION_DELETE, DNSStatus, ReverseZone, Zone from netbox_ddns.utils import normalize_fqdn logger = logging.getLogger('netbox_ddns') def get_zone(dns_name: str) -> Optional[Zone]: # Generate all possible zones zones = [] parts = dns_name.lower().split('.') for i in range(len(parts)): zones.append('.'.join(parts[-i - 1:])) # Find the zone, if any return Zone.objects.filter(name__in=zones).order_by(Length('name').desc()).first() def get_soa(dns_name: str) -> str: parts = dns_name.rstrip('.').split('.') for i in range(len(parts)): zone_name = normalize_fqdn('.'.join(parts[i:])) try: dns.resolver.query(zone_name, dns.rdatatype.SOA) return zone_name except dns.resolver.NoAnswer: # The name exists, but has no SOA. Continue one level further up continue except dns.resolver.NXDOMAIN as e: # Look for a SOA record in the authority section for query, response in e.responses().items(): for rrset in response.authority: if rrset.rdtype == dns.rdatatype.SOA: return rrset.name.to_text() def get_reverse_zone(address: ip.IPAddress) -> Optional[ReverseZone]: # Find the zone, if any zones = list(ReverseZone.objects.filter(prefix__net_contains=address)) if not zones: return None zones.sort(key=lambda zone: zone.prefix.prefixlen) return zones[-1] def status_update(output: List[str], operation: str, response) -> None: code = response.rcode() if code == dns.rcode.NOERROR: message = f"{operation} successful" logger.info(message) else: message = f"{operation} failed: {dns.rcode.to_text(code)}" logger.error(message) output.append(message) def create_forward(dns_name: str, address: ip.IPAddress, status: Optional[DNSStatus], output: List[str]): zone = get_zone(dns_name) if zone: logger.debug(f"Found zone {zone.name} for {dns_name}") if status: status.forward_action = ACTION_CREATE # Check the SOA, we don't want to write to a parent zone if it has delegated authority soa = get_soa(dns_name) if soa == zone.name: record_type = 'A' if address.version == 4 else 'AAAA' update = zone.server.create_update(zone.name) update.add( dns_name, zone.ttl, record_type, str(address) ) response = dns.query.udp(update, zone.server.address) status_update(output, f'Adding {dns_name} {record_type} {address}', response) if status: status.forward_rcode = response.rcode() else: logger.warning(f"Can't update zone {zone.name} for {dns_name}, " f"it has delegated authority for {soa}") if status: status.forward_rcode = rcode.NOTAUTH else: logger.debug(f"No zone found for {dns_name}") def delete_forward(dns_name: str, address: ip.IPAddress, status: Optional[DNSStatus], output: List[str]): zone = get_zone(dns_name) if zone: logger.debug(f"Found zone {zone.name} for {dns_name}") if status: status.forward_action = ACTION_DELETE # Check the SOA, we don't want to write to a parent zone if it has delegated authority soa = get_soa(dns_name) if soa == zone.name: record_type = 'A' if address.version == 4 else 'AAAA' update = zone.server.create_update(zone.name) update.delete( dns_name, record_type, str(address) ) response = dns.query.udp(update, zone.server.address) status_update(output, f'Deleting {dns_name} {record_type} {address}', response) if status: status.forward_rcode = response.rcode() else: logger.warning(f"Can't update zone {zone.name} {dns_name}, " f"it has delegated authority for {soa}") if status: status.forward_rcode = rcode.NOTAUTH else: logger.debug(f"No zone found for {dns_name}") def create_reverse(dns_name: str, address: ip.IPAddress, status: Optional[DNSStatus], output: List[str]): zone = get_reverse_zone(address) if zone and dns_name: record_name = zone.record_name(address) logger.debug(f"Found zone {zone.name} for {record_name}") if status: status.reverse_action = ACTION_CREATE # Check the SOA, we don't want to write to a parent zone if it has delegated authority soa = get_soa(record_name) if soa == zone.name: update = zone.server.create_update(zone.name) update.add( record_name, zone.ttl, 'ptr', dns_name ) response = dns.query.udp(update, zone.server.address) status_update(output, f'Adding {record_name} PTR {dns_name}', response) if status: status.reverse_rcode = response.rcode() else: logger.warning(f"Can't update zone {zone.name} for {record_name}, " f"it has delegated authority for {soa}") if status: status.reverse_rcode = rcode.NOTAUTH else: logger.debug(f"No zone found for {address}") def delete_reverse(dns_name: str, address: ip.IPAddress, status: Optional[DNSStatus], output: List[str]): zone = get_reverse_zone(address) if zone and dns_name: record_name = zone.record_name(address) logger.debug(f"Found zone {zone.name} for {record_name}") if status: status.reverse_action = ACTION_DELETE # Check the SOA, we don't want to write to a parent zone if it has delegated authority soa = get_soa(record_name) if soa == zone.name: update = zone.server.create_update(zone.name) update.delete( record_name, 'ptr', dns_name ) response = dns.query.udp(update, zone.server.address) status_update(output, f'Deleting {record_name} PTR {dns_name}', response) if status: status.reverse_rcode = response.rcode() else: logger.warning(f"Can't update zone {zone.name} for {record_name}, " f"it has delegated authority for {soa}") if status: status.reverse_rcode = rcode.NOTAUTH else: logger.debug(f"No zone found for {address}") @job def dns_create(dns_name: str, address: ip.IPAddress, forward=True, reverse=True, status: DNSStatus = None): output = [] if forward: create_forward(dns_name, address, status, output) if reverse: create_reverse(dns_name, address, status, output) if status: try: status.save() except IntegrityError: # Race condition when creating? status.save(force_update=True) return ', '.join(output) @job def dns_delete(dns_name: str, address: ip.IPAddress, forward=True, reverse=True, status: DNSStatus = None): output = [] if forward: delete_forward(dns_name, address, status, output) if reverse: delete_reverse(dns_name, address, status, output) if status: try: status.save() except IntegrityError: # Race condition when creating? status.save(force_update=True) return ', '.join(output)