ansible-base/roles/icinga2_server/files/check_openvpn
2020-10-03 14:19:22 +02:00

187 lines
6.3 KiB
Python

#!/usr/bin/env python
# Check if an OpenVPN server runs on a given UDP or TCP port.
#
# Copyright 2013 Roland Wolters
# Copyright 2016 Alarig Le Lay
#
# Version 20160803
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import time
import hmac
import hashlib
import struct
import socket
import argparse
import binascii
# Byte offset into the tls-auth key material at which the HMAC key slice
# starts (main() doubles it to index the hex-encoded text).
HMAC_CLIENT_KEY_START = 192

# Maximum number of bytes read from the server reply.
BUFFER_SIZE = 1024

# Names of the hash algorithms this interpreter offers; fall back to
# hashlib.algorithms when hashlib has no algorithms_available attribute.
if hasattr(hashlib, "algorithms_available"):
    ALGORITHMS_AVAILABLE = hashlib.algorithms_available
else:
    ALGORITHMS_AVAILABLE = hashlib.algorithms
def ok(msg):
    """Report a successful check.

    Prints *msg* prefixed with ``OK: `` and returns 0, the value the
    script passes on to ``sys.exit`` for a passing check.
    """
    status = 0
    line = 'OK: %s' % msg
    print(line)
    return status
def critical(msg):
    """Report a failed check.

    Prints *msg* prefixed with ``CRIT: `` and returns 2, the value the
    script passes on to ``sys.exit`` for a failing check.
    """
    status = 2
    line = 'CRIT: %s' % msg
    print(line)
    return status
def buildpacket(tcp, key, digestmod):
    """Assemble the probe packet that is sent to the OpenVPN server.

    Wire layout: type byte, 8 random session-id bytes, optional HMAC,
    packet id (1), current time, an empty message-packet-id array and a
    zero message packet id.

    Args:
        tcp: when true, the packet is prefixed with its 2-byte big-endian
            length.
        key: HMAC key as bytes; when falsy no HMAC is included.
        digestmod: digest constructor handed to ``hmac.new`` (only used
            when *key* is set).

    Returns:
        The packet as a byte string.
    """
    packet_type = b'\x38'                           # type
    session_id = os.urandom(8)                      # session id
    packet_id = struct.pack('>I', 1)                # packet id
    net_time = struct.pack('>I', int(time.time()))  # net time
    array_length = struct.pack('>B', 0)             # message packet id array length
    message_id = struct.pack('>I', 0)               # message packet id

    fields = [packet_type, session_id]
    if key:
        # The HMAC covers the fields in a different order than they are
        # transmitted: packet id and time come first.
        signer = hmac.new(key, digestmod=digestmod)
        for chunk in (packet_id, net_time, packet_type, session_id,
                      array_length, message_id):
            signer.update(chunk)
        fields.append(signer.digest())
    fields.extend((packet_id, net_time, array_length, message_id))

    payload = b''.join(fields)
    if tcp:
        return struct.pack('>H', len(payload)) + payload
    return payload
def checkserver(host, port, tcp, timeout, key, digest):
    """Build the probe packet and send it over the selected transport.

    Args:
        host: host name or IP address of the server.
        port: port number to probe.
        tcp: true to probe over TCP, false for UDP.
        timeout: socket timeout handed to the transport check.
        key: HMAC key bytes, or a falsy value for no HMAC.
        digest: digest constructor used for the HMAC.

    Returns:
        The status code returned by the transport-specific check.
    """
    probe = buildpacket(tcp, key, digest)
    if tcp:
        return checkserver_tcp(host, port, timeout, probe)
    return checkserver_udp(host, port, timeout, probe)
