import hashlib from dns import flags, rdatatype, resolver as dnsres from paramiko.client import MissingHostKeyPolicy from paramiko.common import DEBUG _key_algorithms = { 'ssh-rsa' : '1', 'ssh-dss' : '2', 'ecdsa-sha2-nistp256' : '3', 'ecdsa-sha2-nistp384' : '3', 'ecdsa-sha2-nistp521' : '3', 'ssh-ed25519' : '4', } _hash_funcs = { '1' : hashlib.sha1, '2' : hashlib.sha256, } class SSHFPPolicy(MissingHostKeyPolicy): '''Checks for a matching SSHFP RR''' def __init__(self, resolver = None): if resolver is None: resolver = dnsres.Resolver() resolver.use_edns(0, flags.DO, 1280) self._resolver = resolver def missing_host_key(self, client, hostname, key): try: key_alg = _key_algorithms[key.get_name()] except KeyError: raise Exception('Unsupported key type for SSHFP: %s' % key.get_name()) try: resp = self._resolver.query(hostname, 'SSHFP') except dnsres.NoAnswer: raise Exception('Could not obtain SSHFP records for host: %s' % hostname) if not resp.response.flags & flags.AD: raise Exception('Answer does not have a valid DNSSEC signature') for item in resp: try: alg, fg_type, fg = item.to_text().split() except ValueError: raise Exception('Invalid SSHFP record format: %s' % item.to_text()) if alg != key_alg: continue if not fg_type in _hash_funcs: continue fg_expect = _hash_funcs[fg_type](key.asbytes()).hexdigest() if fg_expect == fg: client._log(DEBUG, 'Found valid SSHFP record for host %s' % hostname) return raise Exception('No matching SSHFP records found')