#!/usr/bin/env python
# author: Philipp Gassmann
# check if domains configured with octodns are configured to use our nameservers
# Usage: ./dnsverify.py 2> /dev/null

import sys
from pprint import pprint

import yaml

import dns.message
import dns.query
import dns.rcode
import dns.rdatatype
import dns.resolver
from dns.exception import DNSException


def puzzle_nameservers():
    """Return the nameserver host names (with trailing dot) that count as ours."""
    dnspool = [
        "ns1.dnsimple.com.",
        "ns2.dnsimple.com.",
        "ns3.dnsimple.com.",
        "ns4.dnsimple.com.",
        "ns5.dnsmadeeasy.com.",
        "ns6.dnsmadeeasy.com.",
        "ns7.dnsmadeeasy.com.",
    ]
    return dnspool


# query_authoritative_ns() based on
# https://stackoverflow.com/questions/4066614/how-can-i-find-the-authoritative-dns-server-for-a-domain-using-dnspython
def query_authoritative_ns(domain, log=lambda msg: None):
    """Find the NS records of *domain* by walking its suffixes.

    The name is split on dots and an NS query is sent over TCP for every
    suffix, shortest first (for ``puzzle.ch``: ``ch``, then ``puzzle.ch``).
    The first query goes to 8.8.8.8; whenever an NS record is seen, the
    following queries go to the address that name resolves to.

    Args:
        domain: Domain name without trailing dot.
        log: Callable taking one message string; called with progress
            messages. Defaults to a no-op.

    Returns:
        The last RRset in which an NS record was seen.

    Raises:
        Exception: If a query is answered with NXDOMAIN or any other
            non-NOERROR rcode, or if no NS record was seen at all.
    """
    # A fixed public resolver is used instead of the system default one.
    my_resolver = dns.resolver.Resolver()
    my_resolver.nameservers = ['8.8.8.8']
    nameserver = my_resolver.nameservers[0]

    labels = domain.split('.')
    # Stays None until an NS record is seen, so that case can be reported
    # explicitly instead of failing with an UnboundLocalError at the return.
    result = None

    for i in range(len(labels), 0, -1):
        sub = '.'.join(labels[i - 1:])

        log('Looking up %s on %s' % (sub, nameserver))
        query = dns.message.make_query(sub, dns.rdatatype.NS)
        response = dns.query.tcp(query, nameserver)

        rcode = response.rcode()
        if rcode != dns.rcode.NOERROR:
            if rcode == dns.rcode.NXDOMAIN:
                raise Exception('%s does not exist.' % (sub))
            else:
                raise Exception('Error %s' % (dns.rcode.to_text(rcode)))

        # Prefer the authority section, then additional, then answer.
        if len(response.authority) > 0:
            rrsets = response.authority
        elif len(response.additional) > 0:
            rrsets = [response.additional]
        else:
            rrsets = response.answer

        # Handle all RRsets, not just the first one
        for rrset in rrsets:
            for rr in rrset:
                if rr.rdtype == dns.rdatatype.SOA:
                    log('Same server is authoritative for %s' % (sub))
                elif rr.rdtype == dns.rdatatype.A:
                    # NOTE(review): the glue address is only logged here; it
                    # is not used as the server for the next lookup. Confirm
                    # whether that is intended.
                    glue_address = rr.items[0].address
                    log('Glue record for %s: %s' % (rr.name, glue_address))
                elif rr.rdtype == dns.rdatatype.NS:
                    authority = rr.target
                    # Query the next (longer) suffix at this nameserver.
                    nameserver = my_resolver.query(authority).rrset[0].to_text()
                    log('%s [%s] is authoritative for %s; ttl %i'
                        % (authority, nameserver, sub, rrset.ttl))
                    result = rrset
                else:
                    # IPv6 glue records etc. are ignored.
                    pass

    if result is None:
        raise Exception('No NS records found for %s' % (domain))
    return result


def log(msg):
    """Write *msg* followed by a newline to stderr."""
    sys.stderr.write(msg + u'\n')


def domains_from_config(filename):
    """Return the keys of the ``zones`` mapping in the YAML file *filename*."""
    # The context manager closes the file even if parsing fails.
    with open(filename, 'r') as filecontent:
        config_domains = yaml.load(filecontent, Loader=yaml.FullLoader)
    domains = config_domains['zones'].keys()
    return domains


def verify_whois(domain):
    """Check that every NS record found for *domain* is one of our nameservers.

    Leading and trailing dots are stripped from *domain* before the lookup.
    The first foreign nameserver is reported on stdout; success is logged to
    stderr. Exceptions raised by the lookup are not caught here.

    Returns:
        True if all nameservers are in puzzle_nameservers(), else False.
    """
    domain = domain.strip('.')
    result = query_authoritative_ns(domain, log)

    # Build the allow-list once; DNS names are case-insensitive, so compare
    # lower-cased names.
    allowed = set(name.lower() for name in puzzle_nameservers())

    for entry in result:
        nameserver = entry.target.to_text()
        if nameserver.lower() not in allowed:
            print('Domain %s has authoritative NS %s which is not in puzzle_nameservers'
                  % (domain, nameserver))
            return False

    log('Domain %s is ok' % (domain))
    return True


def verify_domains_from_config(filename):
    """Verify every zone listed in *filename*.

    All domains are checked even after a failure, so that every offending
    domain gets reported.

    Returns:
        True if all domains passed, False if at least one failed.
    """
    status_success = True
    domains = domains_from_config(filename)
    for domain in domains:
        if not verify_whois(domain):
            status_success = False
    return status_success


if __name__ == "__main__":
    global_status_success = True
    for config_file in ['./puzzle.ch.yaml', './puzzle.yaml', './nonpuzzle.yaml']:
        if not verify_domains_from_config(config_file):
            global_status_success = False
    if not global_status_success:
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. when run with python -S).
        sys.exit(1)