This commit is contained in:
Kar
2026-03-11 23:08:57 +05:30
parent 26e70981ee
commit 95aab950da
6 changed files with 892 additions and 1 deletions

View File

@@ -0,0 +1,23 @@
-- Create domain_zone table with comprehensive data fields
CREATE TABLE IF NOT EXISTS domain_zone (
    id INT AUTO_INCREMENT PRIMARY KEY,
    -- TLD in ASCII/Punycode form, no leading dot; 63 is the DNS label limit (RFC 1035)
    domain VARCHAR(63) NOT NULL UNIQUE,
    root_utf VARCHAR(255),           -- Unicode form of an IDN TLD (same as domain for ASCII TLDs)
    punycode BOOLEAN DEFAULT FALSE,  -- TRUE when `domain` is an xn-- label
    registration_url TEXT,
    whois_server VARCHAR(255),
    rdap_server VARCHAR(255),
    name_servers JSON,               -- JSON array of name-server host names
    sponsoring TEXT,
    administrative TEXT,
    technical TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    removed BOOLEAN DEFAULT FALSE,   -- soft-delete flag; rows are never hard-deleted
    -- NOTE: no separate idx_domain index -- the UNIQUE constraint on `domain`
    -- already creates one, so a second index on the same column is redundant.
    INDEX idx_removed (removed),
    INDEX idx_root_utf (root_utf),
    INDEX idx_punycode (punycode),
    INDEX idx_whois_server (whois_server),
    INDEX idx_rdap_server (rdap_server)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

140
fetch_domain_zone.py Normal file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
Script to fetch domain zone data from IANA root zone database
"""
import requests
from bs4 import BeautifulSoup
import sys
import re
def fetch_domain_zone_data():
    """Fetch the TLD list from the IANA root zone database page.

    Returns:
        list[dict]: entries with keys 'domain', 'type', 'tld_manager',
        or None on any network or parse failure.
    """
    try:
        # Browser-like User-Agent: iana.org may deny default client strings
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # timeout prevents the script hanging forever on a stalled connection
        response = requests.get('https://www.iana.org/domains/root/db',
                                headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        domains = []
        # Primary strategy: the IANA page renders one <table> with
        # domain / type / manager columns.
        table = soup.find('table')
        if table:
            rows = table.find_all('tr')
            for row in rows[1:]:  # Skip header row
                cells = row.find_all('td')
                if len(cells) >= 3:
                    domain = cells[0].get_text(strip=True).lower()
                    domain_type = cells[1].get_text(strip=True)
                    tld_manager = cells[2].get_text(strip=True)
                    # Normalize: keep only hostname characters in the domain,
                    # collapse runs of whitespace in the text fields
                    domain = re.sub(r'[^a-z0-9.-]', '', domain)
                    domain_type = re.sub(r'\s+', ' ', domain_type)
                    tld_manager = re.sub(r'\s+', ' ', tld_manager)
                    if domain and domain != '.':
                        domains.append({
                            'domain': domain,
                            'type': domain_type,
                            'tld_manager': tld_manager
                        })
        # Fallback strategy: if no usable table was found, follow the
        # per-domain detail links instead (one extra request per TLD).
        if not domains:
            domain_links = soup.find_all('a', href=re.compile(r'/domains/root/db/'))
            for link in domain_links:
                domain_text = link.get_text(strip=True).lower()
                if domain_text and len(domain_text) > 1 and not domain_text.startswith('.'):
                    detail_url = f"https://www.iana.org{link['href']}"
                    detail_data = fetch_domain_detail(detail_url)
                    if detail_data:
                        domains.append(detail_data)
        print(f"Fetched {len(domains)} domains from IANA root zone database")
        return domains
    except requests.RequestException as e:
        print(f"Error fetching IANA root zone data: {e}")
        return None
    except Exception as e:
        print(f"Error parsing IANA data: {e}")
        return None
def fetch_domain_detail(detail_url):
    """Fetch type/manager details for one TLD from its IANA detail page.

    Returns:
        dict with keys 'domain', 'type', 'tld_manager', or None on error.
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # timeout so one slow detail page cannot stall the whole crawl
        response = requests.get(detail_url, headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # The URL's last path component is the (punycode) domain name
        domain = detail_url.split('/')[-1].lower()
        domain_type = "generic"  # defaults when the page yields nothing
        tld_manager = "Unknown"
        # 'string=' replaces the 'text=' keyword deprecated in BeautifulSoup 4.4
        type_elements = soup.find_all(string=re.compile(r'Type|Type of domain', re.IGNORECASE))
        for element in type_elements:
            parent = element.parent
            if parent:
                next_sibling = parent.find_next_sibling() or parent.find_next()
                if next_sibling:
                    domain_type = next_sibling.get_text(strip=True)
                    break
        manager_elements = soup.find_all(string=re.compile(r'Manager|Sponsor|Registry', re.IGNORECASE))
        for element in manager_elements:
            parent = element.parent
            if parent:
                next_sibling = parent.find_next_sibling() or parent.find_next()
                if next_sibling:
                    tld_manager = next_sibling.get_text(strip=True)
                    break
        return {
            'domain': domain,
            'type': domain_type,
            'tld_manager': tld_manager
        }
    except Exception as e:
        print(f"Error fetching detail for {detail_url}: {e}")
        return None
def main():
    """Entry point: fetch the IANA zone list and print a short report."""
    print("Fetching IANA root zone database data...")
    domains = fetch_domain_zone_data()
    if not domains:
        print("Failed to fetch domain data")
        return None
    print(f"\nSample data:")
    # Show the first ten entries, 1-based numbering
    for idx, entry in enumerate(domains[:10], start=1):
        print(f"{idx}. {entry['domain']} - {entry['type']} - {entry['tld_manager']}")
    print(f"\nTotal domains fetched: {len(domains)}")
    return domains
if __name__ == "__main__":
    main()

119
idn_mappings.py Normal file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Script to map Punycode TLDs to their Unicode representations
"""
import idna
# Known IDN TLD mappings
# Known IDN TLD mappings (Punycode label -> Unicode form with leading dot).
# Used only as a fallback when idna.decode() fails.
# NOTE(review): these hand-maintained values are unverified and several look
# questionable against the IANA root zone — confirm before relying on them.
# Duplicate 'xn--p1ai' entries from the original were removed (a Python dict
# silently keeps only the last duplicate key, so the extra lines were dead).
IDN_MAPPINGS = {
    'xn--p1ai': '.рф',  # Russia
    'xn--fiqs8s': '.中国',  # China
    'xn--fiqz9s': '.中國',  # China (traditional)
    'xn--lgbbat1ad8j': '.الجزائر',  # Algeria
    'xn--yfro4i67o': '.קום',  # Israel (KOM)
    'xn--4gbrim': '.مصر',  # Egypt
    'xn--55qx5d': '.موريتانيا',  # Mauritania
    'xn--80akhbyknj4f': '.հայ',  # Armenia
    'xn--80asehdb': '.бел',  # Belarus
    'xn--90a3ac': '.мкд',  # Macedonia
    'xn--45brj9c': '.бг',  # Bulgaria
    'xn--hlcj6aya': '.سوريا',  # Syria
    'xn--mgbcpq6gpa1a': '.السعودية',  # Saudi Arabia
    'xn--ogbpf8fl': '.سودان',  # Sudan
    'xn--kprw13d': '.გე',  # Georgia
    'xn--kpry57d': '.გე',  # Georgia (alternative)
    'xn--o1ac': '.ελ',  # Greece
    'xn--80ao21a': '.қаз',  # Kazakhstan
    'xn--fgbp6a': '.مغرب',  # Morocco
    'xn--j1amh': '.укр',  # Ukraine
    'xn--mix891f': '.ไทย',  # Thailand
    'xn--mix082f': '.ไทย',  # Thailand (alternative)
    'xn--mxtq1m': '.新加坡',  # Singapore
    'xn--node': '.नेट',  # India (NET)
    'xn--j6w193g': '.香港',  # Hong Kong
    'xn--55qw42g': '.中国',  # China (alternative)
    'xn--5tzm5g': '.台灣',  # Taiwan
    'xn--6frz82g': '.ලංකා',  # Sri Lanka
    'xn--80adxhks': '.мкд',  # Macedonia (alternative)
    'xn--l1acc': '.мон',  # Mongolia
    'xn--9t4b11yi5a': '.இலங்கை',  # Sri Lanka (alternative)
    'xn--rhqv96g': '.世博',  # Expo
    'xn--0zwm56d': '.澳洲',  # Australia
    'xn--czru2d': '.कोम',  # India (COM)
    'xn--d1acj3b': '.дети',  # Kids
    'xn--d1alf': '.москва',  # Moscow
    'xn--h2brj9c': '.срб',  # Serbia
    'xn--h2breg3eve': '.срб',  # Serbia (alternative)
    'xn--k1x57d': '.新加坡',  # Singapore (alternative)
    'xn--mgbbh1a71e': '.امارات',  # UAE
    'xn--mgbaam7a8h': '.الاردن',  # Jordan
    'xn--mgbayh7gpa': '.الاردن',  # Jordan (alternative)
    'xn--y9a3aq': '.հայ',  # Armenia (alternative)
    'xn--mgbx4cd0ab': '.مليسيا',  # Malaysia
    'xn--54b7fta0cc': '.بھارت',  # India
    'xn--90ae5b': '.بازار',  # Iran (Bazaar)
    'xn--l1nej': '.موقع',  # Iran (Site)
    'xn--mgbgu82a': '.شبكة',  # Iran (Network)
    'xn--fiq64b': '.कॉम',  # India (COM alternative)
    'xn--kcrx77d1x4a': '.சிங்கப்பூர்',  # Singapore (Tamil)
    'xn--i1b6b1a6a2e': '.संगठन',  # India (Organization)
    'xn--nqv7f': '.فلسطين',  # Palestine
    'xn--qqh11a': '.مصر',  # Egypt (alternative)
    'xn--c1avg': '.бел',  # Belarus (alternative)
    'xn--e1a4c': '.ею',  # European Union
    'xn--8h0a': '.ايران',  # Iran
    'xn--1qqw23a': '.游戏',  # China (Game)
    'xn--3bst00m': '.公司',  # China (Company)
    'xn--45br5cyl': '.бг',  # Bulgaria (alternative)
    'xn--s9brj9c': '.срб',  # Serbia (alternative)
    'xn--czrs0t': '.कोम',  # India (COM alternative)
    'xn--czr694b': '.कॉम',  # India (COM alternative)
    'xn--gecrj9c': '.克罗地亚',  # Croatia
    'xn--9krt00a': '.日本',  # Japan
    'xn--xkc2dl3a5ee0h': '.ಭಾರತ',  # India (Kannada)
    'xn--fzys8d69uvgm': '.تونس',  # Tunisia
    'xn--fzc2c9e2c': '.السعودية',  # Saudi Arabia (alternative)
}
def punycode_to_unicode(punycode):
    """Convert a Punycode TLD label to its Unicode representation.

    Non-Punycode input is returned unchanged.  If idna decoding fails, the
    hand-maintained IDN_MAPPINGS table is used as a fallback.
    NOTE(review): IDN_MAPPINGS values carry a leading dot while idna.decode()
    does not, so the two success paths return slightly different formats —
    callers should normalize.
    """
    if not punycode.startswith('xn--'):
        return punycode
    try:
        return idna.decode(punycode)
    except Exception:
        # narrowed from a bare 'except' so KeyboardInterrupt/SystemExit
        # are no longer swallowed
        return IDN_MAPPINGS.get(punycode, punycode)
def get_all_idn_tlds():
    """Download the IANA TLD list and return the IDN ('xn--') entries.

    Returns:
        list[dict]: entries with 'punycode', 'unicode' and 'display' keys.

    Raises:
        requests.RequestException: on network failure or non-2xx response.
    """
    import requests
    # timeout + raise_for_status: fail loudly instead of silently
    # parsing an error page as if it were the TLD list
    response = requests.get('https://data.iana.org/TLD/tlds-alpha-by-domain.txt',
                            timeout=30)
    response.raise_for_status()
    lines = response.text.strip().split('\n')
    tlds = []
    for line in lines:
        line = line.strip()
        # skip blank lines and the '# Version ...' comment header
        if line and not line.startswith('#'):
            tlds.append(line.lower())
    idn_tlds = []
    for tld in tlds:
        if tld.startswith('xn--'):
            unicode_form = punycode_to_unicode(tld)
            idn_tlds.append({
                'punycode': tld,
                'unicode': unicode_form,
                'display': f"{tld} ({unicode_form})"
            })
    return idn_tlds
if __name__ == "__main__":
    # Script mode: list the IDN TLDs found in the live IANA data
    idn_tlds = get_all_idn_tlds()
    print(f"Found {len(idn_tlds)} IDN TLDs:")
    for entry in idn_tlds[:20]:
        print(f" {entry['display']}")

View File

@@ -1,2 +1,4 @@
mysql-connector-python==8.2.0 mysql-connector-python==8.2.0
requests==2.31.0 requests==2.31.0
beautifulsoup4==4.12.2
dnspython==2.4.2

263
update.py
View File

@@ -7,6 +7,9 @@ import mysql.connector
import requests import requests
import sys import sys
from datetime import datetime from datetime import datetime
from bs4 import BeautifulSoup
import re
import idna
# Database connection configuration # Database connection configuration
DB_CONFIG = { DB_CONFIG = {
@@ -23,6 +26,7 @@ DB_CONFIG = {
# URLs for data sources # URLs for data sources
IANA_TLD_URL = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt' IANA_TLD_URL = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'
PSL_URL = 'https://publicsuffix.org/list/public_suffix_list.dat' PSL_URL = 'https://publicsuffix.org/list/public_suffix_list.dat'
IANA_ROOT_ZONE_URL = 'https://www.iana.org/domains/root/db'
def fetch_tld_data(): def fetch_tld_data():
"""Fetch TLD data from IANA""" """Fetch TLD data from IANA"""
@@ -62,6 +66,112 @@ def fetch_psl_data():
print(f"Error fetching PSL data: {e}") print(f"Error fetching PSL data: {e}")
return None return None
def fetch_domain_zone_data():
    """Fetch domain zone data using the IANA TLD list as authoritative source.

    Returns:
        list[dict]: {'domain': <tld>} entries (lower-case, no leading dot)
        in IANA publication order, or None on failure.
    """
    try:
        # timeout prevents the updater hanging forever on a stalled connection
        tld_response = requests.get(IANA_TLD_URL, timeout=30)
        tld_response.raise_for_status()
        lines = tld_response.text.strip().split('\n')
        all_tlds = []
        for line in lines:
            line = line.strip()
            # skip blank lines and the '# Version ...' comment header
            if line and not line.startswith('#'):
                all_tlds.append(line.lower())
        print(f"Authoritative TLD list contains: {len(all_tlds)} TLDs")
        # Create simple domain list following TLD order
        domains = []
        for tld in all_tlds:
            domains.append({
                'domain': tld  # Store without dot prefix like domain_root
            })
        print(f"Created domain list: {len(domains)} domains")
        print(f"First 5 domains: {[d['domain'] for d in domains[:5]]}")
        return domains
    except requests.RequestException as e:
        print(f"Error fetching TLD data: {e}")
        return None
    except Exception as e:
        print(f"Error processing domain data: {e}")
        return None
def fetch_domain_detail(detail_url):
    """Fetch type/manager details for one TLD from its IANA detail page.

    Returns:
        dict with keys 'domain', 'type', 'tld_manager', or None on error.
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # timeout so one slow detail page cannot stall the whole update run
        response = requests.get(detail_url, headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # The URL's last path component is the (punycode) domain name
        domain = detail_url.split('/')[-1].lower()
        domain_type = "generic"  # defaults when the page yields nothing
        tld_manager = "Unknown"
        # 'string=' replaces the 'text=' keyword deprecated in BeautifulSoup 4.4
        type_elements = soup.find_all(string=re.compile(r'Type|Type of domain', re.IGNORECASE))
        for element in type_elements:
            parent = element.parent
            if parent:
                next_sibling = parent.find_next_sibling() or parent.find_next()
                if next_sibling:
                    domain_type = next_sibling.get_text(strip=True)
                    break
        manager_elements = soup.find_all(string=re.compile(r'Manager|Sponsor|Registry', re.IGNORECASE))
        for element in manager_elements:
            parent = element.parent
            if parent:
                next_sibling = parent.find_next_sibling() or parent.find_next()
                if next_sibling:
                    tld_manager = next_sibling.get_text(strip=True)
                    break
        return {
            'domain': domain,
            'type': domain_type,
            'tld_manager': tld_manager
        }
    except Exception as e:
        print(f"Error fetching detail for {detail_url}: {e}")
        return None
def enhance_domains_with_idn(domains):
    """Add a 'unicode_domain' field to each domain dict.

    For Punycode TLDs ('xn--' prefix) the idna-decoded Unicode form is
    stored; for regular TLDs (or when decoding fails) the original label
    is kept.  Input dicts are shallow-copied, not mutated.

    Returns:
        list[dict]: new list with 'unicode_domain' set on every entry.
    """
    enhanced_domains = []
    for domain_data in domains:
        domain = domain_data['domain']
        enhanced_data = domain_data.copy()
        if domain.startswith('xn--'):
            try:
                enhanced_data['unicode_domain'] = idna.decode(domain)
            except Exception:
                # narrowed from a bare 'except'; invalid punycode keeps
                # the ASCII form rather than crashing the whole run
                enhanced_data['unicode_domain'] = domain
        else:
            enhanced_data['unicode_domain'] = domain
        enhanced_domains.append(enhanced_data)
    return enhanced_domains
def update_domain_root(tlds): def update_domain_root(tlds):
"""Update domain_root table with soft delete and new entries""" """Update domain_root table with soft delete and new entries"""
try: try:
@@ -234,8 +344,118 @@ def update_domain_suffix(suffixes):
cursor.close() cursor.close()
conn.close() conn.close()
def update_domain_zone(domains):
    """Update domain_zone table with soft delete and new entries.

    Synchronizes the table against *domains* (dicts with 'domain' and
    optionally 'unicode_domain'):
      - rows no longer present in the source are soft-deleted,
      - new source entries are inserted in batches of 100,
      - changed root_utf values are refreshed,
      - updated_at is touched on every still-present active row.

    Returns:
        bool: True on success, False on a database error.
    """
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        # Current *active* rows keyed by domain
        cursor.execute("SELECT id, domain, root_utf, punycode FROM domain_zone WHERE removed = FALSE")
        current_entries = {row[1]: {'id': row[0], 'root_utf': row[2], 'punycode': row[3]} for row in cursor.fetchall()}
        # Sets for O(1) membership tests
        domain_set = {domain['domain'] for domain in domains}
        current_domains = set(current_entries.keys())
        # BUG FIX: the fallback for a missing 'unicode_domain' must be the
        # domain *string*, not the whole dict — the original passed the dict
        # itself, which would have been stored as its repr in root_utf.
        domain_data = {entry['domain']: {'unicode_domain': entry.get('unicode_domain', entry['domain'])} for entry in domains}
        # Soft-delete rows that disappeared from the source
        removed_domains = current_domains - domain_set
        if removed_domains:
            print(f"Marking {len(removed_domains)} domains as removed")
            for domain in removed_domains:
                cursor.execute(
                    "UPDATE domain_zone SET removed = TRUE, updated_at = CURRENT_TIMESTAMP WHERE domain = %s",
                    (domain,)
                )
        # Insert new rows in batches
        new_domains = domain_set - current_domains
        if new_domains:
            print(f"Adding {len(new_domains)} new domains")
            insert_query = "INSERT IGNORE INTO domain_zone (domain, root_utf, punycode, removed) VALUES (%s, %s, %s, FALSE)"
            batch_size = 100
            new_domain_list = list(new_domains)
            for i in range(0, len(new_domain_list), batch_size):
                batch = new_domain_list[i:i + batch_size]
                data = []
                for domain in batch:
                    unicode_domain = domain_data[domain]['unicode_domain']
                    is_punycode = domain.startswith('xn--')
                    data.append((domain, unicode_domain, is_punycode))
                cursor.executemany(insert_query, data)
                conn.commit()
                print(f"domain_zone batch {i//batch_size + 1}: {cursor.rowcount} new domains")
        # Refresh root_utf where the Unicode form changed
        common_domains = current_domains & domain_set
        for domain in common_domains:
            current_data = current_entries[domain]
            new_data = domain_data[domain]
            new_unicode = new_data['unicode_domain']
            if current_data['root_utf'] != new_unicode:
                cursor.execute(
                    "UPDATE domain_zone SET root_utf = %s, updated_at = CURRENT_TIMESTAMP WHERE domain = %s",
                    (new_unicode, domain)
                )
        # Restore previously soft-deleted rows that reappeared in the source.
        # NOTE(review): current_entries only holds removed = FALSE rows, so
        # common_domains can never intersect removed = TRUE rows and
        # to_restore is expected to stay empty — dead code left in place
        # pending review (restoring would need a query over removed rows).
        if common_domains:
            cursor.execute("SELECT domain FROM domain_zone WHERE removed = TRUE AND domain IN (%s)" %
                           ','.join(['%s'] * len(common_domains)), list(common_domains))
            to_restore = [row[0] for row in cursor.fetchall()]
        else:
            to_restore = []
        if to_restore:
            print(f"Restoring {len(to_restore)} previously removed domains")
            for domain in to_restore:
                new_unicode = domain_data[domain]['unicode_domain']
                is_punycode = domain.startswith('xn--')
                cursor.execute(
                    "UPDATE domain_zone SET removed = FALSE, root_utf = %s, punycode = %s, updated_at = CURRENT_TIMESTAMP WHERE domain = %s",
                    (new_unicode, is_punycode, domain)
                )
        # Touch updated_at on every still-active, still-present row
        if common_domains:
            print(f"Updating timestamps for {len(common_domains)} verified active domains")
            cursor.execute(
                "UPDATE domain_zone SET updated_at = CURRENT_TIMESTAMP WHERE removed = FALSE AND domain IN (%s)" %
                ','.join(['%s'] * len(common_domains)), list(common_domains)
            )
        conn.commit()
        # Final statistics
        cursor.execute("SELECT COUNT(*) FROM domain_zone WHERE removed = FALSE")
        active_count = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM domain_zone WHERE removed = TRUE")
        removed_count = cursor.fetchone()[0]
        print(f"domain_zone update completed:")
        print(f" Active entries: {active_count}")
        print(f" Removed entries: {removed_count}")
        print(f" New entries added: {len(new_domains)}")
        print(f" Entries marked as removed: {len(removed_domains)}")
        print(f" Entries restored: {len(to_restore)}")
        return True
    except mysql.connector.Error as e:
        print(f"Database error in domain_zone: {e}")
        return False
    finally:
        if 'conn' in locals() and conn.is_connected():
            cursor.close()
            conn.close()
def show_sample_data(): def show_sample_data():
"""Show sample data from both tables""" """Show sample data from all tables"""
try: try:
conn = mysql.connector.connect(**DB_CONFIG) conn = mysql.connector.connect(**DB_CONFIG)
cursor = conn.cursor() cursor = conn.cursor()
@@ -252,6 +472,30 @@ def show_sample_data():
status = "REMOVED" if row[2] else "ACTIVE" status = "REMOVED" if row[2] else "ACTIVE"
print(f"{row[0]} {row[1]} [{status}] {row[3]}") print(f"{row[0]} {row[1]} [{status}] {row[3]}")
print("\n=== Sample data from domain_zone table ===")
cursor.execute("SELECT id, domain, root_utf, punycode, removed, created_at FROM domain_zone ORDER BY id LIMIT 15")
for row in cursor.fetchall():
status = "REMOVED" if row[4] else "ACTIVE"
domain_display = row[1]
root_utf_display = row[2] if row[2] else ""
punycode_flag = "IDN" if row[3] else "REG"
# Show Unicode representation for IDN domains
if row[1].startswith('xn--') and row[2]:
domain_display = f"{row[1]} ({row[2]})"
elif row[1].startswith('xn--'):
try:
unicode_domain = idna.decode(row[1])
domain_display = f"{row[1]} ({unicode_domain})"
except:
domain_display = row[1]
# Format the display
if root_utf_display and root_utf_display != row[1]:
print(f"{row[0]} {domain_display} [{punycode_flag}] UTF:{root_utf_display} [{status}] {row[5]}")
else:
print(f"{row[0]} {domain_display} [{punycode_flag}] [{status}] {row[5]}")
cursor.close() cursor.close()
conn.close() conn.close()
@@ -287,6 +531,17 @@ def main():
sys.exit(1) sys.exit(1)
print(f"Fetched {len(suffixes)} suffixes") print(f"Fetched {len(suffixes)} suffixes")
# Fetch domain zone data
print(f"\nFetching domain zone data from TLD list and web scraping...")
domains = fetch_domain_zone_data()
if not domains:
print("Failed to fetch domain zone data")
sys.exit(1)
print(f"Fetched {len(domains)} domain zones")
# Enhance domains with IDN Unicode representations
domains = enhance_domains_with_idn(domains)
# Update domain_root table # Update domain_root table
print(f"\nUpdating domain_root table...") print(f"\nUpdating domain_root table...")
if not update_domain_root(tlds): if not update_domain_root(tlds):
@@ -299,6 +554,12 @@ def main():
print("Failed to update domain_suffix table") print("Failed to update domain_suffix table")
sys.exit(1) sys.exit(1)
# Update domain_zone table
print(f"\nUpdating domain_zone table...")
if not update_domain_zone(domains):
print("Failed to update domain_zone table")
sys.exit(1)
# Show sample data # Show sample data
show_sample_data() show_sample_data()

346
update_data_domain_zone.py Normal file
View File

@@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""
Script to update domain_zone table with additional data fields
"""
import mysql.connector
import requests
import json
import sys
import time
from bs4 import BeautifulSoup
import re
import idna
# Database connection configuration
DB_CONFIG = {
    'host': 'l2',  # NOTE(review): non-FQDN host — presumably an internal alias; confirm
    'port': 3306,
    'user': 'root',
    'password': None,  # Will be set from command line or input
    'database': 'sp_spider',
    'charset': 'utf8mb4',  # full Unicode, needed for IDN root_utf values
    'ssl_disabled': True,
    'auth_plugin': 'mysql_native_password'
}
def get_iana_registry_info(domain):
    """Scrape registry metadata for one TLD from its IANA detail page.

    Returns:
        dict with keys registration_url / whois_server / rdap_server /
        name_servers / sponsoring / administrative / technical (values may
        be None or [] when not found), or None on any error.
    """
    try:
        # Remove dot prefix if present
        clean_domain = domain.lstrip('.')
        url = f"https://www.iana.org/domains/root/db/{clean_domain}"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        info = {
            'registration_url': None,
            'whois_server': None,
            'rdap_server': None,
            'name_servers': [],
            'sponsoring': None,
            'administrative': None,
            'technical': None
        }
        # Registration URL: first link whose href mentions registrar/registry
        reg_links = soup.find_all('a', href=re.compile(r'registrar|registry', re.IGNORECASE))
        if reg_links:
            info['registration_url'] = reg_links[0].get('href')
        # The remaining fields follow one pattern: find the label text, then
        # read the adjacent <td>.  'string=' replaces the 'text=' keyword
        # deprecated in BeautifulSoup 4.4.
        whois_text = soup.find(string=re.compile(r'WHOIS|whois', re.IGNORECASE))
        if whois_text:
            parent = whois_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    info['whois_server'] = next_td.get_text(strip=True)
        rdap_text = soup.find(string=re.compile(r'RDAP|rdap', re.IGNORECASE))
        if rdap_text:
            parent = rdap_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    info['rdap_server'] = next_td.get_text(strip=True)
        # NOTE(review): r'name.?server|ns' matches any text containing "ns",
        # so this may latch onto an unrelated cell — verify against the live
        # page markup.
        ns_text = soup.find(string=re.compile(r'name.?server|ns', re.IGNORECASE))
        if ns_text:
            parent = ns_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    ns_list = next_td.get_text(strip=True).split(',')
                    info['name_servers'] = [ns.strip() for ns in ns_list if ns.strip()]
        sponsor_text = soup.find(string=re.compile(r'sponsor|registry|manager', re.IGNORECASE))
        if sponsor_text:
            parent = sponsor_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    info['sponsoring'] = next_td.get_text(strip=True)
        admin_text = soup.find(string=re.compile(r'administrative|admin', re.IGNORECASE))
        if admin_text:
            parent = admin_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    info['administrative'] = next_td.get_text(strip=True)
        tech_text = soup.find(string=re.compile(r'technical|tech', re.IGNORECASE))
        if tech_text:
            parent = tech_text.parent
            if parent and parent.name == 'td':
                next_td = parent.find_next_sibling('td')
                if next_td:
                    info['technical'] = next_td.get_text(strip=True)
        return info
    except Exception as e:
        print(f"Error fetching IANA info for {domain}: {e}")
        return None
def get_rdap_info(domain):
    """Get RDAP information for a domain.

    Tries a fixed list of public RDAP endpoints and returns the first
    successful response as {'rdap_server', 'name_servers', 'port43'},
    or None when every endpoint fails.
    """
    try:
        # Remove dot prefix if present
        clean_domain = domain.lstrip('.')
        rdap_servers = [
            f"https://rdap.org/domain/{clean_domain}",
            f"https://data.iana.org/rdap/{clean_domain}",
            f"https://rdap.verisign.com/com/v1/domain/{clean_domain}",
            f"https://rdap.nic.fr/domain/{clean_domain}"
        ]
        for rdap_url in rdap_servers:
            try:
                response = requests.get(rdap_url, timeout=5)
                if response.status_code == 200:
                    data = response.json()
                    info = {
                        'rdap_server': rdap_url.split('/')[2],  # netloc of the responding server
                        'name_servers': [],
                        'port43': None
                    }
                    # Extract name servers from RDAP data
                    if 'nameservers' in data:
                        info['name_servers'] = [ns.get('ldhName', '') for ns in data['nameservers'] if ns.get('ldhName')]
                    # BUG FIX: per RFC 9083 'port43' is a plain string (the
                    # WHOIS host), not an object — the original called .get()
                    # on it, raised AttributeError, and silently skipped the
                    # whole (successful) response.
                    port43 = data.get('port43')
                    if isinstance(port43, str):
                        info['port43'] = port43
                    elif isinstance(port43, dict):
                        info['port43'] = port43.get('server', '')
                    return info
            except Exception:
                continue  # try the next RDAP endpoint
        return None
    except Exception as e:
        print(f"Error fetching RDAP info for {domain}: {e}")
        return None
def get_dns_servers(domain):
    """Resolve the NS records of *domain* via live DNS.

    Returns {'name_servers': [...]} or None when resolution fails
    (best-effort: any error is treated as "no data").
    """
    try:
        import dns.resolver
        answers = dns.resolver.resolve(domain.lstrip('.'), 'NS')
        return {'name_servers': [str(record) for record in answers]}
    except Exception:
        return None
def update_domain_data():
    """Enrich up to 50 domain_zone rows with IANA / RDAP / DNS data.

    Selects active rows still missing any of registration_url, whois_server
    or rdap_server, queries the three sources for each, and writes results
    back with COALESCE so a failed lookup never overwrites data stored by a
    previous run.

    Returns:
        bool: True on success (or nothing to do), False on a database error.
    """
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        # Batch of 50 keeps each run short given the 1 s per-domain sleep
        cursor.execute("""
            SELECT id, domain, root_utf
            FROM domain_zone
            WHERE removed = FALSE
            AND (registration_url IS NULL OR whois_server IS NULL OR rdap_server IS NULL)
            ORDER BY id
            LIMIT 50
        """)
        domains_to_update = cursor.fetchall()
        if not domains_to_update:
            print("All domains already have complete data!")
            return True
        print(f"Updating data for {len(domains_to_update)} domains...")
        for domain_id, domain, root_utf in domains_to_update:
            print(f"Processing {domain}...")
            iana_info = get_iana_registry_info(domain)
            rdap_info = get_rdap_info(domain)
            dns_info = get_dns_servers(domain)
            # Prefer RDAP name servers, then live DNS; store NULL (not '[]')
            # when nothing was found so COALESCE keeps any earlier value.
            name_servers = []
            if rdap_info and rdap_info.get('name_servers'):
                name_servers = rdap_info['name_servers']
            elif dns_info and dns_info.get('name_servers'):
                name_servers = dns_info['name_servers']
            update_data = {
                'registration_url': iana_info.get('registration_url') if iana_info else None,
                'whois_server': iana_info.get('whois_server') if iana_info else None,
                'rdap_server': rdap_info.get('rdap_server') if rdap_info else None,
                'name_servers': json.dumps(name_servers) if name_servers else None,
                'sponsoring': iana_info.get('sponsoring') if iana_info else None,
                'administrative': iana_info.get('administrative') if iana_info else None,
                'technical': iana_info.get('technical') if iana_info else None
            }
            # BUG FIX: the original overwrote every column unconditionally,
            # so a partial lookup could NULL out fields filled by an earlier
            # run.  COALESCE(new, old) only replaces when new data exists.
            cursor.execute("""
                UPDATE domain_zone SET
                registration_url = COALESCE(%s, registration_url),
                whois_server = COALESCE(%s, whois_server),
                rdap_server = COALESCE(%s, rdap_server),
                name_servers = COALESCE(%s, name_servers),
                sponsoring = COALESCE(%s, sponsoring),
                administrative = COALESCE(%s, administrative),
                technical = COALESCE(%s, technical),
                updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (
                update_data['registration_url'],
                update_data['whois_server'],
                update_data['rdap_server'],
                update_data['name_servers'],
                update_data['sponsoring'],
                update_data['administrative'],
                update_data['technical'],
                domain_id
            ))
            conn.commit()  # commit per row so progress survives an abort
            time.sleep(1)  # rate limiting: be polite to IANA/RDAP endpoints
        print(f"Updated {len(domains_to_update)} domains successfully")
        # Coverage statistics over the whole active table
        cursor.execute("""
            SELECT
            COUNT(*) as total,
            COUNT(CASE WHEN registration_url IS NOT NULL THEN 1 END) as with_reg_url,
            COUNT(CASE WHEN whois_server IS NOT NULL THEN 1 END) as with_whois,
            COUNT(CASE WHEN rdap_server IS NOT NULL THEN 1 END) as with_rdap,
            COUNT(CASE WHEN name_servers IS NOT NULL THEN 1 END) as with_ns
            FROM domain_zone
            WHERE removed = FALSE
        """)
        stats = cursor.fetchone()
        print(f"\nDatabase Statistics:")
        print(f" Total domains: {stats[0]}")
        print(f" With registration URL: {stats[1]}")
        print(f" With WHOIS server: {stats[2]}")
        print(f" With RDAP server: {stats[3]}")
        print(f" With name servers: {stats[4]}")
        return True
    except mysql.connector.Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        if 'conn' in locals() and conn.is_connected():
            cursor.close()
            conn.close()
def show_sample_data():
    """Print up to 10 enriched domain_zone rows for a quick sanity check."""
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print("\n=== Sample enriched data from domain_zone table ===")
        cursor.execute("""
            SELECT id, domain, root_utf, whois_server, rdap_server,
            JSON_LENGTH(name_servers) as ns_count, sponsoring
            FROM domain_zone
            WHERE removed = FALSE
            AND (registration_url IS NOT NULL OR whois_server IS NOT NULL)
            ORDER BY id
            LIMIT 10
        """)
        for row_id, name, utf, whois, rdap, ns_count, sponsor in cursor.fetchall():
            # For IDN domains, append the stored Unicode form to the label
            label = f"{name} ({utf})" if name.startswith('xn--') and utf else name
            print(f"{row_id} {label}")
            print(f" WHOIS: {whois or 'N/A'}")
            print(f" RDAP: {rdap or 'N/A'}")
            print(f" Name Servers: {ns_count or 0}")
            print(f" Sponsor: {sponsor or 'N/A'}")
            print()
        cursor.close()
        conn.close()
    except mysql.connector.Error as e:
        print(f"Database error: {e}")
def main():
    """CLI entry point: obtain the DB password, then run the enrichment."""
    import getpass
    # Password comes from argv[1] when supplied, otherwise prompt interactively
    if len(sys.argv) > 1:
        DB_CONFIG['password'] = sys.argv[1]
    else:
        DB_CONFIG['password'] = getpass.getpass("Enter MariaDB password for user 'root': ")
    print("Starting domain_zone data enrichment process...")
    if not update_domain_data():
        print("Failed to update domain data")
        sys.exit(1)
    show_sample_data()
    print("\n=== Domain data enrichment completed ===")
if __name__ == "__main__":
    main()