From 822f484eb7e8bdb7b3a3d7d2fbc12dfed5e17e4e Mon Sep 17 00:00:00 2001 From: Alarig Le Lay Date: Wed, 24 Jul 2024 14:38:08 +0200 Subject: [PATCH] Adding fallback to registrar RDAP server Signed-off-by: Alarig Le Lay --- check_domain_expiration_rdap.py | 114 ++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 20 deletions(-) diff --git a/check_domain_expiration_rdap.py b/check_domain_expiration_rdap.py index eb4ac64..5e602fe 100755 --- a/check_domain_expiration_rdap.py +++ b/check_domain_expiration_rdap.py @@ -9,19 +9,21 @@ import logging import requests import nagiosplugin -import pandas -import pyunycode import requests_cache _log = logging.getLogger('nagiosplugin') -def expiration(domain): - list2dict = [] +# cache session for json and csv storage +session = requests_cache.CachedSession( + '/tmp/iana_rdap_cache', + cache_control=True +) - session = requests_cache.CachedSession( - '/tmp/iana_rdap_cache', - cache_control=True - ) +def find_rdap_server(domain): + """Find the TLD rdap server.""" + import pandas + + list2dict = [] req = session.get('https://data.iana.org/rdap/dns.json', timeout=120) for k,v in req.json()['services']: for x in k: @@ -29,7 +31,6 @@ def expiration(domain): df = pandas.DataFrame(list2dict) - domain = pyunycode.convert(domain) tld = domain.split('.')[-1] try: url = df[df.name == (tld)].iloc[0].url @@ -41,25 +42,33 @@ def expiration(domain): _log.debug(f'The used RDAP server is {url}') - req_rdap = requests.get(f'{url}domain/{domain}') + return url + + +def parse_ldap(domain, rdap_server): + req_rdap = requests.get(f'{rdap_server}domain/{domain}') match req_rdap.status_code: case 403: raise nagiosplugin.CheckError( - f'Got {req_rdap.status_code}, the RDAP server {url} refused to reply' + f'Got {req_rdap.status_code}, the RDAP server {rdap_server} refused to reply' ) case 404: raise nagiosplugin.CheckError( f'Got {req_rdap.status_code}, the domain {domain} has not been found' ) + case 409: + raise nagiosplugin.CheckError( + f'Got {req_rdap.status_code}, the RDAP server {rdap_server} has too many requests' + ) case 503: raise nagiosplugin.CheckError( - f'Got {req_rdap.status_code}, the RDAP server {url} seems broken' + f'Got {req_rdap.status_code}, the RDAP server {rdap_server} seems broken' ) case _: pass - _log.debug(f'The used RDAP JSON is {req_rdap.json()}') + _log.debug(f'The used RDAP JSON from {req_rdap.url} is {req_rdap.json()}') raw_expiration = [ event.get('eventDate', False) @@ -67,16 +76,78 @@ def expiration(domain): if event.get('eventAction', {}) == 'expiration' ] - try: + # if we have not found the field expiration in the list eventAction + if len(raw_expiration) == 0: + _log.debug(f'The domain JSON for {domain} does not have "eventAction"."expiration" field, run with -vvv or --debug to have the JSON dump') + raw_registrar = [ + entity.get('vcardArray', False) + for entity in req_rdap.json().get('entities', {}) + if 'registrar' in entity.get('roles') + ] + + # I hope that order of the fields is consistent + # and I do not know at all what fn means + # We try to find the registrar here + for line in raw_registrar[0][1]: + if 'fn' in line: + raw_expiration.append(line[3]) + + elif len(raw_expiration) == 1: fecha = raw_expiration[0].split('T')[0] - except IndexError: + today = datetime.datetime.now() + delta = datetime.datetime.strptime(fecha, '%Y-%m-%d') - today + raw_expiration[0] = delta.days + + else: raise nagiosplugin.CheckError( - f'The domain JSON for {domain} does not have "eventAction"."expiration" field, run with -vvv or --debug to have the JSON dump' + f'{raw_expiration} is too long' ) - today = datetime.datetime.now() - delta = datetime.datetime.strptime(fecha, '%Y-%m-%d') - today - return delta.days + return raw_expiration + +def expiration(domain): + """Find the expiration date for the domain.""" + + raw_expiration = parse_ldap(domain, find_rdap_server(domain)) + + # we have parsed the eventAction field about expiration + if isinstance(raw_expiration[0], int): + return raw_expiration[0] + # we have not, so we try to fall back to registrar ldap + elif isinstance(raw_expiration[0], str): + import csv + # fetch csv + iana_registrars_csv = session.get( + 'https://www.iana.org/assignments/registrar-ids/registrar-ids-1.csv', + timeout=120 + ).content.decode('utf-8') + # parse csv + registrar_rdap_found = False + for registrar_row in csv.reader( + iana_registrars_csv.splitlines(), + delimiter=',' + ): + # lower case comparaison just in case (haha) + if registrar_row[1].lower() == raw_expiration[0].lower(): + # re-query + _log.debug(f'Falling back to registrar RDAP: {registrar_row[3]}') + registrar_rdap_found = True + registrar_expiration = parse_ldap(domain, registrar_row[3]) + if isinstance(registrar_expiration[0], int): + return registrar_expiration[0] + else: + raise nagiosplugin.CheckError( + f'Neither TLD or {registrar_row[3]} have expiration data' + ) + if not(registrar_rdap_found): + raise nagiosplugin.CheckError( + f'The registrar {raw_expiration[0]} is not fond from {iana_registrars_csv.url}' + ) + + else: + raise nagiosplugin.CheckError( + f'Error while parsing the JSON, {raw_expiration[0]} does not have an expected format' + ) # data acquisition @@ -124,6 +195,8 @@ class ExpirationSummary(nagiosplugin.Summary): @nagiosplugin.guarded def main(): + import pyunycode + argp = argparse.ArgumentParser(description=__doc__) argp.add_argument( '-w', '--warning', metavar='int', default='30', @@ -154,8 +227,9 @@ def main(): level=logging.DEBUG ) + domain = pyunycode.convert(args.domain) check = nagiosplugin.Check( - Expiration(args.domain), + Expiration(domain), nagiosplugin.ScalarContext( 'daystoexpiration', warning=wrange,