diff --git a/dnsverify.py b/dnsverify.py index 86b9f62..874e3ed 100644 --- a/dnsverify.py +++ b/dnsverify.py @@ -1,76 +1,116 @@ -#!/bin/env python - -''' - Check whether the authoritative nameservers returned for all puzzle managed - domains belong to the list of pitc_nameservers and fail if one does not. -''' +#!/usr/bin/env python +# author: Philipp Gassmann +# check if domains configured with octodns are configured to use our nameservers +# Usage: ./dnsverify.py 2> /dev/null import sys import yaml +from pprint import pprint -from dns import resolver +import dns.query +import dns.resolver +from dns.exception import DNSException -# list of puzzle authoritative name servers -pitc_nameservers = [ - 'ns1.dnsimple.com.', - 'ns2.dnsimple.com.', - 'ns3.dnsimple.com.', - 'ns4.dnsimple.com.', - 'ns5.dnsmadeeasy.com.', - 'ns6.dnsmadeeasy.com.', - 'ns7.dnsmadeeasy.com.' - ] +def puzzle_nameservers(): + dnspool = [ + "ns1.dnsimple.com.", + "ns2.dnsimple.com.", + "ns3.dnsimple.com.", + "ns4.dnsimple.com.", + "ns5.dnsmadeeasy.com.", + "ns6.dnsmadeeasy.com.", + "ns7.dnsmadeeasy.com." ] + return dnspool -# list of puzzle managed zone files -pitc_domains = [ 'puzzle.ch.yaml', 'puzzle.yaml', 'nonpuzzle.yaml' ] +# query_authoritative_ns() based on https://stackoverflow.com/questions/4066614/how-can-i-find-the-authoritative-dns-server-for-a-domain-using-dnspython +def query_authoritative_ns(domain, log=lambda msg: None): -# configure opendns resolver -resolver = resolver.Resolver() -resolver.nameservers = ['208.67.222.222','208.67.220.220'] + #default_resolver = dns.resolver.get_default_resolver() + my_resolver = dns.resolver.Resolver() + my_resolver.nameservers = ['8.8.8.8'] -def get_authoritative_ns(domains): - ''' - dsc: Query the domains and return the authoritative name server. - arg: [list], domain to query - ret: [str], nameserver - ''' - for domain in domains: - answers = resolver.resolve(domain,'NS') - for server in answers: - if not verify_authoritative_ns(str(server)): - print("ERROR: {} got answer from {}, not managed by puzzle".format(domain, server), file=sys.stderr) - return False + nameserver = my_resolver.nameservers[0] + + n = domain.split('.') + + for i in range(len(n), 0, -1): + sub = '.'.join(n[i-1:]) + + log('Looking up %s on %s' % (sub, nameserver)) + query = dns.message.make_query(sub, dns.rdatatype.NS) + response = dns.query.tcp(query, nameserver) + + rcode = response.rcode() + if rcode != dns.rcode.NOERROR: + if rcode == dns.rcode.NXDOMAIN: + raise Exception('%s does not exist.' % (sub)) else: - #print("{} got answer from {}".format(domain, server)) - pass + raise Exception('Error %s' % (dns.rcode.to_text(rcode))) + + if len(response.authority) > 0: + rrsets = response.authority + elif len(response.additional) > 0: + rrsets = [response.additional] + else: + rrsets = response.answer + + # Handle all RRsets, not just the first one + for rrset in rrsets: + for rr in rrset: + if rr.rdtype == dns.rdatatype.SOA: + log('Same server is authoritative for %s' % (sub)) + elif rr.rdtype == dns.rdatatype.A: + ns = rr.items[0].address + log('Glue record for %s: %s' % (rr.name, ns)) + elif rr.rdtype == dns.rdatatype.NS: + authority = rr.target + nameserver = my_resolver.query(authority).rrset[0].to_text() + log('%s [%s] is authoritative for %s; ttl %i' % + (authority, nameserver, sub, rrset.ttl)) + result = rrset + else: + # IPv6 glue records etc + #log('Ignoring %s' % (rr)) + pass + return result + +def log (msg): + sys.stderr.write(msg + u'\n') + +def domains_from_config(filename): + filecontent = open(filename, 'r') + config_domains = yaml.load(filecontent, Loader=yaml.FullLoader) + domains = config_domains['zones'].keys() + return domains + +def verify_whois(domain): + domain = domain.strip('.') + result = query_authoritative_ns(domain, log) + for entry in result: + nameserver = entry.target.to_text() + if not nameserver in puzzle_nameservers(): + print('Domain %s has authoritative NS %s which is not in puzzle_nameservers' % (domain, nameserver)) + return False + log('Domain %s is ok' % (domain)) return True -def get_domains(filenames): - ''' - dsc: Loads domain names from a list of yaml files. - arg: [list], filenames - ret: [list], arbitrary list of domain names - ''' - domains = [] - for file in filenames: - with open(file, 'r') as zone_file: - yaml_data = yaml.safe_load(zone_file) - yaml_list = list(yaml_data.get('zones')) - domains.extend(yaml_list) - return domains +def verify_domains_from_config(filename): + status_success = True + domains = domains_from_config(filename) + for domain in domains: + if not verify_whois(domain): + status_success = False + return status_success -def verify_authoritative_ns(nameserver): - ''' - dsc: Verifies if the authoritative NS belongs to the puzzle managed NS. - arg: [str], nameserver - ret: [boolean], true if ok; false if nok. - ''' - if nameserver in pitc_nameservers: - return True - return False if __name__ == "__main__": - dns = get_domains(pitc_domains) - if not get_authoritative_ns(dns): - sys.exit(1) - sys.exit(0) + global_status_success = True + for domain in [ + './puzzle.ch.yaml', + './puzzle.yaml', + './nonpuzzle.yaml' + ]: + if not verify_domains_from_config(domain): + global_status_success = False + if not global_status_success: + exit(1)