#!/usr/bin/env python3 # Copyright (C) 2014-2016 Felix Geyer # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 or (at your option) # version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import datetime import dns.exception import dns.flags import dns.rdatatype import dns.rdtypes.ANY.TLSA import dns.resolver import hashlib import re import socket import struct import ssl import subprocess import sys VERSION = "1.1" class ProtocolError(Exception): pass def nagios_ok(msg: str) -> None: print("DANE OK - " + msg) sys.exit(0) def nagios_warning(msg: str) -> None: print("DANE WARNING - " + msg) sys.exit(1) def nagios_critical(msg: str) -> None: print("DANE CRITICAL - " + msg) sys.exit(2) def nagios_unknown(msg: str) -> None: print("DANE UNKONWN - " + msg) sys.exit(3) def create_resolver(dnssec: bool = True, timeout: int = None, nameserver: str = None) -> dns.resolver.Resolver: resolver = dns.resolver.Resolver() if timeout and timeout != 0: resolver.lifetime = timeout if nameserver: resolver.nameservers = [nameserver] if dnssec: resolver.edns = 0 resolver.payload = 1280 resolver.ednsflags = dns.flags.DO return resolver def check_dns_response_auth(response: dns.resolver.Answer) -> bool: if response.response.flags & dns.flags.AD: return True else: return False def extract_pubkey(cert_binary: bytes) -> bytes: # should really be done with a python 3 module that exposes this openssl api # extract public key in pem format pubkey_pem = subprocess.check_output(["openssl", "x509", "-pubkey", "-inform", "der", "-noout"], input=cert_binary) # conver to binary / der format pubkey_binary = subprocess.check_output(["openssl", "pkey", "-pubin", "-outform", "der"], input=pubkey_pem) return pubkey_binary def get_tlsa_records(args: argparse.Namespace) -> dns.resolver.Answer: resolver = create_resolver(dnssec=args.dnssec, timeout=args.timeout, nameserver=args.nameserver) tlsa_domain = "_{}._tcp.{}".format(args.port, args.host) try: tlsa_records = resolver.query(tlsa_domain, dns.rdatatype.TLSA) except dns.resolver.NXDOMAIN: nagios_critical("No DNS TLSA record found: {}".format(tlsa_domain)) except dns.exception.Timeout: nagios_unknown("DNS query timeout: {}".format(tlsa_domain)) if args.dnssec and not check_dns_response_auth(tlsa_records): nagios_unknown("DNS query not DNSSEC validated") return tlsa_records def validate_dane(cert_binary: bytes, pkix_valid: bool, tlsa_record: dns.rdtypes.ANY.TLSA.TLSA) -> bool: if tlsa_record.usage == 3: pass elif tlsa_record.usage == 1 and pkix_valid: pass else: # usage=2 unsupported return False if tlsa_record.selector == 0: data = cert_binary elif tlsa_record.selector == 1: data = extract_pubkey(cert_binary) else: return False if tlsa_record.mtype == 0: hashed = data elif tlsa_record.mtype == 1: hashed = hashlib.sha256(data).digest() elif tlsa_record.mtype == 2: hashed = hashlib.sha512(data).digest() else: return False return hashed == tlsa_record.cert def check_cert_expiry(args: argparse.Namespace, cert: dict, days_warning: int, days_critical: int = None) -> datetime.timedelta: not_after = datetime.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") date_diff = not_after - datetime.datetime.now() if days_critical: if date_diff <= datetime.timedelta(days_critical): nagios_critical("{}:{} cert expires in {} days".format(args.host, args.port, date_diff.days)) if date_diff <= datetime.timedelta(days_warning): nagios_warning("{}:{} cert expires in {} days".format(args.host, args.port, date_diff.days)) return date_diff def connect_to_host(connect_host: str, connect_port: int, args: argparse.Namespace, check_cert: bool) -> ssl.SSLSocket: if args.timeout == 0: socket.setdefaulttimeout(None) else: socket.setdefaulttimeout(args.timeout) context = ssl.create_default_context() if not check_cert: context.check_hostname = False context.verify_mode = ssl.CERT_NONE # TODO: use our resolver try: sock = socket.create_connection((connect_host, connect_port)) except OSError as e: nagios_unknown("Can't establish a connection to {}:{}:\n{}" .format(connect_host, connect_port, str(e))) peername = sock.getpeername()[0] if ":" in peername: peername = "[{}]".format(peername) try: if args.starttls == "smtp": connect_smtp(sock) elif args.starttls == "ftp": connect_ftp(sock) elif args.starttls == "imap": connect_imap(sock) elif args.starttls == "xmpp": connect_xmpp(sock, args.host) elif args.starttls == "quassel": connect_quassel(sock, args.host) except (ProtocolError, OSError) as e: nagios_unknown("Failed to initiate STARTTLS to {} ({}:{}):\n{}" .format(connect_host, peername, connect_port, str(e))) try: return context.wrap_socket(sock, server_hostname=args.host) except (ProtocolError, ssl.SSLError, OSError) as e: if isinstance(e, ssl.SSLError) and e.reason == "CERTIFICATE_VERIFY_FAILED": # pass exceptions concerning certificate validation to the caller raise else: nagios_unknown("Can't establish a TLS connection to {} ({}:{}):\n{}" .format(connect_host, peername, connect_port, str(e))) def smtp_read_response(sock: socket.socket) -> str: response = "" line_end = False while not line_end: line = sock.recv(1024) response += line.decode("ASCII") if len(line) < 4 or line[3] != "-": line_end = True return response def connect_smtp(sock: socket.socket) -> None: smtp_read_response(sock) sock.sendall(b"EHLO openssl.client.net\r\n") response = smtp_read_response(sock) if "STARTTLS" not in response: raise ProtocolError("Server doesn't support STARTTLS") sock.sendall(b"STARTTLS\r\n") smtp_read_response(sock) def connect_ftp(sock: socket.socket) -> None: buf = sock.recv(1024) sock.sendall(b"AUTH TLS\r\n") response = sock.recv(1024).decode("ASCII").strip("\r\n\t ") if not re.search(r"^2\d\d", response): raise ProtocolError("Server doesn't support STARTTLS ({})".format(response)) def connect_imap(sock: socket.socket) -> None: sock.recv(1024) sock.sendall(b". CAPABILITY\n") response = smtp_read_response(sock) if "STARTTLS" not in response: raise ProtocolError("Server doesn't support STARTTLS") sock.sendall(b". STARTTLS\n") smtp_read_response(sock) def connect_xmpp(sock: socket.socket, host: str) -> None: sock.sendall("".format(host).encode("ASCII")) buf = sock.recv(1024) if "") buf = sock.recv(1024) if " None: MAGIC = 0x42b33f00 FEATURE_ENCRYPTION = 0x1 PROTOCOL_DATAGRAM = 0x2 PROTOCOL_END = (0x1 << 31) sock.sendall(struct.pack("!I", MAGIC | FEATURE_ENCRYPTION)) sock.sendall(struct.pack("!I", PROTOCOL_DATAGRAM | PROTOCOL_END)) try: response = struct.unpack("!I", sock.recv(4))[0] except struct.error: raise ProtocolError("No valid response from server") protocol_type = (response & 0xff) connection_features = (response >> 24) if not (protocol_type & PROTOCOL_DATAGRAM): raise ProtocolError("Server doesn't support the protocol") if not (connection_features & FEATURE_ENCRYPTION): raise ProtocolError("Server doesn't support TLS") def main() -> None: parser = argparse.ArgumentParser(description=""" Nagios/Icinga plugin for checking DANE/TLSA records. It compares the DANE/TLSA record against the TLS certificate provided by a service.""") parser.add_argument("--host", "-H", dest="host", required=True, help="Hostname to check.") parser.add_argument("--port", "-p", type=int, required=True, help="TCP port to check.") parser.add_argument("--connect-host", "--ip", "-I", dest="connect_host", help="Connect to this host instead of --host.") parser.add_argument("--connect-port", dest="connect_port", help="Connect to this port instead of --port.") parser.add_argument("--starttls", choices=["smtp", "ftp", "imap", "xmpp", "quassel"], help="Send the protocol-specific messages to enable TLS.") parser = argparse.ArgumentParser(description=""" Nagios/Icinga plugin for checking DANE/TLSA records. It compares the DANE/TLSA record against the TLS certificate provided by a service.""") parser.add_argument("--host", "-H", dest="host", required=True, help="Hostname to check.") parser.add_argument("--port", "-p", type=int, required=True, help="TCP port to check.") parser.add_argument("--connect-host", "--ip", "-I", dest="connect_host", help="Connect to this host instead of --host.") parser.add_argument("--connect-port", dest="connect_port", help="Connect to this port instead of --port.") parser.add_argument("--starttls", choices=["smtp", "ftp", "imap", "xmpp", "quassel"], help="Send the protocol-specific messages to enable TLS.") parser.add_argument("--check-pkix", action="store_true", help="Additionally perform traditional checks on the certificate " "(ca trust path, hostname, expiry).") parser.add_argument("--min-days-valid", help="Minimum number of days a certificate has to be valid. " "Format: INTEGER[,INTEGER]. " "1st is #days for warning, 2nd is critical.") parser.add_argument("--no-dnssec", dest="dnssec", action="store_false", help="Continue even when DNS replies aren't DNSSEC authenticated.") parser.add_argument("--nameserver", help="Use a custom nameserver.") parser.add_argument("--timeout", type=int, default=10, help="Network timeout in sec. Default: 10") parser.add_argument("--version", action="version", version="%(prog)s " + VERSION) args = parser.parse_args() pyver = sys.version_info if pyver[0] < 3 or (pyver[0] == 3 and pyver[1] < 4): nagios_unknown("check_dane requires Python >= 3.4") if args.port < 1 or args.port > 65535: nagios_unknown("Invalid port") if args.min_days_valid and not re.search(r"^\d+(,\d+)?$", args.min_days_valid): nagios_unknown("--check-cert-expire takes INTEGER[,INTEGER] as arguments") if args.timeout < 0: nagios_unknown("Invalid timeout argument") if args.connect_host: connect_host = args.connect_host else: connect_host = args.host if args.connect_port: connect_port = args.connect_port else: connect_port = args.port tlsa_records = get_tlsa_records(args) has_usage1_tlsa = False for record in tlsa_records: if record.usage == 1: has_usage1_tlsa = True break initial_check_pkix = (args.check_pkix or has_usage1_tlsa) try: # validate against PKIX if manually requested or we've found a usage=1 tlsa record ssl_sock = connect_to_host(connect_host, connect_port, args, initial_check_pkix) pkix_valid = initial_check_pkix except (ssl.CertificateError, ssl.SSLError) as e: if args.check_pkix: nagios_critical(str(e)) else: ssl_sock = connect_to_host(connect_host, connect_port, args, False) pkix_valid = False pkix_error = str(e) cert_binary = ssl_sock.getpeercert(binary_form=True) cert_dict = ssl_sock.getpeercert() ssl_sock.close() dane_valid_cert = False for tlsa in tlsa_records: if validate_dane(cert_binary, pkix_valid, tlsa): dane_valid_cert = True break if not dane_valid_cert: # test if it would match if it were pkix_valid additional_msg = "" for tlsa in tlsa_records: if validate_dane(cert_binary, True, tlsa): additional_msg = "\nIt matches a TLSA usage=1 record but fails PKIX validation:\n" + pkix_error break nagios_critical("Certificate doesn't match TLSA record" + additional_msg) if pkix_valid and args.min_days_valid: days_parts = args.min_days_valid.split(",") if len(days_parts) == 2: timedelta_valid = check_cert_expiry(args, cert_dict, int(days_parts[0]), int(days_parts[1])) else: timedelta_valid = check_cert_expiry(args, cert_dict, int(days_parts[0])) expire_str = ", expires in {} days".format(timedelta_valid.days) else: expire_str = "" message = "{}:{} cert matches TLSA record".format(args.host, args.port) if not args.dnssec: message += " (DNSSEC not validated)" message += expire_str nagios_ok(message) if __name__ == "__main__": main() message = "{}:{} cert matches TLSA record".format(args.host, args.port) if not args.dnssec: message += " (DNSSEC not validated)" message += expire_str nagios_ok(message) if __name__ == "__main__": main() # kate: space-indent on; indent-width 4;