def checkserver_udp(host, port, timeout, packet):
    """Send *packet* to the server over UDP and report whether it answers.

    Args:
        host: host name or IP address of the server.
        port: UDP port to probe.
        timeout: socket timeout applied to the exchange.
        packet: probe packet bytes to send.

    Returns:
        0 (via ``ok``) when any datagram comes back, 2 (via ``critical``)
        when the socket cannot be created or no reply arrives.
    """
    # thanks to glucas for the idea
    try:
        af, socktype, proto, _canonname, sa = socket.getaddrinfo(
            host, port, socket.AF_UNSPEC, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
    except socket.error:
        return critical('Unable to create UDP socket')

    try:
        s.settimeout(timeout)
        # Send to the address getaddrinfo resolved so that it matches the
        # socket's address family instead of resolving the host again.
        s.sendto(packet, sa)
        data, _ = s.recvfrom(BUFFER_SIZE)
    except Exception:
        # Any failure means CRIT for a monitoring check, but unlike a bare
        # except this lets KeyboardInterrupt/SystemExit propagate.
        return critical('OpenVPN UDP server not responding')
    finally:
        s.close()

    # hexlify returns bytes on Python 3; decode so the message does not
    # contain a b'...' repr.
    reply = binascii.hexlify(data).decode('ascii')
    return ok('OpenVPN UDP server response (hex): %s' % reply)
def checkserver_tcp(host, port, timeout, packet):
    """Send *packet* to the server over TCP and report whether it answers.

    Args:
        host: host name or IP address of the server.
        port: TCP port to probe.
        timeout: socket timeout applied to connect, send and receive.
        packet: probe packet bytes to send.

    Returns:
        0 (via ``ok``) when the server sends any data back, 2 (via
        ``critical``) when the socket cannot be created, the exchange
        fails, or the peer closes the connection without replying.
    """
    try:
        af, socktype, proto, _canonname, sa = socket.getaddrinfo(
            host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)[0]
        s = socket.socket(af, socktype, proto)
    except socket.error:
        return critical('Unable to create TCP socket')

    try:
        s.settimeout(timeout)
        # Connect to the address getaddrinfo resolved so that it matches
        # the socket's address family.
        s.connect(sa)
        # sendall keeps writing until the whole packet is out; send() may
        # stop after a partial write.
        s.sendall(packet)
        data = s.recv(BUFFER_SIZE)
    except Exception:
        # Any failure means CRIT for a monitoring check, but unlike a bare
        # except this lets KeyboardInterrupt/SystemExit propagate.
        return critical('OpenVPN TCP server not responding')
    finally:
        s.close()

    if not data:
        # An empty read means the peer closed the connection without
        # answering the probe.
        return critical('OpenVPN TCP server not responding')

    # hexlify returns bytes on Python 3; decode so the message does not
    # contain a b'...' repr.
    reply = binascii.hexlify(data).decode('ascii')
    return ok('OpenVPN TCP server response (hex): %s' % reply)
def readkey(path):
    """Extract the key text from a tls-auth key file.

    The key is everything between the first line that ends in ``-`` and
    the next line that starts with ``-``, with line breaks removed.

    Args:
        path: path of the key file.

    Returns:
        The key text as one string, or None when the file cannot be read
        or the two delimiter lines are not found.
    """
    try:
        with open(path, 'r') as keyfile:
            content = keyfile.read()
    except (IOError, OSError, UnicodeError):
        return None

    # Normalise CRLF line endings so the delimiter search also works when
    # the runtime did not translate them while reading.
    content = content.replace('\r\n', '\n')

    index_start = content.find('-\n')
    index_end = content.find('\n-', index_start)
    if index_start < 0 or index_end < 0 or index_end <= index_start:
        return None

    # Skip the '-' and the newline that terminate the opening delimiter.
    body = content[index_start + 2:index_end]
    return body.replace('\n', '').replace('\r', '')
def optionsparser(argv=None):
    """Parse the command line of the check.

    Args:
        argv: list of argument strings; when None argparse reads
            ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``port``, ``tcp``, ``timeout``,
        ``digest``, ``digest_size``, ``digest_key``, ``tls_auth`` and
        ``host``.
    """
    cli = argparse.ArgumentParser()

    # Connection options.
    cli.add_argument(
        '-p', '--port',
        help='set port number (default is %(default)d)',
        type=int,
        default=1194,
    )
    cli.add_argument(
        '-t', '--tcp',
        help='use tcp instead of udp',
        action='store_true',
    )
    cli.add_argument(
        '--timeout',
        help='set timeout (default is %(default)d)',
        type=int,
        default=5,
    )

    # HMAC options.
    cli.add_argument(
        '--digest',
        help='set HMAC digest (default is "%(default)s")',
        default='sha1',
    )
    cli.add_argument('--digest-size', help='set HMAC digest size', type=int)
    cli.add_argument('--digest-key', help='set HMAC key')
    cli.add_argument('--tls-auth', help='set tls-auth file')

    # Target.
    cli.add_argument('host', help='the OpenVPN host name or IP')

    parsed = cli.parse_args(argv)
    return parsed
def main(argv=None):
    """Run the OpenVPN check and return the plugin exit status.

    Args:
        argv: list of argument strings; when None the process command
            line is used.

    Returns:
        0 when the server answered the probe, 2 on invalid options, an
        unusable digest or key, or when the server did not respond.
    """
    args = optionsparser(argv)

    # Invalid option combinations must end the check; reporting them and
    # then probing anyway would hide the error behind the probe result.
    if args.digest_size and args.digest_size < 0:
        return critical('digest size must be positive')
    if args.tls_auth and args.digest_key:
        return critical('--tls-auth cannot go with --digest-key')

    digest_name = args.digest.lower()
    if digest_name not in ALGORITHMS_AVAILABLE:
        return critical('digest not available')
    try:
        digest = getattr(hashlib, digest_name)
        # Only instantiate the digest when no explicit size was given.
        digest_size = args.digest_size or digest().digest_size
    except Exception:
        return critical('digest creation failed')

    key = args.digest_key
    if args.tls_auth:
        key = readkey(args.tls_auth)
        if key is None:
            return critical('cannot read tls auth file')
        # The key text is hex encoded, so byte offsets are doubled.
        index_start = HMAC_CLIENT_KEY_START * 2
        index_end = (HMAC_CLIENT_KEY_START + digest_size) * 2
        key = key[index_start:index_end]
        if len(key) != digest_size * 2:
            # A truncated slice would be used as a wrong key, or be
            # dropped entirely, and surface as a misleading timeout.
            return critical('tls auth file too short for digest size')

    if key:
        try:
            key = binascii.unhexlify(key)
        except (binascii.Error, TypeError, ValueError):
            return critical('HMAC key is not valid hexadecimal')

    return checkserver(args.host, args.port, args.tcp, args.timeout, key, digest)
# Script entry point: the status returned by main() becomes the process
# exit code.
if __name__ == '__main__':
    sys.exit(main())