In this post I will show another snippet of code in Python, which will be used in automations/pipelines. This is an example of how to perform simple maintenance of type A records in DNS, in our case, Bind9.

The premise of the automation that I will do soon is to read the entries/configurations from a Blueprint in our gitlab and apply them to Bind9. That’s why this piece of code is important.

For this, of several Python modules I’ve looked for, so far dnspython https://pypi.org/project/dnspython/ (https://www.dnspython.org/examples/) has had the best answer for what I need.

The example below is simple, read-only, creation and deletion of type A records. If you need any more types, there are some examples on the Internet, or you can get in touch and I’ll try to help.

import dns.zone
import dns.ipv4
import dns.rdataset
import dns.rdtypes.IN.A
import dns.update

import os.path

def load_DNSZones(str_filename):
    """
    Loads the DNS Zones file and returns the Zone class object.
    :str_filename: Full path of the dns zones file.
    :return: Zone Class
    :rtype: zone Class
    """    
    try:
        zone = dns.zone.from_file(str_filename, os.path.basename(str_filename), relativize=False)
        return zone
    except Exception as e:
        print('load_DNSZones Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

def load_ARecords(obj_zone, str_hostname):
    """
    Loads/fetches a specific A Record entry from the zone file.
    :obj_zone: Zone class object already loaded.
    :str_hostname: Type A record name/DNS
    :return: Dataset with record data.
    :rtype: rdataset Class
    """    
    try:
        rdataset = obj_zone.find_rdataset(str_hostname, dns.rdatatype.A, create=True)
        return rdataset
    except Exception as e:
        print('load_ARecords Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

def add_ARecords(obj_rdataset, str_ipaddr, int_TTL):
    """
    Adds a specific record of type A to the Zones file, the record will be the same as the one loaded/searched for in load_ARecords().
    :obj_rdataset: rdataset object returned from load_ARecords() function.
    :str_ipaddr: Destination host IP (A).
    :int_TTL: DNS TLS
    :return: None.
    """        
    try:
        rdata = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, str_ipaddr)
        obj_rdataset.add(rdata, int_TTL)
#        zone.to_file('devops-db.info', relativize=False, want_comments=True)
    except Exception as e:
        print('add_ARecords Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

def remove_ARecords(obj_zone, str_hostname):
    """
    Deletes a specific type A record.
    :obj_zone: Zone class object already loaded.
    :str_hostname: Type A record name/DNS
    :return: None.
    """            
    try:
        rdataset_remove = obj_zone.delete_rdataset(str_hostname, dns.rdatatype.A)
#        zone.to_file('devops-db.info', relativize=False, want_comments=True)
        return rdataset_remove
    except Exception as e:
        print('remove_ARecords Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

def write_DNSZones(obj_zone):
    """
    DNS changes persist in the Zones configuration file loaded in the load_DNSZones() function.
    :obj_zone: Zone class object already loaded.
    :return: None.
    """            
    try:
        obj_zone.to_file('devops-db.info', relativize=False, want_comments=True)
    except Exception as e:
        print('write_DNSZones Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

def get_ARecords(obj_zone):
    """
    Returns the list of type A DNS records from the loaded zone file.
    :obj_zone: Zone class object already loaded.
    :return: List of Type A Records.
    :rtype: List of Dictionaries.
    """            
    try:
        lst_ARecords = []
        for a_records in obj_zone.iterate_rdatas('A'):
            dict_ARecord = {'domain': a_records[0].parent().to_text(), 'dns': a_records[0].relativize(a_records[0].parent()).to_text(), 
                            'ttl': str(a_records[1]), 'A': a_records[2].to_text()}
            lst_ARecords.append(dict_ARecord)
        return lst_ARecords
    except Exception as e:
        print('get_ARecords Error: ' + str(e) + ' - Line: ' + str(e.__traceback__.tb_lineno))
        raise    

How to make function calls to add/remove entries.
In the example below I create and remove the dns ldap2, test1, test2 and test3.

filename = '/work/python/bind9/devops-db.info'
origin = 'devops-db.info'
hostname='ldap2'
ipaddr='172.21.5.153'
ttl = 7200

tmp_Zones = load_DNSZones(filename)

tmp_lst_Records = get_ARecords(tmp_Zones)
print(tmp_lst_Records)

#######################################################################################################################

tmp_obj_Records = load_ARecords(tmp_Zones, hostname)
if len(tmp_obj_Records) == 0:
    obj_Return = add_ARecords(tmp_obj_Records, ipaddr, ttl)
obj_Return = write_DNSZones(tmp_Zones)

tmp_lst_Records = get_ARecords(tmp_Zones)
print(tmp_lst_Records)


#######################################################################################################################

tmp_obj_Records = load_ARecords(tmp_Zones, 'test1')
if len(tmp_obj_Records) == 0:
    obj_Return = add_ARecords(tmp_obj_Records, '172.21.5.155', ttl)

tmp_obj_Records = load_ARecords(tmp_Zones, 'test2')
if len(tmp_obj_Records) == 0:
    obj_Return = add_ARecords(tmp_obj_Records, '172.21.5.156', ttl)

tmp_obj_Records = load_ARecords(tmp_Zones, 'test3')
if len(tmp_obj_Records) == 0:
    obj_Return = add_ARecords(tmp_obj_Records, '172.21.5.157', ttl)

obj_Return = write_DNSZones(tmp_Zones)

tmp_lst_Records = get_ARecords(tmp_Zones)
print(tmp_lst_Records)


#######################################################################################################################

obj_Return = remove_ARecords(tmp_Zones, 'ldap2')
obj_Return = remove_ARecords(tmp_Zones, 'test1')
obj_Return = remove_ARecords(tmp_Zones, 'test2')
obj_Return = remove_ARecords(tmp_Zones, 'test3')

obj_Return = write_DNSZones(tmp_Zones)

tmp_lst_Records = get_ARecords(tmp_Zones)
print(tmp_lst_Records)

With this procedure, the file loses some of the training we know, but it works in the same way. Example:

devops-db.info. 7200 IN SOA ns1.devops-db.info. admin.devops-db.info. 2024042901 7200 3600 604800 7200 ; NegativeCacheTTL
devops-db.info. 7200 IN NS ns1.devops-db.info.
devops-db.info. 7200 IN NS 8.8.8.8.
devops-db.info. 7200 IN A 172.21.5.72

ldap.devops-db.info. 7200 IN A 172.21.5.150
ldap2.devops-db.info. 7200 IN A 172.21.5.153
ns1.devops-db.info. 7200 IN A 172.21.5.72
test1.devops-db.info. 7200 IN A 172.21.5.155
test2.devops-db.info. 7200 IN A 172.21.5.156
test3.devops-db.info. 7200 IN A 172.21.5.157

Example of how to use from_text and to_text to, in the end, generate the zones file.

str_filename = '/Work/Python/Bind9/devops-db.info'
obj_Zone = dns.zone.from_file(str_filename, os.path.basename(str_filename), relativize=False)

str_Zones = obj_Zone.to_text('devops-db.info', relativize=False, want_comments=True)

lst_Zones = str_Zones.splitlines()
print(lst_Zones)

for str_Line in lst_Zones:
    print(str_Line)

#######################################################################################################################

lst_Zones = ['devops-db.info. 7200 IN SOA ns1.devops-db.info. fausto.branco.devops-db.info. 2024042901 7200 3600 604800 7200 ; NegativeCacheTTL',
'devops-db.info. 7200 IN NS ns1.devops-db.info.',
'devops-db.info. 7200 IN NS 8.8.8.8.',
'devops-db.info. 7200 IN A 172.21.5.72',
'ldap.devops-db.info. 7200 IN A 172.21.5.150',
'ldap2.devops-db.info. 7200 IN A 172.21.5.153',
'ns1.devops-db.info. 7200 IN A 172.21.5.72',
'test1.devops-db.info. 7200 IN A 172.21.5.155',
'test2.devops-db.info. 7200 IN A 172.21.5.156',
'test3.devops-db.info. 7200 IN A 172.21.5.157']

str_Zones = '\n'.join(lst_Zones)

new_zone = dns.zone.from_text(str_Zones, relativize=False, origin='devops-db.info')
new_zone.to_file('devops-db.info', relativize=False, want_comments=True)

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.