import dns.tsigkeyring import dns.update import logging import socket from django.core.exceptions import ValidationError from django.db import models from django.db.models.functions import Length from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ from dns import rcode from dns.tsig import HMAC_MD5, HMAC_SHA1, HMAC_SHA224, HMAC_SHA256, HMAC_SHA384, HMAC_SHA512 from netaddr import IPNetwork, ip from typing import Optional from ipam.fields import IPNetworkField from ipam.models import IPAddress from .utils import normalize_fqdn from .validators import HostnameAddressValidator, HostnameValidator, validate_base64 logger = logging.getLogger('netbox_ddns') TSIG_ALGORITHM_CHOICES = ( (str(HMAC_MD5), 'HMAC MD5'), (str(HMAC_SHA1), 'HMAC SHA1'), (str(HMAC_SHA224), 'HMAC SHA224'), (str(HMAC_SHA256), 'HMAC SHA256'), (str(HMAC_SHA384), 'HMAC SHA384'), (str(HMAC_SHA512), 'HMAC SHA512'), ) ACTION_CREATE = 1 ACTION_DELETE = 2 ACTION_CHOICES = ( (ACTION_CREATE, 'Create'), (ACTION_DELETE, 'Delete'), ) # Use a private rcode for internal errors RCODE_NO_ZONE = 4095 def get_rcode_display(code): if code is None: return None elif code == rcode.NOERROR: return _('Success') elif code == rcode.SERVFAIL: return _('Server failure') elif code == rcode.NXDOMAIN: return _('Name does not exist') elif code == rcode.NOTIMP: return _('Not implemented') elif code == rcode.REFUSED: return _('Refused') elif code == rcode.NOTAUTH: return _('Server not authoritative') elif code == RCODE_NO_ZONE: return _('No zone configured') else: return _('Unknown response: {}').format(code) class Server(models.Model): server = models.CharField( verbose_name=_('DDNS Server'), max_length=255, validators=[HostnameAddressValidator()], ) tsig_key_name = models.CharField( verbose_name=_('TSIG Key Name'), max_length=255, validators=[HostnameValidator()], ) tsig_algorithm = models.CharField( verbose_name=_('TSIG Algorithm'), max_length=32, # Longest is 24 chars for historic reasons, new ones are shorter, so 32 is more than enough choices=TSIG_ALGORITHM_CHOICES, ) tsig_key = models.CharField( verbose_name=_('TSIG Key'), max_length=512, validators=[validate_base64], help_text=_('in base64 notation'), ) class Meta: unique_together = ( ('server', 'tsig_key_name'), ) ordering = ('server', 'tsig_key_name') verbose_name = _('dynamic DNS Server') verbose_name_plural = _('dynamic DNS Servers') def __str__(self): return f'{self.server} ({self.tsig_key_name})' def clean(self): # Remove trailing dots from the server name/address self.server = self.server.lower().rstrip('.') # Ensure trailing dots from domain-style fields self.tsig_key_name = normalize_fqdn(self.tsig_key_name.lower().rstrip('.')) @property def address(self) -> Optional[str]: addrinfo = socket.getaddrinfo(self.server, 'domain', proto=socket.IPPROTO_UDP) for family, _, _, _, sockaddr in addrinfo: if family in (socket.AF_INET, socket.AF_INET6) and sockaddr[0]: return sockaddr[0] def create_update(self, zone: str) -> dns.update.Update: return dns.update.Update( zone=normalize_fqdn(zone), keyring=dns.tsigkeyring.from_text({ self.tsig_key_name: self.tsig_key }), keyname=self.tsig_key_name, keyalgorithm=self.tsig_algorithm ) class ZoneQuerySet(models.QuerySet): def find_for_dns_name(self, dns_name: str) -> Optional['Zone']: # Generate all possible zones zones = [] parts = dns_name.lower().split('.') for i in range(len(parts)): zones.append('.'.join(parts[-i - 1:])) # Find the zone, if any return self.filter(name__in=zones).order_by(Length('name').desc()).first() class Zone(models.Model): name = models.CharField( verbose_name=_('zone name'), max_length=255, validators=[HostnameValidator()], unique=True, ) ttl = models.PositiveIntegerField( verbose_name=_('TTL'), ) server = models.ForeignKey( to=Server, verbose_name=_('DDNS Server'), on_delete=models.PROTECT, ) objects = ZoneQuerySet.as_manager() class Meta: ordering = ('name',) verbose_name = _('forward zone') verbose_name_plural = _('forward zones') def __str__(self): return self.name def clean(self): # Ensure trailing dots from domain-style fields self.name = normalize_fqdn(self.name) def get_updater(self): return self.server.create_update(self.name) class ReverseZoneQuerySet(models.QuerySet): def find_for_address(self, address: ip.IPAddress) -> Optional['ReverseZone']: # Find the zone, if any zones = list(ReverseZone.objects.filter(prefix__net_contains=address)) if not zones: return None zones.sort(key=lambda zone: zone.prefix.prefixlen) return zones[-1] class ReverseZone(models.Model): prefix = IPNetworkField( verbose_name=_('prefix'), unique=True, ) name = models.CharField( verbose_name=_('reverse zone name'), max_length=255, blank=True, help_text=_("RFC 2317 style reverse DNS, required when the prefix doesn't map to a reverse zone"), ) ttl = models.PositiveIntegerField( verbose_name=_('TTL'), ) server = models.ForeignKey( to=Server, verbose_name=_('DDNS Server'), on_delete=models.PROTECT, ) objects = ReverseZoneQuerySet.as_manager() class Meta: ordering = ('prefix',) verbose_name = _('reverse zone') verbose_name_plural = _('reverse zones') def __str__(self): return f'for {self.prefix}' def record_name(self, address: ip.IPAddress): record_name = self.name if IPNetwork(self.prefix).version == 4: for pos, octet in enumerate(address.words): if (pos + 1) * 8 <= self.prefix.prefixlen: continue record_name = f'{octet}.{record_name}' else: nibbles = f'{address.value:032x}' for pos, nibble in enumerate(nibbles): if (pos + 1) * 4 <= self.prefix.prefixlen: continue record_name = f'{nibble}.{record_name}' return record_name def clean(self): if isinstance(self.prefix, IPNetwork) and self.prefix.version == 4: if self.prefix.prefixlen not in [0, 8, 16, 24] and not self.name: raise ValidationError({ 'name': _('Required when prefix length is not 0, 8, 16 or 24'), }) elif not self.name: # Generate it for the user self.name = 'in-addr.arpa' for pos, octet in enumerate(self.prefix.ip.words): if pos * 8 >= self.prefix.prefixlen: break self.name = f'{octet}.{self.name}' elif isinstance(self.prefix, IPNetwork) and self.prefix.version == 6: if self.prefix.prefixlen % 4 != 0 and not self.name: raise ValidationError({ 'name': _('Required when prefix length is not a nibble boundary'), }) elif not self.name: # Generate it for the user self.name = 'ip6.arpa' nibbles = f'{self.prefix.ip.value:032x}' for pos, nibble in enumerate(nibbles): if pos * 4 >= self.prefix.prefixlen: break self.name = f'{nibble}.{self.name}' # Ensure trailing dots from domain-style fields self.name = normalize_fqdn(self.name) class DNSStatus(models.Model): ip_address = models.OneToOneField( to=IPAddress, verbose_name=_('IP address'), on_delete=models.CASCADE, ) last_update = models.DateTimeField( verbose_name=_('last update'), auto_now=True, ) forward_action = models.PositiveSmallIntegerField( verbose_name=_('forward record action'), choices=ACTION_CHOICES, blank=True, null=True, ) forward_rcode = models.PositiveIntegerField( verbose_name=_('forward record response'), blank=True, null=True, ) reverse_action = models.PositiveSmallIntegerField( verbose_name=_('reverse record action'), choices=ACTION_CHOICES, blank=True, null=True, ) reverse_rcode = models.PositiveIntegerField( verbose_name=_('reverse record response'), blank=True, null=True, ) class Meta: verbose_name = _('DNS status') verbose_name_plural = _('DNS status') def get_forward_rcode_display(self) -> Optional[str]: return get_rcode_display(self.forward_rcode) def get_forward_rcode_html_display(self) -> Optional[str]: output = get_rcode_display(self.forward_rcode) colour = 'green' if self.forward_rcode == rcode.NOERROR else 'red' return format_html('{output} Optional[str]: return get_rcode_display(self.reverse_rcode) def get_reverse_rcode_html_display(self) -> Optional[str]: output = get_rcode_display(self.reverse_rcode) colour = 'green' if self.reverse_rcode == rcode.NOERROR else 'red' return format_html('{output} Optional[str]: return get_rcode_display(self.forward_rcode) def get_forward_rcode_html_display(self) -> Optional[str]: output = get_rcode_display(self.forward_rcode) colour = 'green' if self.forward_rcode == rcode.NOERROR else 'red' return format_html('{output}