Select a domain to watch its live feed
+diff --git a/main.go b/main.go index f45cc2b..2e4cd46 100644 --- a/main.go +++ b/main.go @@ -23,33 +23,49 @@ import ( ) // ───────────────────────────────────────────────────────────────── -// Global state +// Constants & globals // ───────────────────────────────────────────────────────────────── const mainDBFile = "siliconpin_spider.sqlite" var mainDB *sql.DB -// per-domain SSE brokers +// SSE brokers – one per domain var ( brokersMu sync.RWMutex brokers = map[string]*Broker{} ) -// per-domain DB connections (kept open) +// open domain DB handles var ( domainDBsMu sync.RWMutex domainDBs = map[string]*sql.DB{} ) -// guard against duplicate crawlers +// crawler goroutine guard var ( crawlersMu sync.Mutex crawlers = map[string]bool{} ) +// pause/resume channels – one per domain +// sending to pauseCh pauses; sending to resumeCh resumes +var ( + pauseChsMu sync.RWMutex + pauseChs = map[string]chan struct{}{} // pause signal + resumeChs = map[string]chan struct{}{} // resume signal +) + +// domain status values stored in main DB +const ( + statusRunning = "running" + statusPaused = "paused" + statusDone = "done" + statusPending = "pending" +) + // ───────────────────────────────────────────────────────────────── -// SSE Broker – fan-out to multiple subscribers per domain +// SSE Broker // ───────────────────────────────────────────────────────────────── type Broker struct { @@ -57,9 +73,7 @@ type Broker struct { clients map[chan string]struct{} } -func newBroker() *Broker { - return &Broker{clients: make(map[chan string]struct{})} -} +func newBroker() *Broker { return &Broker{clients: make(map[chan string]struct{})} } func (b *Broker) subscribe() chan string { ch := make(chan string, 64) @@ -81,7 +95,7 @@ func (b *Broker) publish(msg string) { for ch := range b.clients { select { case ch <- msg: - default: // slow client – drop message + default: } } } @@ -103,22 +117,29 @@ func getBroker(domain string) *Broker { return br } -// ───────────────────────────────────────────────────────────────── -// SSE event helper -// ───────────────────────────────────────────────────────────────── - -type sseEvent struct { +type ssePayload struct { Event string `json:"event"` Data interface{} `json:"data"` } func emit(br *Broker, event string, data interface{}) { - payload, _ := json.Marshal(sseEvent{Event: event, Data: data}) - br.publish(string(payload)) + b, _ := json.Marshal(ssePayload{Event: event, Data: data}) + br.publish(string(b)) +} + +// broadcast emits to ALL domain brokers (e.g. for a new_domain event) +func broadcast(event string, data interface{}) { + brokersMu.RLock() + defer brokersMu.RUnlock() + b, _ := json.Marshal(ssePayload{Event: event, Data: data}) + msg := string(b) + for _, br := range brokers { + br.publish(msg) + } } // ───────────────────────────────────────────────────────────────── -// Database helpers +// Main DB helpers // ───────────────────────────────────────────────────────────────── func initMainDB() { @@ -132,6 +153,8 @@ func initMainDB() { id INTEGER PRIMARY KEY AUTOINCREMENT, domain TEXT NOT NULL UNIQUE, interval INTEGER NOT NULL DEFAULT 60, + status TEXT NOT NULL DEFAULT 'pending', + parent TEXT NOT NULL DEFAULT '', created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL )`) @@ -141,6 +164,67 @@ func initMainDB() { log.Printf("Main DB ready: %s", mainDBFile) } +func setDomainStatus(domain, status string) { + now := time.Now().UTC().Format(time.RFC3339) + mainDB.Exec(`UPDATE domains SET status=?, updated_at=? WHERE domain=?`, status, now, domain) +} + +type DomainRow struct { + ID int `json:"id"` + Domain string `json:"domain"` + Interval int `json:"interval"` + Status string `json:"status"` + Parent string `json:"parent,omitempty"` + URLCount int `json:"url_count"` + QueueLen int `json:"queue_len"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +func listDomains() ([]DomainRow, error) { + rows, err := mainDB.Query( + `SELECT id, domain, interval, status, parent, created_at, updated_at + FROM domains ORDER BY id ASC`) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []DomainRow + for rows.Next() { + var d DomainRow + if err := rows.Scan(&d.ID, &d.Domain, &d.Interval, &d.Status, + &d.Parent, &d.CreatedAt, &d.UpdatedAt); err != nil { + continue + } + // get live counts from domain DB + if db, err2 := openDomainDB(d.Domain); err2 == nil { + db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&d.URLCount) + db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&d.QueueLen) + } + out = append(out, d) + } + return out, nil +} + +// registerDomain upserts a domain in the main DB. +// parentDomain is "" for user-added domains, otherwise the domain that found it. +func registerDomain(domain string, interval int, parentDomain string) error { + now := time.Now().UTC().Format(time.RFC3339) + _, err := mainDB.Exec(` + INSERT INTO domains (domain, interval, status, parent, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(domain) DO UPDATE SET + interval=excluded.interval, + updated_at=excluded.updated_at`, + domain, interval, statusPending, parentDomain, now, now) + return err +} + +// ───────────────────────────────────────────────────────────────── +// Domain DB helpers +// ───────────────────────────────────────────────────────────────── + func openDomainDB(domain string) (*sql.DB, error) { domainDBsMu.RLock() db, ok := domainDBs[domain] @@ -153,24 +237,32 @@ func openDomainDB(domain string) (*sql.DB, error) { if err != nil { return nil, err } - _, err = db.Exec(` + if _, err = db.Exec(` CREATE TABLE IF NOT EXISTS urls ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT NOT NULL UNIQUE, created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL - )`) - if err != nil { + )`); err != nil { db.Close() return nil, err } - _, err = db.Exec(` + if _, err = db.Exec(` CREATE TABLE IF NOT EXISTS queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT NOT NULL UNIQUE, added_at DATETIME NOT NULL - )`) - if err != nil { + )`); err != nil { + db.Close() + return nil, err + } + // cross-domain links discovered during crawl + if _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS ext_links ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ext_domain TEXT NOT NULL UNIQUE, + found_at DATETIME NOT NULL + )`); err != nil { db.Close() return nil, err } @@ -185,8 +277,7 @@ func insertURL(db *sql.DB, rawURL string) (bool, error) { now := time.Now().UTC().Format(time.RFC3339) res, err := db.Exec( `INSERT OR IGNORE INTO urls (url, created_at, updated_at) VALUES (?, ?, ?)`, - rawURL, now, now, - ) + rawURL, now, now) if err != nil { return false, err } @@ -200,29 +291,22 @@ func isURLKnown(db *sql.DB, rawURL string) bool { return c > 0 } -// ── persistent queue helpers ────────────────────────────────────── - -// enqueueURL adds a URL to the persistent queue if not already there -// and not already crawled. func enqueueURL(db *sql.DB, rawURL string) { now := time.Now().UTC().Format(time.RFC3339) db.Exec(`INSERT OR IGNORE INTO queue (url, added_at) VALUES (?, ?)`, rawURL, now) } -// dequeueURL removes and returns the oldest queued URL (FIFO). -// Returns "", false when the queue is empty. func dequeueURL(db *sql.DB) (string, bool) { tx, err := db.Begin() if err != nil { return "", false } defer tx.Rollback() //nolint:errcheck - var id int64 var rawURL string - err = tx.QueryRow(`SELECT id, url FROM queue ORDER BY id ASC LIMIT 1`).Scan(&id, &rawURL) - if err != nil { - return "", false // empty + if err = tx.QueryRow(`SELECT id, url FROM queue ORDER BY id ASC LIMIT 1`). + Scan(&id, &rawURL); err != nil { + return "", false } if _, err = tx.Exec(`DELETE FROM queue WHERE id = ?`, id); err != nil { return "", false @@ -233,31 +317,138 @@ func dequeueURL(db *sql.DB) (string, bool) { return rawURL, true } -// queueLen returns the current number of pending URLs. func queueLen(db *sql.DB) int { var n int db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&n) return n } -// seedQueue inserts the start URL only when the queue is completely empty -// (first ever run). On restart the persisted queue is used as-is. func seedQueue(db *sql.DB, startURL string) { - var qCount, uCount int - db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&qCount) - db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&uCount) - if qCount == 0 && uCount == 0 { + var qc, uc int + db.QueryRow(`SELECT COUNT(1) FROM queue`).Scan(&qc) + db.QueryRow(`SELECT COUNT(1) FROM urls`).Scan(&uc) + if qc == 0 && uc == 0 { enqueueURL(db, startURL) } } +// recordExtLink saves a discovered external domain and auto-registers it. +func recordExtLink(srcDomain, extDomain string, parentInterval int) { + db, err := openDomainDB(srcDomain) + if err != nil { + return + } + now := time.Now().UTC().Format(time.RFC3339) + res, _ := db.Exec( + `INSERT OR IGNORE INTO ext_links (ext_domain, found_at) VALUES (?, ?)`, + extDomain, now) + n, _ := res.RowsAffected() + if n == 0 { + return // already recorded + } + + // Register in main DB (inherit parent's interval) + if err := registerDomain(extDomain, parentInterval, srcDomain); err != nil { + log.Printf("registerDomain %s (from %s): %v", extDomain, srcDomain, err) + return + } + + log.Printf("[%s] discovered external domain: %s", srcDomain, extDomain) + + // Notify UI + broadcast("new_domain", map[string]string{ + "domain": extDomain, + "parent": srcDomain, + }) + + // Init the new domain's DB and start its crawler + if _, err := openDomainDB(extDomain); err != nil { + log.Printf("openDomainDB %s: %v", extDomain, err) + return + } + + crawlersMu.Lock() + if !crawlers[extDomain] { + crawlers[extDomain] = true + go crawlDomain(extDomain, parentInterval) + } + crawlersMu.Unlock() +} + // ───────────────────────────────────────────────────────────────── -// robots.txt (minimal, single-pass parser) +// Pause / resume machinery +// ───────────────────────────────────────────────────────────────── + +func ensurePauseChannels(domain string) { + pauseChsMu.Lock() + defer pauseChsMu.Unlock() + if _, ok := pauseChs[domain]; !ok { + pauseChs[domain] = make(chan struct{}, 1) + resumeChs[domain] = make(chan struct{}, 1) + } +} + +// pauseCrawler signals the crawler to pause. Non-blocking. +func pauseCrawler(domain string) { + pauseChsMu.RLock() + ch, ok := pauseChs[domain] + pauseChsMu.RUnlock() + if !ok { + return + } + select { + case ch <- struct{}{}: + default: + } +} + +// resumeCrawler signals a paused crawler to continue. Non-blocking. +func resumeCrawler(domain string) { + pauseChsMu.RLock() + ch, ok := resumeChs[domain] + pauseChsMu.RUnlock() + if !ok { + return + } + select { + case ch <- struct{}{}: + default: + } +} + +// checkPause is called inside the crawl loop between requests. +// If a pause signal is pending it blocks until resume arrives. +func checkPause(domain string, br *Broker) { + pauseChsMu.RLock() + pCh := pauseChs[domain] + rCh := resumeChs[domain] + pauseChsMu.RUnlock() + + select { + case <-pCh: + setDomainStatus(domain, statusPaused) + emit(br, "paused", map[string]string{"domain": domain}) + log.Printf("[%s] paused", domain) + // drain any duplicate pause signals + for len(pCh) > 0 { + <-pCh + } + // block until resume + <-rCh + setDomainStatus(domain, statusRunning) + emit(br, "resumed", map[string]string{"domain": domain}) + log.Printf("[%s] resumed", domain) + default: + } +} + +// ───────────────────────────────────────────────────────────────── +// robots.txt // ───────────────────────────────────────────────────────────────── type robotsRules struct { disallowed []string - crawlDelay int // 0 = not set + crawlDelay int } func fetchRobots(domain string) *robotsRules { @@ -277,11 +468,9 @@ func fetchRobots(domain string) *robotsRules { continue } lower := strings.ToLower(line) - if strings.HasPrefix(lower, "user-agent:") { agent := strings.TrimSpace(line[len("user-agent:"):]) - inSection = agent == "*" || - strings.EqualFold(agent, "siliconpin_spider") + inSection = agent == "*" || strings.EqualFold(agent, "siliconpin_spider") continue } if !inSection { @@ -310,14 +499,21 @@ func (r *robotsRules) allowed(path string) bool { } // ───────────────────────────────────────────────────────────────── -// Link extractor – same-host HTML links only +// Link extractor // ───────────────────────────────────────────────────────────────── var hrefRe = regexp.MustCompile(`(?i)href=["']([^"'#][^"']*)["']`) -func extractLinks(base *url.URL, body string) []string { - seen := map[string]bool{} - var links []string +type extractedLinks struct { + sameHost []string + external []string // distinct external hostnames (not full URLs) +} + +func extractLinks(base *url.URL, body string) extractedLinks { + seenSame := map[string]bool{} + seenExt := map[string]bool{} + var result extractedLinks + for _, m := range hrefRe.FindAllStringSubmatch(body, -1) { href := strings.TrimSpace(m[1]) parsed, err := url.Parse(href) @@ -330,16 +526,25 @@ func extractLinks(base *url.URL, body string) []string { if resolved.Scheme != "http" && resolved.Scheme != "https" { continue } - if !strings.EqualFold(resolved.Hostname(), base.Hostname()) { - continue - } - s := resolved.String() - if !seen[s] { - seen[s] = true - links = append(links, s) + host := strings.ToLower(resolved.Hostname()) + baseHost := strings.ToLower(base.Hostname()) + + if host == baseHost { + s := resolved.String() + if !seenSame[s] { + seenSame[s] = true + result.sameHost = append(result.sameHost, s) + } + } else { + // strip www. for normalisation + extDomain := strings.TrimPrefix(host, "www.") + if extDomain != "" && !seenExt[extDomain] && isValidDomain(extDomain) { + seenExt[extDomain] = true + result.external = append(result.external, extDomain) + } } } - return links + return result } // ───────────────────────────────────────────────────────────────── @@ -347,20 +552,21 @@ func extractLinks(base *url.URL, body string) []string { // ───────────────────────────────────────────────────────────────── func crawlDomain(domain string, intervalSec int) { - log.Printf("[%s] crawler started (base interval %ds)", domain, intervalSec) + log.Printf("[%s] crawler started (interval %ds)", domain, intervalSec) br := getBroker(domain) + ensurePauseChannels(domain) db, err := openDomainDB(domain) if err != nil { - emit(br, "error", map[string]string{"msg": "DB error: " + err.Error()}) + emit(br, "error", map[string]string{"msg": "DB open failed: " + err.Error()}) return } - // ── robots.txt ────────────────────────────────────────────── + setDomainStatus(domain, statusRunning) + + // robots.txt emit(br, "status", map[string]string{"msg": "fetching robots.txt"}) robots := fetchRobots(domain) - - // robots.txt crawl-delay overrides our setting if higher if robots.crawlDelay > intervalSec { intervalSec = robots.crawlDelay now := time.Now().UTC().Format(time.RFC3339) @@ -373,10 +579,6 @@ func crawlDomain(domain string, intervalSec int) { "effective_delay": intervalSec, }) - // ── Persistent BFS queue ──────────────────────────────────── - // On first run: seed with the start URL. - // On restart: the queue table already holds the pending URLs — - // we just continue from where we left off. startURL := "https://" + domain + "/" seedQueue(db, startURL) @@ -391,24 +593,25 @@ func crawlDomain(domain string, intervalSec int) { } for { - // Re-read interval in case it was updated via API + // ── pause check ───────────────────────────────────────── + checkPause(domain, br) + + // ── re-read interval ──────────────────────────────────── var cur int - if err := mainDB.QueryRow(`SELECT interval FROM domains WHERE domain=?`, domain).Scan(&cur); err == nil && cur > 0 { + if mainDB.QueryRow(`SELECT interval FROM domains WHERE domain=?`, domain). + Scan(&cur) == nil && cur > 0 { intervalSec = cur } target, ok := dequeueURL(db) if !ok { - break // queue exhausted + break } - // Skip if already crawled (can happen if same URL was enqueued - // multiple times before being dequeued, or after a re-seed) if isURLKnown(db, target) { continue } - // robots check parsed, err := url.Parse(target) if err != nil { continue @@ -418,24 +621,24 @@ func crawlDomain(domain string, intervalSec int) { continue } - // random delay: [interval, interval*2] seconds + // random delay [interval, interval*2] delaySec := intervalSec + rand.Intn(intervalSec+1) - delay := time.Duration(delaySec) * time.Second emit(br, "waiting", map[string]interface{}{ "url": target, "delay_s": delaySec, "queue": queueLen(db), }) - time.Sleep(delay) + time.Sleep(time.Duration(delaySec) * time.Second) + + // ── pause check after sleep (could have been paused during wait) ── + checkPause(domain, br) - // fetch emit(br, "fetching", map[string]string{"url": target}) resp, err := httpClient.Get(target) if err != nil { emit(br, "error", map[string]string{"url": target, "msg": err.Error()}) log.Printf("[%s] fetch error %s: %v", domain, target, err) - // Re-enqueue so it's retried next run - enqueueURL(db, target) + enqueueURL(db, target) // retry next run continue } @@ -444,13 +647,12 @@ func crawlDomain(domain string, intervalSec int) { var bodyStr string if isHTML { - raw, _ := io.ReadAll(io.LimitReader(resp.Body, 5<<20)) // 5 MB cap + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 5<<20)) bodyStr = string(raw) } resp.Body.Close() - inserted, _ := insertURL(db, target) - if inserted { + if ins, _ := insertURL(db, target); ins { emit(br, "saved", map[string]interface{}{ "url": target, "status": resp.StatusCode, @@ -459,11 +661,12 @@ func crawlDomain(domain string, intervalSec int) { log.Printf("[%s] saved: %s", domain, target) } - // discover links from HTML pages if isHTML && resp.StatusCode == 200 { links := extractLinks(parsed, bodyStr) + + // same-host links → queue newCount := 0 - for _, link := range links { + for _, link := range links.sameHost { if !isURLKnown(db, link) { enqueueURL(db, link) newCount++ @@ -471,13 +674,20 @@ func crawlDomain(domain string, intervalSec int) { } emit(br, "links_found", map[string]interface{}{ "url": target, - "found": len(links), + "found": len(links.sameHost), "new": newCount, "queue_len": queueLen(db), + "external": len(links.external), }) + + // external domains → auto-register & crawl + for _, extDomain := range links.external { + recordExtLink(domain, extDomain, intervalSec) + } } } + setDomainStatus(domain, statusDone) emit(br, "done", map[string]string{"domain": domain, "msg": "crawl complete"}) log.Printf("[%s] crawl complete", domain) @@ -487,7 +697,7 @@ func crawlDomain(domain string, intervalSec int) { } // ───────────────────────────────────────────────────────────────── -// HTTP handlers +// HTTP helpers // ───────────────────────────────────────────────────────────────── func sanitizeDomain(raw string) string { @@ -503,37 +713,39 @@ var domainRe = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])? func isValidDomain(d string) bool { return domainRe.MatchString(d) } +func jsonOK(w http.ResponseWriter, code int, v interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + json.NewEncoder(w).Encode(v) +} + +// ───────────────────────────────────────────────────────────────── +// Handlers +// ───────────────────────────────────────────────────────────────── + // POST /api/add_domain func addDomainHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } - var body struct { Domain string `json:"domain"` CrawlDelay string `json:"Crawl-delay"` } - w.Header().Set("Content-Type", "application/json") - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": "invalid JSON"}) + jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON"}) return } if body.Domain == "" { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": "domain is required"}) + jsonOK(w, http.StatusBadRequest, map[string]string{"error": "domain is required"}) return } - domain := sanitizeDomain(body.Domain) if !isValidDomain(domain) { - w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": "invalid domain"}) + jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"}) return } - interval := 60 if body.CrawlDelay != "" { fmt.Sscanf(body.CrawlDelay, "%d", &interval) @@ -542,25 +754,15 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) { } } - now := time.Now().UTC().Format(time.RFC3339) - _, err := mainDB.Exec( - `INSERT INTO domains (domain,interval,created_at,updated_at) VALUES (?,?,?,?) - ON CONFLICT(domain) DO UPDATE SET interval=excluded.interval, updated_at=excluded.updated_at`, - domain, interval, now, now, - ) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - json.NewEncoder(w).Encode(map[string]string{"error": "db error"}) + if err := registerDomain(domain, interval, ""); err != nil { + jsonOK(w, http.StatusInternalServerError, map[string]string{"error": "db error"}) return } - if _, err := openDomainDB(domain); err != nil { - w.WriteHeader(http.StatusInternalServerError) - json.NewEncoder(w).Encode(map[string]string{"error": "domain DB init failed"}) + jsonOK(w, http.StatusInternalServerError, map[string]string{"error": "domain DB init failed"}) return } - // start crawler if not already running crawlersMu.Lock() if !crawlers[domain] { crawlers[domain] = true @@ -568,8 +770,9 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) { } crawlersMu.Unlock() - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(map[string]interface{}{ + broadcast("new_domain", map[string]string{"domain": domain, "parent": ""}) + + jsonOK(w, http.StatusCreated, map[string]interface{}{ "message": "domain added, crawler started", "domain": domain, "interval": interval, @@ -578,35 +781,98 @@ func addDomainHandler(w http.ResponseWriter, r *http.Request) { }) } -// GET /api/sse/{domain} -func sseHandler(w http.ResponseWriter, r *http.Request) { - rawDomain := strings.TrimPrefix(r.URL.Path, "/api/sse/") - domain := sanitizeDomain(rawDomain) - if !isValidDomain(domain) { - http.Error(w, "invalid domain", http.StatusBadRequest) +// GET /api/domains +func domainsHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } + list, err := listDomains() + if err != nil { + jsonOK(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + if list == nil { + list = []DomainRow{} + } + jsonOK(w, http.StatusOK, list) +} + +// POST /api/pause/{domain} +func pauseHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + domain := sanitizeDomain(strings.TrimPrefix(r.URL.Path, "/api/pause/")) + if !isValidDomain(domain) { + jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"}) + return + } + + crawlersMu.Lock() + running := crawlers[domain] + crawlersMu.Unlock() + + if !running { + jsonOK(w, http.StatusConflict, map[string]string{"error": "crawler not running for this domain"}) + return + } + + pauseCrawler(domain) + jsonOK(w, http.StatusOK, map[string]string{"message": "pause signal sent", "domain": domain}) +} + +// POST /api/resume/{domain} +func resumeHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + domain := sanitizeDomain(strings.TrimPrefix(r.URL.Path, "/api/resume/")) + if !isValidDomain(domain) { + jsonOK(w, http.StatusBadRequest, map[string]string{"error": "invalid domain"}) + return + } + resumeCrawler(domain) + jsonOK(w, http.StatusOK, map[string]string{"message": "resume signal sent", "domain": domain}) +} + +// GET /api/sse/{domain} — or /api/sse/ (global stream for all domains) +func sseHandler(w http.ResponseWriter, r *http.Request) { + rawDomain := strings.TrimPrefix(r.URL.Path, "/api/sse/") + rawDomain = strings.TrimRight(rawDomain, "/") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } - w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Accel-Buffering", "no") // nginx: disable proxy buffering + w.Header().Set("X-Accel-Buffering", "no") w.Header().Set("Access-Control-Allow-Origin", "*") - br := getBroker(domain) + // If no domain specified, subscribe to the global "__all__" broker + // which receives broadcasts (new_domain, shutdown, etc.) + domainKey := rawDomain + if domainKey == "" { + domainKey = "__all__" + } else { + domainKey = sanitizeDomain(domainKey) + if !isValidDomain(domainKey) && domainKey != "__all__" { + http.Error(w, "invalid domain", http.StatusBadRequest) + return + } + } + + br := getBroker(domainKey) ch := br.subscribe() defer br.unsubscribe(ch) - log.Printf("[SSE] client connected → %s", domain) - - // send immediate connected event - fmt.Fprintf(w, "data: {\"event\":\"connected\",\"data\":{\"domain\":%q}}\n\n", domain) + log.Printf("[SSE] client connected → %s", domainKey) + fmt.Fprintf(w, "data: {\"event\":\"connected\",\"data\":{\"domain\":%q}}\n\n", domainKey) flusher.Flush() ticker := time.NewTicker(25 * time.Second) @@ -615,7 +881,7 @@ func sseHandler(w http.ResponseWriter, r *http.Request) { for { select { case <-r.Context().Done(): - log.Printf("[SSE] client disconnected → %s", domain) + log.Printf("[SSE] client disconnected → %s", domainKey) return case msg := <-ch: fmt.Fprintf(w, "data: %s\n\n", msg) @@ -627,6 +893,27 @@ func sseHandler(w http.ResponseWriter, r *http.Request) { } } +// ───────────────────────────────────────────────────────────────── +// broadcast helper – publish to __all__ broker +// ───────────────────────────────────────────────────────────────── + +func init() { + // ensure the global broadcast broker always exists + getBroker("__all__") +} + +// override broadcast to also send to __all__ +func broadcastAll(event string, data interface{}) { + b, _ := json.Marshal(ssePayload{Event: event, Data: data}) + msg := string(b) + + brokersMu.RLock() + defer brokersMu.RUnlock() + for _, br := range brokers { + br.publish(msg) + } +} + // ───────────────────────────────────────────────────────────────── // main // ───────────────────────────────────────────────────────────────── @@ -640,20 +927,28 @@ func main() { initMainDB() - // Resume any domains already in the DB from a previous run - rows, err := mainDB.Query(`SELECT domain, interval FROM domains`) + // Resume domains from previous run + rows, err := mainDB.Query(`SELECT domain, interval, status FROM domains`) if err == nil { for rows.Next() { - var d string + var d, status string var iv int - if rows.Scan(&d, &iv) == nil { - crawlersMu.Lock() - if !crawlers[d] { - crawlers[d] = true - go crawlDomain(d, iv) - } - crawlersMu.Unlock() + if rows.Scan(&d, &iv, &status) != nil { + continue } + // don't restart completed or paused crawls automatically; + // only restart those that were mid-flight (running/pending) + if status == statusDone { + continue + } + crawlersMu.Lock() + if !crawlers[d] { + crawlers[d] = true + // reset status so it shows running + setDomainStatus(d, statusPending) + go crawlDomain(d, iv) + } + crawlersMu.Unlock() } rows.Close() } @@ -661,61 +956,46 @@ func main() { mux := http.NewServeMux() mux.Handle("/", http.FileServer(http.Dir("./static"))) mux.HandleFunc("/api/add_domain", addDomainHandler) + mux.HandleFunc("/api/domains", domainsHandler) + mux.HandleFunc("/api/pause/", pauseHandler) + mux.HandleFunc("/api/resume/", resumeHandler) mux.HandleFunc("/api/sse/", sseHandler) - srv := &http.Server{ - Addr: ":8080", - Handler: mux, - } + srv := &http.Server{Addr: ":8080", Handler: mux} - // ── Graceful shutdown ──────────────────────────────────────── quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) go func() { - log.Printf("siliconpin_spider listening on %s", srv.Addr) + log.Printf("siliconpin_spider listening on :8080") if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Fatalf("server error: %v", err) + log.Fatalf("server: %v", err) } }() - sig := <-quit - log.Printf("received %s — shutting down gracefully…", sig) + <-quit + log.Println("shutting down…") - // 1. Stop accepting new HTTP requests; give in-flight ones 10s to finish ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := srv.Shutdown(ctx); err != nil { - log.Printf("HTTP shutdown error: %v", err) - } + srv.Shutdown(ctx) //nolint:errcheck - // 2. Notify all SSE clients brokersMu.RLock() - for domain, br := range brokers { - emit(br, "shutdown", map[string]string{"domain": domain, "msg": "server stopping"}) + for d, br := range brokers { + emit(br, "shutdown", map[string]string{"domain": d, "msg": "server stopping"}) } brokersMu.RUnlock() - - // Brief pause so SSE messages flush to clients time.Sleep(500 * time.Millisecond) - // 3. Checkpoint WAL → merge pending writes into the .sqlite file - // After this the .sqlite is fully self-contained (no WAL needed). domainDBsMu.RLock() - for domain, db := range domainDBs { - if _, err := db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil { - log.Printf("checkpoint %s: %v", domain, err) - } else { - log.Printf("checkpointed %s.sqlite", domain) - } + for d, db := range domainDBs { + db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`) //nolint:errcheck db.Close() + log.Printf("checkpointed %s.sqlite", d) } domainDBsMu.RUnlock() - if _, err := mainDB.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil { - log.Printf("checkpoint main DB: %v", err) - } + mainDB.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`) //nolint:errcheck mainDB.Close() - - log.Println("shutdown complete — all WAL data flushed, goodbye.") + log.Println("goodbye.") } diff --git a/static/index.html b/static/index.html index e06677f..f48ba7a 100644 --- a/static/index.html +++ b/static/index.html @@ -5,134 +5,379 @@
Polite web crawler — respects robots.txt · random delay · SSE live feed
+Select a domain to watch its live feed
+