Files
get-domain-suffix-iana/update_data_domain_zone.py
2026-03-11 23:08:57 +05:30

347 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Script to update domain_zone table with additional data fields
"""
import mysql.connector
import requests
import json
import sys
import time
from bs4 import BeautifulSoup
import re
import idna
# Keyword arguments handed to mysql.connector.connect() via **DB_CONFIG.
DB_CONFIG = {
    'host': 'l2',
    'port': 3306,
    'user': 'root',
    # Placeholder only: main() overwrites this with the CLI argument or the
    # value typed at the getpass prompt before any connection is opened.
    'password': None,
    'database': 'sp_spider',
    'charset': 'utf8mb4',
    'ssl_disabled': True,
    'auth_plugin': 'mysql_native_password',
}
# Matches the href of the link used as the registration URL.
_IANA_REGISTRATION_HREF = re.compile(r'registrar|registry', re.IGNORECASE)

# Result key -> regex recognising the label cell of a (label <td>, value <td>) row.
_IANA_LABEL_PATTERNS = {
    'whois_server': re.compile(r'WHOIS|whois', re.IGNORECASE),
    'rdap_server': re.compile(r'RDAP|rdap', re.IGNORECASE),
    # "ns" has to be a whole word; a bare r'ns' also matches inside words such
    # as "Sponsoring" or "Domains" and would pick up the wrong row.
    'name_servers': re.compile(r'name.?server|\bns\b', re.IGNORECASE),
    'sponsoring': re.compile(r'sponsor|registry|manager', re.IGNORECASE),
    'administrative': re.compile(r'administrative|admin', re.IGNORECASE),
    'technical': re.compile(r'technical|tech', re.IGNORECASE),
}


def _find_labeled_cell(soup, label_pattern):
    """Return the stripped text of the <td> following the first <td> label matching *label_pattern*, or None."""
    # Walk every matching text node: the first hit is often a heading or a
    # paragraph, and stopping there would hide the real table row.
    for text_node in soup.find_all(string=label_pattern):
        label_cell = text_node.parent
        if label_cell is None or label_cell.name != 'td':
            continue
        value_cell = label_cell.find_next_sibling('td')
        if value_cell is not None:
            return value_cell.get_text(strip=True)
    return None


def get_iana_registry_info(domain):
    """Scrape the IANA root-zone database page for *domain*.

    Args:
        domain: Zone name; any leading dots are stripped before the request.

    Returns:
        A dict with the keys ``registration_url``, ``whois_server``,
        ``rdap_server``, ``sponsoring``, ``administrative`` and ``technical``
        (each a string or None) plus ``name_servers`` (a list of strings,
        possibly empty). Returns None if the page could not be fetched or
        parsed; the error is printed rather than raised.
    """
    try:
        # Remove dot prefix if present
        clean_domain = domain.lstrip('.')
        url = f"https://www.iana.org/domains/root/db/{clean_domain}"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        info = {
            'registration_url': None,
            'whois_server': None,
            'rdap_server': None,
            'name_servers': [],
            'sponsoring': None,
            'administrative': None,
            'technical': None,
        }

        # The first link whose href mentions a registrar/registry is taken as
        # the registration URL.
        registration_link = soup.find('a', href=_IANA_REGISTRATION_HREF)
        if registration_link is not None:
            info['registration_url'] = registration_link.get('href')

        # NOTE(review): this assumes the page presents each field as a
        # label/value pair of adjacent <td> cells -- confirm against the live
        # IANA markup.
        for field, label_pattern in _IANA_LABEL_PATTERNS.items():
            value = _find_labeled_cell(soup, label_pattern)
            if value is None:
                continue
            if field == 'name_servers':
                # The value cell is treated as a comma-separated host list.
                info[field] = [ns.strip() for ns in value.split(',') if ns.strip()]
            else:
                info[field] = value
        return info
    except Exception as e:
        # Deliberately broad: one failed lookup must not abort the batch run.
        print(f"Error fetching IANA info for {domain}: {e}")
        return None
def get_rdap_info(domain):
    """Query a fixed list of RDAP endpoints for *domain* and return the first usable answer.

    Args:
        domain: Zone name; any leading dots are stripped before the request.

    Returns:
        A dict with ``rdap_server`` (host part of the endpoint that answered),
        ``name_servers`` (list of ``ldhName`` strings, possibly empty) and
        ``port43`` (WHOIS host string or None), or None when no endpoint
        returned a 200 response containing a JSON object.
    """
    try:
        # Remove dot prefix if present
        clean_domain = domain.lstrip('.')
        # Try common RDAP servers
        rdap_urls = [
            f"https://rdap.org/domain/{clean_domain}",
            f"https://data.iana.org/rdap/{clean_domain}",
            f"https://rdap.verisign.com/com/v1/domain/{clean_domain}",
            f"https://rdap.nic.fr/domain/{clean_domain}",
        ]
        for rdap_url in rdap_urls:
            try:
                response = requests.get(rdap_url, timeout=5)
                if response.status_code != 200:
                    continue
                data = response.json()
            except (requests.RequestException, ValueError):
                # Network failure or a non-JSON body: try the next endpoint.
                continue
            if not isinstance(data, dict):
                continue

            nameservers = data.get('nameservers')
            if not isinstance(nameservers, list):
                nameservers = []

            # RDAP defines port43 as a plain string (the WHOIS host). Calling
            # .get() on it raised AttributeError, which the old blanket except
            # swallowed, throwing away an otherwise valid response. A mapping
            # with a 'server' key is still accepted for backward compatibility.
            port43 = data.get('port43')
            if isinstance(port43, dict):
                port43 = port43.get('server')
            if not isinstance(port43, str) or not port43:
                port43 = None

            return {
                'rdap_server': rdap_url.split('/')[2],  # Extract server domain
                'name_servers': [
                    ns['ldhName']
                    for ns in nameservers
                    if isinstance(ns, dict) and ns.get('ldhName')
                ],
                'port43': port43,
            }
        return None
    except Exception as e:
        # Deliberately broad: one failed lookup must not abort the batch run.
        print(f"Error fetching RDAP info for {domain}: {e}")
        return None
def get_dns_servers(domain):
    """Look up the NS records of *domain* through dns.resolver.

    Returns ``{'name_servers': [...]}`` with each record rendered via
    ``str()``, or None if anything goes wrong (including the ``dns`` package
    being unavailable) -- failures are swallowed silently.
    """
    try:
        import dns.resolver

        zone = domain.lstrip('.')
        ns_records = dns.resolver.resolve(zone, 'NS')
        return {'name_servers': list(map(str, ns_records))}
    except Exception:
        return None
def _merge_domain_info(iana_info, rdap_info, dns_info):
    """Combine the three lookup results into the column values for one row; None means "nothing found"."""
    iana = iana_info or {}
    rdap = rdap_info or {}
    dns = dns_info or {}
    # Fall back to DNS whenever RDAP produced no servers, not only when the
    # RDAP lookup failed outright.
    name_servers = rdap.get('name_servers') or dns.get('name_servers') or []
    return {
        'registration_url': iana.get('registration_url') or None,
        # RDAP's port43 is the WHOIS host, so use it when IANA gave nothing.
        'whois_server': iana.get('whois_server') or rdap.get('port43') or None,
        'rdap_server': rdap.get('rdap_server') or None,
        'name_servers': json.dumps(name_servers) if name_servers else None,
        'sponsoring': iana.get('sponsoring') or None,
        'administrative': iana.get('administrative') or None,
        'technical': iana.get('technical') or None,
    }


def update_domain_data():
    """Enrich up to 50 incomplete, non-removed domain_zone rows.

    For each selected row the IANA, RDAP and DNS helpers are queried, the
    results are merged, and the row is updated and committed, with a
    one-second pause between rows. Afterwards coverage statistics are printed.

    Returns:
        True on success (including when no row needs work), False if a
        mysql.connector.Error occurred; the error is printed, not raised.
    """
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        # Get domains that need data enrichment
        cursor.execute("""
            SELECT id, domain, root_utf
            FROM domain_zone
            WHERE removed = FALSE
              AND (registration_url IS NULL OR whois_server IS NULL OR rdap_server IS NULL)
            ORDER BY id
            LIMIT 50
        """)
        domains_to_update = cursor.fetchall()
        if not domains_to_update:
            print("All domains already have complete data!")
            return True

        print(f"Updating data for {len(domains_to_update)} domains...")
        for domain_id, domain, _root_utf in domains_to_update:
            print(f"Processing {domain}...")
            update_data = _merge_domain_info(
                get_iana_registry_info(domain),
                get_rdap_info(domain),
                get_dns_servers(domain),
            )
            # COALESCE keeps whatever the row already holds when a lookup
            # yielded nothing. A row is selected if ANY of three columns is
            # NULL, so a plain assignment would wipe its populated columns.
            cursor.execute("""
                UPDATE domain_zone SET
                    registration_url = COALESCE(%s, registration_url),
                    whois_server = COALESCE(%s, whois_server),
                    rdap_server = COALESCE(%s, rdap_server),
                    name_servers = COALESCE(%s, name_servers),
                    sponsoring = COALESCE(%s, sponsoring),
                    administrative = COALESCE(%s, administrative),
                    technical = COALESCE(%s, technical),
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (
                update_data['registration_url'],
                update_data['whois_server'],
                update_data['rdap_server'],
                update_data['name_servers'],
                update_data['sponsoring'],
                update_data['administrative'],
                update_data['technical'],
                domain_id,
            ))
            # Commit per row so an interrupted run keeps the finished rows.
            conn.commit()
            # Rate limiting
            time.sleep(1)
        print(f"Updated {len(domains_to_update)} domains successfully")

        # Show statistics
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COUNT(CASE WHEN registration_url IS NOT NULL THEN 1 END) as with_reg_url,
                COUNT(CASE WHEN whois_server IS NOT NULL THEN 1 END) as with_whois,
                COUNT(CASE WHEN rdap_server IS NOT NULL THEN 1 END) as with_rdap,
                COUNT(CASE WHEN name_servers IS NOT NULL THEN 1 END) as with_ns
            FROM domain_zone
            WHERE removed = FALSE
        """)
        stats = cursor.fetchone()
        print("\nDatabase Statistics:")
        print(f" Total domains: {stats[0]}")
        print(f" With registration URL: {stats[1]}")
        print(f" With WHOIS server: {stats[2]}")
        print(f" With RDAP server: {stats[3]}")
        print(f" With name servers: {stats[4]}")
        return True
    except mysql.connector.Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        # conn/cursor stay None if connect() itself failed.
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def show_sample_data():
    """Print up to ten non-removed rows that have a registration URL or WHOIS server.

    Database errors are printed, not raised. The cursor and connection are
    released on every path, including when the query fails.
    """
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print("\n=== Sample enriched data from domain_zone table ===")
        cursor.execute("""
            SELECT id, domain, root_utf, whois_server, rdap_server,
                   JSON_LENGTH(name_servers) as ns_count, sponsoring
            FROM domain_zone
            WHERE removed = FALSE
              AND (registration_url IS NOT NULL OR whois_server IS NOT NULL)
            ORDER BY id
            LIMIT 10
        """)
        for row_id, domain, root_utf, whois, rdap, ns_count, sponsor in cursor.fetchall():
            domain_display = domain
            # Show the Unicode form next to a punycode label. A leading dot is
            # tolerated, as in the lookup helpers that lstrip('.') the name.
            if domain and domain.lstrip('.').startswith('xn--') and root_utf:
                domain_display = f"{domain} ({root_utf})"
            print(f"{row_id} {domain_display}")
            print(f" WHOIS: {whois or 'N/A'}")
            print(f" RDAP: {rdap or 'N/A'}")
            print(f" Name Servers: {ns_count or 0}")
            print(f" Sponsor: {sponsor or 'N/A'}")
            print()
    except mysql.connector.Error as e:
        print(f"Database error: {e}")
    finally:
        # Previously the close calls sat inside the try body and were skipped
        # whenever execute()/fetchall() raised.
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def main():
    """Script entry point: obtain the DB password, run the enrichment, print a sample.

    The password is taken from the first command-line argument when present,
    otherwise it is requested through getpass. Exits with status 1 if
    update_domain_data() reports failure.
    """
    import getpass

    # NOTE(review): a password passed on the command line is visible in the
    # process list and shell history; the prompt path avoids that.
    cli_args = sys.argv[1:]
    password = (
        cli_args[0]
        if cli_args
        else getpass.getpass("Enter MariaDB password for user 'root': ")
    )
    DB_CONFIG['password'] = password

    print("Starting domain_zone data enrichment process...")

    updated_ok = update_domain_data()
    if not updated_ok:
        print("Failed to update domain data")
        sys.exit(1)

    show_sample_data()
    print("\n=== Domain data enrichment completed ===")


if __name__ == "__main__":
    main()