359 lines
13 KiB
Python
359 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script to fetch the IANA root domain database and save the results as JSON files
|
|
"""
|
|
|
|
import re
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urljoin
|
|
import time
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
import idna
|
|
|
|
def fetch_domain_details(extension, url):
    """Fetch registry details for one TLD from its IANA root-db page.

    Scrapes the per-domain page for WHOIS/RDAP servers, the registration
    URL, record dates, IANA report links, and name servers.

    Parameters:
        extension: TLD without the leading dot (used only for log output).
        url: URL of the TLD's IANA detail page.

    Returns:
        dict with keys 'name_servers', 'whois_server', 'rdap_server',
        'registration_url', 'record_updated', 'registration_date',
        'iana_reports'. On any error, an all-empty dict of the same
        shape is returned instead of raising, so the caller's loop can
        continue with the remaining TLDs.
    """
    def _empty_details():
        # Single source of truth for the result shape; reused as the
        # error fallback so success and failure paths always agree.
        return {
            'name_servers': [],
            'whois_server': None,
            'rdap_server': None,
            'registration_url': None,
            'record_updated': None,
            'registration_date': None,
            'iana_reports': [],
        }

    try:
        # Timeout so one stalled connection cannot hang the whole crawl.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        details = _empty_details()

        # Most fields are plain "Label: value" text on the page.
        content_text = soup.get_text()

        whois_match = re.search(r'WHOIS Server:\s*(\S+)', content_text)
        if whois_match:
            details['whois_server'] = whois_match.group(1)

        rdap_match = re.search(r'RDAP Server:\s*(\S+)', content_text)
        if rdap_match:
            details['rdap_server'] = rdap_match.group(1)

        reg_url_match = re.search(r'URL for registration services:\s*\[([^\]]+)\]', content_text)
        if reg_url_match:
            details['registration_url'] = reg_url_match.group(1)

        updated_match = re.search(r'Record last updated\s+(\d{4}-\d{2}-\d{2})', content_text)
        if updated_match:
            details['record_updated'] = updated_match.group(1)

        reg_date_match = re.search(r'Registration date\s+(\d{4}-\d{2}-\d{2})', content_text)
        if reg_date_match:
            details['registration_date'] = reg_date_match.group(1)

        # IANA delegation/redelegation reports linked from the page.
        for link in soup.find_all('a', href=re.compile(r'/reports/')):
            details['iana_reports'].append({
                'title': link.get_text().strip(),
                'url': urljoin(url, link['href']),
            })

        # Name servers: prefer table rows, then loose text patterns,
        # then <pre>/<code> blocks.
        name_servers = []

        for table in soup.find_all('table'):
            for row in table.find_all('tr'):
                cells = row.find_all(['td', 'th'])
                if len(cells) >= 1:
                    first_cell = cells[0].get_text().strip()
                    # Heuristic: a dotted hostname that is not a URL.
                    if '.' in first_cell and not first_cell.startswith('http'):
                        name_servers.append(first_cell)

        if not name_servers:
            ns_patterns = [
                r'([a-zA-Z0-9.-]+\.nic\.[a-zA-Z]{2,})',                       # *.nic.tld pattern
                r'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\s+[0-9.]+)',                  # hostname + IP pattern
                r'(ns[0-9]*\.[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',                  # ns*.domain.tld pattern
                r'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\s+[0-9.]+\s+[0-9a-fA-F:]+',  # hostname + IPv4 + IPv6
            ]

            for pattern in ns_patterns:
                for match in re.findall(pattern, content_text, re.IGNORECASE):
                    # findall yields tuples only for multi-group patterns;
                    # kept for safety should a pattern gain groups.
                    if isinstance(match, tuple):
                        ns_name = match[0] or (match[1] if len(match) > 1 else '')
                    else:
                        ns_name = match
                    if ns_name and '.' in ns_name and len(ns_name) > 5:
                        name_servers.append(ns_name.strip())

        # Some pages list servers inside <pre>/<code> blocks, one per line.
        for block in soup.find_all(['pre', 'code']):
            for line in block.get_text().strip().split('\n'):
                if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', line.strip()):
                    parts = line.strip().split('\t')
                    if parts:
                        name_servers.append(parts[0].strip())

        # Deduplicate while preserving discovery order (list(set(...))
        # reordered the servers nondeterministically between runs).
        details['name_servers'] = list(dict.fromkeys(
            ns for ns in name_servers if ns and len(ns) > 3))

        print(f" - WHOIS: {details['whois_server']}")
        print(f" - RDAP: {details['rdap_server']}")
        print(f" - Registration URL: {details['registration_url']}")
        print(f" - Name Servers: {details['name_servers']}")

        return details

    except Exception as e:
        # Best-effort: report and return an empty record so the crawl
        # over the remaining TLDs keeps going.
        print(f"Error fetching details for {extension}: {e}")
        return _empty_details()
|
|
|
|
def fetch_iana_domains():
    """Fetch the list of all TLDs from the IANA root zone database.

    Scrapes the root-db index page and returns a list of dicts with keys
    'extension' (TLD without the leading dot), 'url' (the TLD's detail
    page, Punycode-encoded for IDN labels), 'type' and 'manager' (taken
    from the overview table; None when the cell is blank).

    Raises:
        requests.HTTPError: if the index page cannot be fetched.
    """
    base_url = "https://www.iana.org/domains/root/db"

    print("Fetching IANA root domain database...")
    # Timeout so a stalled connection cannot hang the crawl at startup.
    response = requests.get(base_url, timeout=30)
    response.raise_for_status()

    soup = BeautifulSoup(response.content, 'html.parser')

    domains = []

    # Locate the overview table by its expected header row rather than
    # assuming it is the first table on the page.
    main_table = None
    for table in soup.find_all('table'):
        rows = table.find_all('tr')
        if len(rows) > 1:  # needs at least a header plus one data row
            headers = [cell.get_text().strip()
                       for cell in rows[0].find_all(['th', 'td'])]
            if 'Domain' in headers and 'Type' in headers and 'TLD Manager' in headers:
                main_table = table
                break

    if not main_table:
        print("Could not find the main domain table!")
        return domains

    # One TLD per data row: domain, type, manager (header row skipped).
    for row in main_table.find_all('tr')[1:]:
        cells = row.find_all(['td', 'th'])
        if len(cells) < 3:
            continue

        domain_text = cells[0].get_text().strip()
        domain_type = cells[1].get_text().strip()
        manager = cells[2].get_text().strip()

        # The table lists domains with a leading dot (".com").
        if domain_text.startswith('.'):
            domain_text = domain_text[1:]

        if not domain_text:
            continue

        # IANA detail pages use Punycode (xn--...) filenames for IDN TLDs.
        try:
            punycode_extension = idna.encode(domain_text).decode('ascii')
            url = urljoin(base_url, f"/domains/root/db/{punycode_extension}.html")
        except (idna.IDNAError, UnicodeError):
            # Fall back to the raw label if Punycode conversion fails.
            url = urljoin(base_url, f"/domains/root/db/{domain_text}.html")

        domains.append({
            'extension': domain_text,
            'url': url,
            'type': domain_type if domain_type else None,
            'manager': manager if manager else None,
        })

    print(f"Found {len(domains)} domain extensions from main table")
    return domains
|
|
|
|
def get_domain_type_and_manager(soup, extension, details):
    """Derive the domain type and TLD manager from a parsed IANA TLD page.

    Parameters:
        soup: parsed page (anything providing get_text/find_all).
        extension: TLD label (currently unused; kept for the call sites).
        details: dict from fetch_domain_details; 'registration_url' is
            consulted as a fallback source for the manager.

    Returns:
        (domain_type, manager) tuple; either element may be None when the
        page yields no match.
    """
    content = soup.get_text()

    # The type appears in parentheses, e.g. "(generic top-level domain)".
    type_hit = re.search(r'\(([^)]*top-level domain[^)]*)\)', content)
    domain_type = type_hit.group(1) if type_hit else None

    manager = None

    # Preferred source: the bold text following a "Sponsoring
    # Organisation" heading.
    for heading in soup.find_all('h2'):
        if 'Sponsoring Organisation' not in heading.get_text():
            continue
        bold = heading.find_next('b')
        if bold:
            manager = bold.get_text().strip()
            break

    # Fallback 1: derive a manager name from the registration URL's host.
    if not manager and details.get('registration_url'):
        reg_url = details['registration_url']
        host_hit = re.search(r'https?://(?:www\.)?([^/]+)', reg_url)
        if host_hit:
            manager = host_hit.group(1)
        else:
            # No scheme present — strip any stray prefixes and use as-is.
            manager = (reg_url
                       .replace('http://', '')
                       .replace('https://', '')
                       .replace('www.', ''))

    # Fallback 2: labelled lines anywhere in the page text.
    if not manager:
        for label_pattern in (
            r'Registry[:\s]+([^\n]+)',
            r'Sponsor[:\s]+([^\n]+)',
            r'Manager[:\s]+([^\n]+)',
            r'Organization[:\s]+([^\n]+)',
        ):
            label_hit = re.search(label_pattern, content, re.IGNORECASE)
            if label_hit:
                manager = label_hit.group(1).strip()
                break

    return domain_type, manager
|
|
|
|
def main():
    """Crawl the IANA root zone database and write per-TLD JSON files.

    Creates data/<tld>.json for every TLD plus data/all_domains.json as
    a summary, then prints counts by domain type. Failures on individual
    TLD pages are absorbed inside fetch_domain_details; any other error
    aborts the run with a message instead of a traceback.
    """
    data_dir = 'data'
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
        print(f"Created {data_dir} directory")

    try:
        domains = fetch_iana_domains()

        if not domains:
            print("No domains found. Exiting.")
            return

        total_domains = len(domains)
        processed_count = 0

        for i, domain in enumerate(domains, 1):
            extension = domain['extension']
            url = domain['url']

            print(f"\nProcessing {extension} ({i}/{total_domains})...")

            # Per-TLD details: name servers, WHOIS/RDAP, dates, reports.
            details = fetch_domain_details(extension, url)

            # Type and manager come from the overview table, so they are
            # available even when the detail fetch failed.
            domain_type = domain.get('type')
            manager = domain.get('manager')

            print(f" - Domain Type: {domain_type}")
            print(f" - TLD Manager: {manager}")

            document = {
                'extension': extension,
                'url': url,
                'type': domain_type,
                'manager': manager,
                'whois_server': details['whois_server'],
                'rdap_server': details['rdap_server'],
                'registration_url': details['registration_url'],
                'name_servers': details['name_servers'],
                'record_updated': details['record_updated'],
                'registration_date': details['registration_date'],
                'iana_reports': details['iana_reports'],
                'fetched_at': datetime.now().isoformat(),
            }

            filename = os.path.join(data_dir, f"{extension}.json")
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(document, f, indent=2, ensure_ascii=False)

            processed_count += 1
            print(f" ✓ Saved {extension}.json")

            # Be polite to the IANA servers — but there is no point
            # sleeping after the final request.
            if i < total_domains:
                time.sleep(3)

        # Summary file with lightweight info for every TLD.
        summary_file = os.path.join(data_dir, 'all_domains.json')
        summary_data = {
            'total_domains': total_domains,
            'processed_domains': processed_count,
            'fetched_at': datetime.now().isoformat(),
            'domains': [
                {
                    'extension': domain['extension'],
                    'type': domain.get('type'),
                    'manager': domain.get('manager'),
                    'url': domain['url'],
                }
                for domain in domains
            ],
        }

        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)

        print(f"\n✅ Total extensions processed: {processed_count}")
        print(f"✅ Individual JSON files saved in {data_dir}/")
        print(f"✅ Summary file saved as {summary_file}")

        # Tally how many TLDs fall into each type bucket.
        type_stats = {}
        for domain in domains:
            dtype = domain.get('type')
            if dtype:
                type_stats[dtype] = type_stats.get(dtype, 0) + 1

        print("\nDomain types:")
        for dtype, count in sorted(type_stats.items(), key=lambda x: x[1], reverse=True):
            print(f" {dtype}: {count}")

    except Exception as e:
        # Top-level boundary: report and exit cleanly.
        print(f"Error: {e}")
|
|
|
|
# Run the full crawl only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|