#!/usr/bin/env python3 """ Script to fetch IANA root domain database and save to MongoDB collection 'extensions' """ import pymongo import re import requests from bs4 import BeautifulSoup from urllib.parse import urljoin import time from datetime import datetime def fetch_domain_details(extension, url): """Fetch detailed information for a specific domain extension""" try: response = requests.get(url) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') details = { 'name_servers': [], 'whois_server': None, 'rdap_server': None, 'registration_url': None, 'record_updated': None, 'registration_date': None, 'iana_reports': [] } # Extract information from the Registry Information section content_text = soup.get_text() # Find WHOIS Server whois_match = re.search(r'WHOIS Server:\s*(\S+)', content_text) if whois_match: details['whois_server'] = whois_match.group(1) # Find RDAP Server rdap_match = re.search(r'RDAP Server:\s*(\S+)', content_text) if rdap_match: details['rdap_server'] = rdap_match.group(1) # Find registration URL reg_url_match = re.search(r'URL for registration services:\s*\[([^\]]+)\]', content_text) if reg_url_match: details['registration_url'] = reg_url_match.group(1) # Find dates updated_match = re.search(r'Record last updated\s+(\d{4}-\d{2}-\d{2})', content_text) if updated_match: details['record_updated'] = updated_match.group(1) reg_date_match = re.search(r'Registration date\s+(\d{4}-\d{2}-\d{2})', content_text) if reg_date_match: details['registration_date'] = reg_date_match.group(1) # Find IANA Reports report_links = soup.find_all('a', href=re.compile(r'/reports/')) for link in report_links: details['iana_reports'].append({ 'title': link.get_text().strip(), 'url': urljoin(url, link['href']) }) # Look for name servers in tables and other formats name_servers = [] # First try to find name servers in tables tables = soup.find_all('table') for table in tables: rows = table.find_all('tr') for row in rows: cells = row.find_all(['td', 'th']) if len(cells) >= 1: # Check if first cell looks like a name server first_cell = cells[0].get_text().strip() if '.' in first_cell and not first_cell.startswith('http'): # This might be a name server name_servers.append(first_cell) # If no tables found, try text patterns if not name_servers: # Look for name servers in text content ns_patterns = [ r'([a-zA-Z0-9.-]+\.nic\.[a-zA-Z]{2,})', # *.nic.tld pattern r'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\s+[0-9.]+)', # hostname + IP pattern r'(ns[0-9]*\.[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', # ns*.domain.tld pattern r'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\s+[0-9.]+\s+[0-9a-fA-F:]+' # hostname + IPv4 + IPv6 ] for pattern in ns_patterns: matches = re.findall(pattern, content_text, re.IGNORECASE) for match in matches: if isinstance(match, tuple): # If regex returns groups, take the first one ns_name = match[0] if match[0] else match[1] if len(match) > 1 else '' else: ns_name = match if ns_name and '.' in ns_name and len(ns_name) > 5: name_servers.append(ns_name.strip()) # Also try to find name servers in pre/code blocks pre_blocks = soup.find_all(['pre', 'code']) for block in pre_blocks: block_text = block.get_text().strip() lines = block_text.split('\n') for line in lines: # Look for lines that contain server names if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', line.strip()): parts = line.strip().split('\t') if parts: name_servers.append(parts[0].strip()) # Remove duplicates and clean up details['name_servers'] = list(set([ns for ns in name_servers if ns and len(ns) > 3])) print(f" - WHOIS: {details['whois_server']}") print(f" - RDAP: {details['rdap_server']}") print(f" - Registration URL: {details['registration_url']}") print(f" - Name Servers: {details['name_servers']}") return details except Exception as e: print(f"Error fetching details for {extension}: {e}") return { 'name_servers': [], 'whois_server': None, 'rdap_server': None, 'registration_url': None, 'record_updated': None, 'registration_date': None, 'iana_reports': [] } def fetch_iana_domains(): """Fetch all domain extensions from IANA root database""" base_url = "https://www.iana.org/domains/root/db" print("Fetching IANA root domain database...") response = requests.get(base_url) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Find the main table with domain information domains = [] # Look for the first table that contains domain data tables = soup.find_all('table') main_table = None for table in tables: rows = table.find_all('tr') if len(rows) > 1: # Check if table has data rows # Check if first row contains our expected headers first_row = rows[0] headers = [cell.get_text().strip() for cell in first_row.find_all(['th', 'td'])] if 'Domain' in headers and 'Type' in headers and 'TLD Manager' in headers: main_table = table break if not main_table: print("Could not find the main domain table!") return domains # Extract data from the main table rows = main_table.find_all('tr')[1:] # Skip header row for row in rows: cells = row.find_all(['td', 'th']) if len(cells) >= 3: domain_text = cells[0].get_text().strip() domain_type = cells[1].get_text().strip() manager = cells[2].get_text().strip() # Remove leading dot from domain if domain_text.startswith('.'): domain_text = domain_text[1:] if domain_text and len(domain_text) > 0: url = urljoin(base_url, f"/domains/root/db/{domain_text}.html") domains.append({ 'extension': domain_text, 'url': url, 'type': domain_type if domain_type else None, 'manager': manager if manager else None }) print(f"Found {len(domains)} domain extensions from main table") return domains def get_domain_type_and_manager(soup, extension, details): """Extract domain type and TLD manager from the individual domain page""" # Look for domain type in parentheses type_pattern = re.compile(r'\(([^)]*top-level domain[^)]*)\)') # Search in all text content content = soup.get_text() type_match = type_pattern.search(content) domain_type = type_match.group(1) if type_match else None # Extract TLD Manager from Sponsoring Organisation manager = None # Look for "Sponsoring Organisation" header and get the next bold text h2_tags = soup.find_all('h2') for h2 in h2_tags: if 'Sponsoring Organisation' in h2.get_text(): # Get the next bold tag after this h2 next_bold = h2.find_next('b') if next_bold: manager = next_bold.get_text().strip() break # If no manager found from sponsoring org, try registration URL if not manager and details.get('registration_url'): reg_url = details['registration_url'] # Extract domain name as manager url_match = re.search(r'https?://(?:www\.)?([^/]+)', reg_url) if url_match: manager = url_match.group(1) else: # If it's just a URL without protocol, use as is manager = reg_url.replace('http://', '').replace('https://', '').replace('www.', '') # If still no manager found, try other patterns if not manager: registry_patterns = [ r'Registry[:\s]+([^\n]+)', r'Sponsor[:\s]+([^\n]+)', r'Manager[:\s]+([^\n]+)', r'Organization[:\s]+([^\n]+)' ] for pattern in registry_patterns: match = re.search(pattern, content, re.IGNORECASE) if match: manager = match.group(1).strip() break return domain_type, manager def main(): # MongoDB connection parameters mongo_uri = "mongodb://l2:27017/iana" try: # Connect to MongoDB print("Connecting to MongoDB...") client = pymongo.MongoClient(mongo_uri) db = client.get_database() collection = db.extensions # Fetch domain list from IANA domains = fetch_iana_domains() if not domains: print("No domains found. Exiting.") return total_domains = len(domains) processed_count = 0 # Process each domain for i, domain in enumerate(domains, 1): extension = domain['extension'] url = domain['url'] print(f"\nProcessing {extension} ({i}/{total_domains})...") # Fetch detailed information (name servers, WHOIS, RDAP, etc.) details = fetch_domain_details(extension, url) # Use pre-extracted type and manager from main table domain_type = domain.get('type') manager = domain.get('manager') print(f" - Domain Type: {domain_type}") print(f" - TLD Manager: {manager}") # Create document for MongoDB document = { 'extension': extension, 'url': url, 'type': domain_type, 'manager': manager, 'whois_server': details['whois_server'], 'rdap_server': details['rdap_server'], 'registration_url': details['registration_url'], 'name_servers': details['name_servers'], 'record_updated': details['record_updated'], 'registration_date': details['registration_date'], 'iana_reports': details['iana_reports'], 'last_fetched': datetime.utcnow(), 'fetched_at': datetime.now().isoformat() } # Upsert to MongoDB (update if exists, insert if new) collection.update_one( {'extension': extension}, {'$set': document}, upsert=True ) processed_count += 1 print(f" āœ“ Saved {extension} to MongoDB") # Add a 3-second delay to be respectful to the server time.sleep(3) # Verify insertion total_count = collection.count_documents({}) print(f"\nāœ… Total extensions in MongoDB: {total_count}") print(f"āœ… Processed {processed_count} domains this run") # Show sample data print("\nSample data:") sample_docs = collection.find().limit(5) for doc in sample_docs: print(f" {doc['extension']}: {doc.get('type', 'N/A')} - {doc.get('manager', 'N/A')}") # Show statistics by type type_stats = list(collection.aggregate([ {'$match': {'type': {'$ne': None}}}, {'$group': {'_id': '$type', 'count': {'$sum': 1}}}, {'$sort': {'count': -1}} ])) print(f"\nDomain types:") for stat in type_stats: print(f" {stat['_id']}: {stat['count']}") client.close() print("\nMongoDB connection closed.") except Exception as e: print(f"Error: {e}") if __name__ == "__main__": main()