spiderJump

This commit is contained in:
Kar
2026-02-20 20:42:59 +05:30
parent 309ba5fbae
commit babb1e7da1
2 changed files with 788 additions and 263 deletions

616
main.go
View File

@@ -23,33 +23,49 @@ import (
)
// ─────────────────────────────────────────────────────────────────
// Global state
// Constants & globals
// ─────────────────────────────────────────────────────────────────
const mainDBFile = "siliconpin_spider.sqlite"
var mainDB *sql.DB
// per-domain SSE brokers
// SSE brokers one per domain
var (
brokersMu sync.RWMutex
brokers = map[string]*Broker{}
)
// per-domain DB connections (kept open)
// open domain DB handles
var (
domainDBsMu sync.RWMutex
domainDBs = map[string]*sql.DB{}
)
// guard against duplicate crawlers
// crawler goroutine guard
var (
crawlersMu sync.Mutex
crawlers = map[string]bool{}
)
// pause/resume channels one per domain
// sending to pauseCh pauses; sending to resumeCh resumes
var (
pauseChsMu sync.RWMutex
pauseChs = map[string]chan struct{}{} // pause signal
resumeChs = map[string]chan struct{}{} // resume signal
)
// domain status values stored in main DB
const (
statusRunning = "running"
statusPaused = "paused"
statusDone = "done"
statusPending = "pending"
)
// ─────────────────────────────────────────────────────────────────
// SSE Broker fan-out to multiple subscribers per domain
// SSE Broker
// ─────────────────────────────────────────────────────────────────
type Broker struct {
@@ -57,9 +73,7 @@ type Broker struct {
clients map[chan string]struct{}
}
func newBroker() *Broker {
return &Broker{clients: make(map[chan string]struct{})}
}
func newBroker() *Broker { return &Broker{clients: make(map[chan string]struct{})} }
func (b *Broker) subscribe() chan string {
ch := make(chan string, 64)
@@ -81,7 +95,7 @@ func (b *Broker) publish(msg string) {
for ch := range b.clients {
select {
case ch <- msg:
default: // slow client drop message
default:
}
}
}
@@ -103,22 +117,29 @@ func getBroker(domain string) *Broker {
return br
}
// ─────────────────────────────────────────────────────────────────
// SSE event helper
// ─────────────────────────────────────────────────────────────────
type sseEvent struct {
type ssePayload struct {
Event string `json:"event"`
Data interface{} `json:"data"`
}
func emit(br *Broker, event string, data interface{}) {
payload, _ := json.Marshal(sseEvent{Event: event, Data: data})
br.publish(string(payload))
b, _ := json.Marshal(ssePayload{Event: event, Data: data})
br.publish(string(b))
}
// broadcast emits to ALL domain brokers (e.g. for a new_domain event)
func broadcast(event string, data interface{}) {
brokersMu.RLock()
defer brokersMu.RUnlock()
b, _ := json.Marshal(ssePayload{Event: event, Data: data})
msg := string(b)
for _, br := range brokers {
br.publish(msg)
}
}
// ─────────────────────────────────────────────────────────────────
// Database helpers
// Main DB helpers
// ─────────────────────────────────────────────────────────────────
func initMainDB() {
@@ -132,6 +153,8 @@ func initMainDB() {
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain TEXT NOT NULL UNIQUE,
interval INTEGER NOT NULL DEFAULT 60,
status TEXT NOT NULL DEFAULT 'pending',
parent TEXT NOT NULL DEFAULT '',
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
)`)
@@ -141,6 +164,67 @@ func initMainDB() {
log.Printf("Main DB ready: %s", mainDBFile)
}
// setDomainStatus updates a domain's status column in the main DB and
// refreshes updated_at. Errors are logged rather than returned: status
// updates are advisory and callers have no recovery path.
func setDomainStatus(domain, status string) {
	now := time.Now().UTC().Format(time.RFC3339)
	if _, err := mainDB.Exec(
		`UPDATE domains SET status=?, updated_at=? WHERE domain=?`,
		status, now, domain); err != nil {
		// Previously this error was silently discarded.
		log.Printf("setDomainStatus %s -> %s: %v", domain, status, err)
	}
}
// DomainRow is one entry in the domain listing returned to API clients.
// URLCount and QueueLen are live counts read from the per-domain DB at
// listing time, not columns of the main domains table.
type DomainRow struct {
ID int `json:"id"`
Domain string `json:"domain"`
Interval int `json:"interval"` // crawl interval in seconds
Status string `json:"status"` // one of the status* constants
Parent string `json:"parent,omitempty"` // domain that discovered this one; "" if user-added
URLCount int `json:"url_count"` // rows in the domain DB's urls table
QueueLen int `json:"queue_len"` // rows in the domain DB's queue table
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// listDomains returns every domain registered in the main DB, enriched
// with live URL/queue counts read from each domain's own database.
func listDomains() ([]DomainRow, error) {
	rows, err := mainDB.Query(
		`SELECT id, domain, interval, status, parent, created_at, updated_at
FROM domains ORDER BY id ASC`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []DomainRow
	for rows.Next() {
		var d DomainRow
		if err := rows.Scan(&d.ID, &d.Domain, &d.Interval, &d.Status,
			&d.Parent, &d.CreatedAt, &d.UpdatedAt); err != nil {
			// A scan failure means a schema/type mismatch; surface it
			// instead of silently dropping the row (previous behavior).
			return nil, err
		}
		// Live counts are best-effort: an unopenable domain DB simply
		// leaves both counts at zero.
		if db, err2 := openDomainDB(d.Domain); err2 == nil {
			db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&d.URLCount)  //nolint:errcheck
			db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&d.QueueLen) //nolint:errcheck
		}
		out = append(out, d)
	}
	// Report iteration errors (previously ignored).
	return out, rows.Err()
}
// registerDomain upserts a domain in the main DB.
// parentDomain is "" for user-added domains, otherwise the domain that found it.
// On conflict only interval and updated_at are refreshed: status and
// parent are deliberately left untouched so re-registering a domain
// cannot clobber the state of a crawl that is already in flight.
func registerDomain(domain string, interval int, parentDomain string) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := mainDB.Exec(`
INSERT INTO domains (domain, interval, status, parent, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(domain) DO UPDATE SET
interval=excluded.interval,
updated_at=excluded.updated_at`,
domain, interval, statusPending, parentDomain, now, now)
return err
}
// ─────────────────────────────────────────────────────────────────
// Domain DB helpers
// ─────────────────────────────────────────────────────────────────
func openDomainDB(domain string) (*sql.DB, error) {
domainDBsMu.RLock()
db, ok := domainDBs[domain]
@@ -153,24 +237,32 @@ func openDomainDB(domain string) (*sql.DB, error) {
if err != nil {
return nil, err
}
_, err = db.Exec(`
if _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS urls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL UNIQUE,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
)`)
if err != nil {
)`); err != nil {
db.Close()
return nil, err
}
_, err = db.Exec(`
if _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL UNIQUE,
added_at DATETIME NOT NULL
)`)
if err != nil {
)`); err != nil {
db.Close()
return nil, err
}
// cross-domain links discovered during crawl
if _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS ext_links (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ext_domain TEXT NOT NULL UNIQUE,
found_at DATETIME NOT NULL
)`); err != nil {
db.Close()
return nil, err
}
@@ -185,8 +277,7 @@ func insertURL(db *sql.DB, rawURL string) (bool, error) {
now := time.Now().UTC().Format(time.RFC3339)
res, err := db.Exec(
`INSERT OR IGNORE INTO urls (url, created_at, updated_at) VALUES (?, ?, ?)`,
rawURL, now, now,
)
rawURL, now, now)
if err != nil {
return false, err
}
@@ -200,29 +291,22 @@ func isURLKnown(db *sql.DB, rawURL string) bool {
return c > 0
}
// ── persistent queue helpers ──────────────────────────────────────
// enqueueURL adds a URL to the persistent queue. INSERT OR IGNORE makes
// this a no-op for URLs already queued; already-crawled URLs are filtered
// by the caller via isURLKnown before dequeue.
func enqueueURL(db *sql.DB, rawURL string) {
	now := time.Now().UTC().Format(time.RFC3339)
	if _, err := db.Exec(
		`INSERT OR IGNORE INTO queue (url, added_at) VALUES (?, ?)`,
		rawURL, now); err != nil {
		// Previously this error was silently discarded; a failed enqueue
		// means the URL is lost for this run, so at least log it.
		log.Printf("enqueue %s: %v", rawURL, err)
	}
}
// dequeueURL removes and returns the oldest queued URL (FIFO).
// Returns "", false when the queue is empty.
func dequeueURL(db *sql.DB) (string, bool) {
tx, err := db.Begin()
if err != nil {
return "", false
}
defer tx.Rollback() //nolint:errcheck
var id int64
var rawURL string
err = tx.QueryRow(`SELECT id, url FROM queue ORDER BY id ASC LIMIT 1`).Scan(&id, &rawURL)
if err != nil {
return "", false // empty
if err = tx.QueryRow(`SELECT id, url FROM queue ORDER BY id ASC LIMIT 1`).
Scan(&id, &rawURL); err != nil {
return "", false
}
if _, err = tx.Exec(`DELETE FROM queue WHERE id = ?`, id); err != nil {
return "", false
@@ -233,31 +317,138 @@ func dequeueURL(db *sql.DB) (string, bool) {
return rawURL, true
}
// queueLen reports how many URLs are currently waiting in the persistent
// queue; returns 0 if the count cannot be read.
func queueLen(db *sql.DB) int {
	count := 0
	if err := db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&count); err != nil {
		return 0
	}
	return count
}
// seedQueue inserts the start URL only when the queue is completely empty
// (first ever run). On restart the persisted queue is used as-is.
func seedQueue(db *sql.DB, startURL string) {
var qCount, uCount int
db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&qCount)
db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&uCount)
if qCount == 0 && uCount == 0 {
var qc, uc int
db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&qc)
db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&uc)
if qc == 0 && uc == 0 {
enqueueURL(db, startURL)
}
}
// recordExtLink saves a discovered external domain in the source domain's
// ext_links table and, the first time it is seen, auto-registers it in the
// main DB (inheriting the parent's interval), notifies the UI, and starts
// a crawler for it.
func recordExtLink(srcDomain, extDomain string, parentInterval int) {
	db, err := openDomainDB(srcDomain)
	if err != nil {
		return
	}
	now := time.Now().UTC().Format(time.RFC3339)
	res, err := db.Exec(
		`INSERT OR IGNORE INTO ext_links (ext_domain, found_at) VALUES (?, ?)`,
		extDomain, now)
	if err != nil {
		// BUG FIX: res is nil when Exec fails; the previous code ignored
		// err and calling res.RowsAffected() would panic.
		log.Printf("[%s] record ext_link %s: %v", srcDomain, extDomain, err)
		return
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return // already recorded
	}
	// Register in main DB (inherit parent's interval).
	if err := registerDomain(extDomain, parentInterval, srcDomain); err != nil {
		log.Printf("registerDomain %s (from %s): %v", extDomain, srcDomain, err)
		return
	}
	log.Printf("[%s] discovered external domain: %s", srcDomain, extDomain)
	// Notify UI.
	broadcast("new_domain", map[string]string{
		"domain": extDomain,
		"parent": srcDomain,
	})
	// Init the new domain's DB and start its crawler (guarded so only one
	// crawler goroutine runs per domain).
	if _, err := openDomainDB(extDomain); err != nil {
		log.Printf("openDomainDB %s: %v", extDomain, err)
		return
	}
	crawlersMu.Lock()
	if !crawlers[extDomain] {
		crawlers[extDomain] = true
		go crawlDomain(extDomain, parentInterval)
	}
	crawlersMu.Unlock()
}
// ─────────────────────────────────────────────────────────────────
// robots.txt (minimal, single-pass parser)
// Pause / resume machinery
// ─────────────────────────────────────────────────────────────────
// ensurePauseChannels lazily creates the pause/resume signal channels for
// a domain. Both are buffered (size 1) so signalling never blocks.
func ensurePauseChannels(domain string) {
	pauseChsMu.Lock()
	defer pauseChsMu.Unlock()
	if _, exists := pauseChs[domain]; exists {
		return // already initialised
	}
	pauseChs[domain] = make(chan struct{}, 1)
	resumeChs[domain] = make(chan struct{}, 1)
}
// pauseCrawler asks the domain's crawler to pause. Non-blocking: if a
// pause signal is already pending, or the domain has no channels yet,
// this is a no-op.
func pauseCrawler(domain string) {
	pauseChsMu.RLock()
	sig, exists := pauseChs[domain]
	pauseChsMu.RUnlock()
	if !exists {
		return
	}
	select {
	case sig <- struct{}{}:
	default: // a pause signal is already queued
	}
}
// resumeCrawler asks a paused crawler to continue. Non-blocking: if a
// resume signal is already pending, or the domain has no channels yet,
// this is a no-op.
func resumeCrawler(domain string) {
	pauseChsMu.RLock()
	sig, exists := resumeChs[domain]
	pauseChsMu.RUnlock()
	if !exists {
		return
	}
	select {
	case sig <- struct{}{}:
	default: // a resume signal is already queued
	}
}
// checkPause is called inside the crawl loop between requests. If a
// pause signal is pending, it marks the domain paused and blocks until a
// resume signal arrives; otherwise it returns immediately.
func checkPause(domain string, br *Broker) {
	pauseChsMu.RLock()
	pauseCh := pauseChs[domain]
	resumeCh := resumeChs[domain]
	pauseChsMu.RUnlock()

	// Non-blocking probe; a nil channel (unregistered domain) is never
	// ready, so the default branch fires and we return.
	select {
	case <-pauseCh:
	default:
		return
	}

	setDomainStatus(domain, statusPaused)
	emit(br, "paused", map[string]string{"domain": domain})
	log.Printf("[%s] paused", domain)

	// Discard any duplicate pause signals that queued up in the meantime.
	for drained := false; !drained; {
		select {
		case <-pauseCh:
		default:
			drained = true
		}
	}

	// Block here until resumed.
	<-resumeCh
	setDomainStatus(domain, statusRunning)
	emit(br, "resumed", map[string]string{"domain": domain})
	log.Printf("[%s] resumed", domain)
}
// ─────────────────────────────────────────────────────────────────
// robots.txt
// ─────────────────────────────────────────────────────────────────
type robotsRules struct {
disallowed []string
crawlDelay int // 0 = not set
crawlDelay int
}
func fetchRobots(domain string) *robotsRules {
@@ -277,11 +468,9 @@ func fetchRobots(domain string) *robotsRules {
continue
}
lower := strings.ToLower(line)
if strings.HasPrefix(lower, "user-agent:") {
agent := strings.TrimSpace(line[len("user-agent:"):])
inSection = agent == "*" ||
strings.EqualFold(agent, "siliconpin_spider")
inSection = agent == "*" || strings.EqualFold(agent, "siliconpin_spider")
continue
}
if !inSection {
@@ -310,14 +499,21 @@ func (r *robotsRules) allowed(path string) bool {
}
// ─────────────────────────────────────────────────────────────────
// Link extractor same-host HTML links only
// Link extractor
// ─────────────────────────────────────────────────────────────────
var hrefRe = regexp.MustCompile(`(?i)href=["']([^"'#][^"']*)["']`)
func extractLinks(base *url.URL, body string) []string {
seen := map[string]bool{}
var links []string
// extractedLinks partitions the links found on a crawled page.
type extractedLinks struct {
sameHost []string // absolute URLs on the same host as the page
external []string // distinct external hostnames (not full URLs)
}
func extractLinks(base *url.URL, body string) extractedLinks {
seenSame := map[string]bool{}
seenExt := map[string]bool{}
var result extractedLinks
for _, m := range hrefRe.FindAllStringSubmatch(body, -1) {
href := strings.TrimSpace(m[1])
parsed, err := url.Parse(href)
@@ -330,16 +526,25 @@ func extractLinks(base *url.URL, body string) []string {
if resolved.Scheme != "http" && resolved.Scheme != "https" {
continue
}
if !strings.EqualFold(resolved.Hostname(), base.Hostname()) {
continue
}
s := resolved.String()
if !seen[s] {
seen[s] = true
links = append(links, s)
host := strings.ToLower(resolved.Hostname())
baseHost := strings.ToLower(base.Hostname())
if host == baseHost {
s := resolved.String()
if !seenSame[s] {
seenSame[s] = true
result.sameHost = append(result.sameHost, s)
}
} else {
// strip www. for normalisation
extDomain := strings.TrimPrefix(host, "www.")
if extDomain != "" && !seenExt[extDomain] && isValidDomain(extDomain) {
seenExt[extDomain] = true
result.external = append(result.external, extDomain)
}
}
}
return links
return result
}
// ─────────────────────────────────────────────────────────────────
@@ -347,20 +552,21 @@ func extractLinks(base *url.URL, body string) []string {
// ─────────────────────────────────────────────────────────────────
func crawlDomain(domain string, intervalSec int) {
log.Printf("[%s] crawler started (base interval %ds)", domain, intervalSec)
log.Printf("[%s] crawler started (interval %ds)", domain, intervalSec)
br := getBroker(domain)
ensurePauseChannels(domain)
db, err := openDomainDB(domain)
if err != nil {
emit(br, "error", map[string]string{"msg": "DB error: " + err.Error()})
emit(br, "error", map[string]string{"msg": "DB open failed: " + err.Error()})
return
}
// ── robots.txt ──────────────────────────────────────────────
setDomainStatus(domain, statusRunning)
// robots.txt
emit(br, "status", map[string]string{"msg": "fetching robots.txt"})
robots := fetchRobots(domain)
// robots.txt crawl-delay overrides our setting if higher
if robots.crawlDelay > intervalSec {
intervalSec = robots.crawlDelay
now := time.Now().UTC().Format(time.RFC3339)
@@ -373,10 +579,6 @@ func crawlDomain(domain string, intervalSec int) {
"effective_delay": intervalSec,
})
// ── Persistent BFS queue ────────────────────────────────────
// On first run: seed with the start URL.
// On restart: the queue table already holds the pending URLs —
// we just continue from where we left off.
startURL := "https://" + domain + "/"
seedQueue(db, startURL)
@@ -391,24 +593,25 @@ func crawlDomain(domain string, intervalSec int) {
}
for {
// Re-read interval in case it was updated via API
// ── pause check ─────────────────────────────────────────
checkPause(domain, br)
// ── re-read interval ────────────────────────────────────
var cur int
if err := mainDB.QueryRow(`SELECT interval FROM domains WHERE domain=?`, domain).Scan(&cur); err == nil && cur > 0 {
if mainDB.QueryRow(`SELECT interval FROM domains WHERE domain=?`, domain).
Scan(&cur) == nil && cur > 0 {
intervalSec = cur
}
target, ok := dequeueURL(db)
if !ok {
break // queue exhausted
break
}
// Skip if already crawled (can happen if same URL was enqueued
// multiple times before being dequeued, or after a re-seed)
if isURLKnown(db, target) {
continue
}
// robots check
parsed, err := url.Parse(target)
if err != nil {
continue
@@ -418,24 +621,24 @@ func crawlDomain(domain string, intervalSec int) {
continue
}
// random delay: [interval, interval*2] seconds
// random delay [interval, interval*2]
delaySec := intervalSec + rand.Intn(intervalSec+1)
delay := time.Duration(delaySec) * time.Second
emit(br, "waiting", map[string]interface{}{
"url": target,
"delay_s": delaySec,
"queue": queueLen(db),
})
time.Sleep(delay)
time.Sleep(time.Duration(delaySec) * time.Second)
// ── pause check after sleep (could have been paused during wait) ──
checkPause(domain, br)
// fetch
emit(br, "fetching", map[string]string{"url": target})
resp, err := httpClient.Get(target)
if err != nil {
emit(br, "error", map[string]string{"url": target, "msg": err.Error()})
log.Printf("[%s] fetch error %s: %v", domain, target, err)
// Re-enqueue so it's retried next run
enqueueURL(db, target)
enqueueURL(db, target) // retry next run
continue
}
@@ -444,13 +647,12 @@ func crawlDomain(domain string, intervalSec int) {
var bodyStr string
if isHTML {
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 5<<20)) // 5 MB cap
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 5<<20))
bodyStr = string(raw)
}
resp.Body.Close()
inserted, _ := insertURL(db, target)
if inserted {
if ins, _ := insertURL(db, target); ins {
emit(br, "saved", map[string]interface{}{
"url": target,
"status": resp.StatusCode,
@@ -459,11 +661,12 @@ func crawlDomain(domain string, intervalSec int) {
log.Printf("[%s] saved: %s", domain, target)
}
// discover links from HTML pages
if isHTML && resp.StatusCode == 200 {
links := extractLinks(parsed, bodyStr)
// same-host links → queue
newCount := 0
for _, link := range links {
for _, link := range links.sameHost {
if !isURLKnown(db, link) {
enqueueURL(db, link)
newCount++
@@ -471,13 +674,20 @@ func crawlDomain(domain string, intervalSec int) {
}
emit(br, "links_found", map[string]interface{}{
"url": target,
"found": len(links),
"found": len(links.sameHost),
"new": newCount,
"queue_len": queueLen(db),
"external": len(links.external),
})
// external domains → auto-register & crawl
for _, extDomain := range links.external {
recordExtLink(domain, extDomain, intervalSec)
}
}
}
setDomainStatus(domain, statusDone)
emit(br, "done", map[string]string{"domain": domain, "msg": "crawl complete"})
log.Printf("[%s] crawl complete", domain)
@@ -487,7 +697,7 @@ func crawlDomain(domain string, intervalSec int) {
}
// ─────────────────────────────────────────────────────────────────
// HTTP handlers
// HTTP helpers
// ─────────────────────────────────────────────────────────────────
func sanitizeDomain(raw string) string {
@@ -503,37 +713,39 @@ var domainRe = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?
// isValidDomain reports whether d matches the package-level domainRe
// hostname pattern.
func isValidDomain(d string) bool { return domainRe.MatchString(d) }
func jsonOK(w http.ResponseWriter, code int, v interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
json.NewEncoder(w).Encode(v)
}
// ─────────────────────────────────────────────────────────────────
// Handlers
// ─────────────────────────────────────────────────────────────────
// POST /api/add_domain
func addDomainHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var body struct {
Domain string `json:"domain"`
CrawlDelay string `json:"Crawl-delay"`
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "invalid JSON"})
jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON"})
return
}
if body.Domain == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "domain is required"})
jsonOK(w, http.StatusBadRequest, map[string]string{"error": "domain is required"})
return
}
domain := sanitizeDomain(body.Domain)
if !isValidDomain(domain) {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "invalid domain"})
jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"})
return
}
interval := 60
if body.CrawlDelay != "" {
fmt.Sscanf(body.CrawlDelay, "%d", &interval)
@@ -542,25 +754,15 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) {
}
}
now := time.Now().UTC().Format(time.RFC3339)
_, err := mainDB.Exec(
`INSERT INTO domains (domain,interval,created_at,updated_at) VALUES (?,?,?,?)
ON CONFLICT(domain) DO UPDATE SET interval=excluded.interval, updated_at=excluded.updated_at`,
domain, interval, now, now,
)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": "db error"})
if err := registerDomain(domain, interval, ""); err != nil {
jsonOK(w, http.StatusInternalServerError, map[string]string{"error": "db error"})
return
}
if _, err := openDomainDB(domain); err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": "domain DB init failed"})
jsonOK(w, http.StatusInternalServerError, map[string]string{"error": "domain DB init failed"})
return
}
// start crawler if not already running
crawlersMu.Lock()
if !crawlers[domain] {
crawlers[domain] = true
@@ -568,8 +770,9 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) {
}
crawlersMu.Unlock()
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]interface{}{
broadcast("new_domain", map[string]string{"domain": domain, "parent": ""})
jsonOK(w, http.StatusCreated, map[string]interface{}{
"message": "domain added, crawler started",
"domain": domain,
"interval": interval,
@@ -578,35 +781,98 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) {
})
}
// GET /api/sse/{domain}
func sseHandler(w http.ResponseWriter, r *http.Request) {
rawDomain := strings.TrimPrefix(r.URL.Path, "/api/sse/")
domain := sanitizeDomain(rawDomain)
if !isValidDomain(domain) {
http.Error(w, "invalid domain", http.StatusBadRequest)
// domainsHandler serves GET /api/domains: the full registered-domain list
// as a JSON array (never JSON null).
func domainsHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	list, err := listDomains()
	switch {
	case err != nil:
		jsonOK(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
	case list == nil:
		// Encode an empty array rather than null for JSON consumers.
		jsonOK(w, http.StatusOK, []DomainRow{})
	default:
		jsonOK(w, http.StatusOK, list)
	}
}
// pauseHandler serves POST /api/pause/{domain}: sends a pause signal to
// that domain's crawler. Responds 409 if no crawler is running.
func pauseHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	target := sanitizeDomain(strings.TrimPrefix(r.URL.Path, "/api/pause/"))
	if !isValidDomain(target) {
		jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"})
		return
	}
	crawlersMu.Lock()
	isRunning := crawlers[target]
	crawlersMu.Unlock()
	if !isRunning {
		jsonOK(w, http.StatusConflict, map[string]string{"error": "crawler not running for this domain"})
		return
	}
	pauseCrawler(target)
	jsonOK(w, http.StatusOK, map[string]string{"message": "pause signal sent", "domain": target})
}
// resumeHandler serves POST /api/resume/{domain}: sends a resume signal
// to that domain's crawler (a no-op if nothing is paused).
func resumeHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	target := sanitizeDomain(strings.TrimPrefix(r.URL.Path, "/api/resume/"))
	if !isValidDomain(target) {
		jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"})
		return
	}
	resumeCrawler(target)
	jsonOK(w, http.StatusOK, map[string]string{"message": "resume signal sent", "domain": target})
}
// GET /api/sse/{domain} — or /api/sse/ (global stream for all domains)
func sseHandler(w http.ResponseWriter, r *http.Request) {
rawDomain := strings.TrimPrefix(r.URL.Path, "/api/sse/")
rawDomain = strings.TrimRight(rawDomain, "/")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering
w.Header().Set("X-Accel-Buffering", "no")
w.Header().Set("Access-Control-Allow-Origin", "*")
br := getBroker(domain)
// If no domain specified, subscribe to the global "__all__" broker
// which receives broadcasts (new_domain, shutdown, etc.)
domainKey := rawDomain
if domainKey == "" {
domainKey = "__all__"
} else {
domainKey = sanitizeDomain(domainKey)
if !isValidDomain(domainKey) && domainKey != "__all__" {
http.Error(w, "invalid domain", http.StatusBadRequest)
return
}
}
br := getBroker(domainKey)
ch := br.subscribe()
defer br.unsubscribe(ch)
log.Printf("[SSE] client connected → %s", domain)
// send immediate connected event
fmt.Fprintf(w, "data: {\"event\":\"connected\",\"data\":{\"domain\":%q}}\n\n", domain)
log.Printf("[SSE] client connected → %s", domainKey)
fmt.Fprintf(w, "data: {\"event\":\"connected\",\"data\":{\"domain\":%q}}\n\n", domainKey)
flusher.Flush()
ticker := time.NewTicker(25 * time.Second)
@@ -615,7 +881,7 @@ func sseHandler(w http.ResponseWriter, r *http.Request) {
for {
select {
case <-r.Context().Done():
log.Printf("[SSE] client disconnected → %s", domain)
log.Printf("[SSE] client disconnected → %s", domainKey)
return
case msg := <-ch:
fmt.Fprintf(w, "data: %s\n\n", msg)
@@ -627,6 +893,27 @@ func sseHandler(w http.ResponseWriter, r *http.Request) {
}
}
// ─────────────────────────────────────────────────────────────────
// broadcast helper publish to __all__ broker
// ─────────────────────────────────────────────────────────────────
// init guarantees the global "__all__" broadcast broker exists before any
// SSE client or broadcaster can touch it.
func init() {
// ensure the global broadcast broker always exists
getBroker("__all__")
}
// override broadcast to also send to __all__
func broadcastAll(event string, data interface{}) {
b, _ := json.Marshal(ssePayload{Event: event, Data: data})
msg := string(b)
brokersMu.RLock()
defer brokersMu.RUnlock()
for _, br := range brokers {
br.publish(msg)
}
}
// ─────────────────────────────────────────────────────────────────
// main
// ─────────────────────────────────────────────────────────────────
@@ -640,20 +927,28 @@ func main() {
initMainDB()
// Resume any domains already in the DB from a previous run
rows, err := mainDB.Query(`SELECT domain, interval FROM domains`)
// Resume domains from previous run
rows, err := mainDB.Query(`SELECT domain, interval, status FROM domains`)
if err == nil {
for rows.Next() {
var d string
var d, status string
var iv int
if rows.Scan(&d, &iv) == nil {
crawlersMu.Lock()
if !crawlers[d] {
crawlers[d] = true
go crawlDomain(d, iv)
}
crawlersMu.Unlock()
if rows.Scan(&d, &iv, &status) != nil {
continue
}
// don't restart completed or paused crawls automatically;
// only restart those that were mid-flight (running/pending)
if status == statusDone {
continue
}
crawlersMu.Lock()
if !crawlers[d] {
crawlers[d] = true
// reset status so it shows running
setDomainStatus(d, statusPending)
go crawlDomain(d, iv)
}
crawlersMu.Unlock()
}
rows.Close()
}
@@ -661,61 +956,46 @@ func main() {
mux := http.NewServeMux()
mux.Handle("/", http.FileServer(http.Dir("./static")))
mux.HandleFunc("/api/add_domain", addDomainHandler)
mux.HandleFunc("/api/domains", domainsHandler)
mux.HandleFunc("/api/pause/", pauseHandler)
mux.HandleFunc("/api/resume/", resumeHandler)
mux.HandleFunc("/api/sse/", sseHandler)
srv := &http.Server{
Addr: ":8080",
Handler: mux,
}
srv := &http.Server{Addr: ":8080", Handler: mux}
// ── Graceful shutdown ────────────────────────────────────────
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
log.Printf("siliconpin_spider listening on %s", srv.Addr)
log.Printf("siliconpin_spider listening on :8080")
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("server error: %v", err)
log.Fatalf("server: %v", err)
}
}()
sig := <-quit
log.Printf("received %s — shutting down gracefully…", sig)
<-quit
log.Println("shutting down…")
// 1. Stop accepting new HTTP requests; give in-flight ones 10s to finish
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Printf("HTTP shutdown error: %v", err)
}
srv.Shutdown(ctx) //nolint:errcheck
// 2. Notify all SSE clients
brokersMu.RLock()
for domain, br := range brokers {
emit(br, "shutdown", map[string]string{"domain": domain, "msg": "server stopping"})
for d, br := range brokers {
emit(br, "shutdown", map[string]string{"domain": d, "msg": "server stopping"})
}
brokersMu.RUnlock()
// Brief pause so SSE messages flush to clients
time.Sleep(500 * time.Millisecond)
// 3. Checkpoint WAL → merge pending writes into the .sqlite file
// After this the .sqlite is fully self-contained (no WAL needed).
domainDBsMu.RLock()
for domain, db := range domainDBs {
if _, err := db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
log.Printf("checkpoint %s: %v", domain, err)
} else {
log.Printf("checkpointed %s.sqlite", domain)
}
for d, db := range domainDBs {
db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`) //nolint:errcheck
db.Close()
log.Printf("checkpointed %s.sqlite", d)
}
domainDBsMu.RUnlock()
if _, err := mainDB.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
log.Printf("checkpoint main DB: %v", err)
}
mainDB.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`) //nolint:errcheck
mainDB.Close()
log.Println("shutdown complete — all WAL data flushed, goodbye.")
log.Println("goodbye.")
}